Statistics
| Branch: | Revision:

streamers / loop.c @ 03de31e0

History | View | Annotate | Download (4.97 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14
#include <string.h>
15

    
16
#include <net_helper.h>
17
#include <grapes_msg_types.h>
18
#include <peerset.h>
19
#include <peer.h>
20

    
21
#include "compatibility/timer.h"
22

    
23
#include "chunk_signaling.h"
24
#include "streaming.h"
25
#include "topology.h"
26
#include "loop.h"
27
#include "dbg.h"
28

    
29
#define BUFFSIZE 512 * 1024
30
#define FDSSIZE 16
31
static struct timeval period = {0, 500000};
32
static struct timeval tnext;
33

    
34
#ifdef HTTPIO_MHD
35
extern pthread_mutex_t cb_mutex;
36
#endif
37

    
38
void tout_init(struct timeval *tv)
39
{
40
  struct timeval tnow;
41

    
42
  if (tnext.tv_sec == 0) {
43
    gettimeofday(&tnext, NULL);
44
  }
45
  gettimeofday(&tnow, NULL);
46
  if(timercmp(&tnow, &tnext, <)) {
47
    timersub(&tnext, &tnow, tv);
48
  } else {
49
    *tv = (struct timeval){0, 0};
50
  }
51
}
52

    
53
void loop(struct nodeID *s, int csize, int buff_size)
54
{
55
  int done = 0;
56
  static uint8_t buff[BUFFSIZE];
57
  int cnt = 0;
58
  
59
  period.tv_sec = csize / 1000000;
60
  period.tv_usec = csize % 1000000;
61
  
62
  peers_init();
63
  stream_init(buff_size, s);
64
  update_peers(NULL, NULL, 0);
65
  while (!done) {
66
    int len, res;
67
    struct timeval tv;
68

    
69
    tout_init(&tv);
70
    res = wait4data(s, &tv, NULL);
71
    if (res > 0) {
72
      struct nodeID *remote;
73

    
74
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
75
      if (len < 0) {
76
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
77
        nodeid_free(remote);
78
        continue;
79
      }
80
      switch (buff[0] /* Message Type */) {
81
        case MSG_TYPE_TMAN:
82
        case MSG_TYPE_STREAMER_TOPOLOGY:
83
        case MSG_TYPE_TOPOLOGY:
84
          dtprintf("Topo message received:\n");
85
          update_peers(remote, buff, len);
86
          break;
87
        case MSG_TYPE_CHUNK:
88
          dtprintf("Chunk message received:\n");
89
          received_chunk(remote, buff, len);
90
          break;
91
        case MSG_TYPE_SIGNALLING:
92
          dtprintf("Sign message received:\n");
93
          sigParseData(remote, buff, len);
94
          break;
95
        default:
96
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
97
      }
98
      nodeid_free(remote);
99
    } else {
100
      struct timeval tmp;
101
      //send_chunk();
102
      send_offer();
103
      if (cnt++ % 10 == 0) {
104
        update_peers(NULL, NULL, 0);
105
      }
106
      timeradd(&tnext, &period, &tmp);
107
      tnext = tmp;
108
    }
109
  }
110
}
111

    
112
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, int buff_size)
113
{
114
  int done = 0;
115
  static uint8_t buff[BUFFSIZE];
116
  int cnt = 0;
117
  int fds[FDSSIZE];
118
  fds[0] = -1;
119

    
120
  period.tv_sec = csize  / 1000000;
121
  period.tv_usec = csize % 1000000;
122
  
123
  peers_init();
124

    
125
  if (source_init(fname, s, fds, FDSSIZE, buff_size) < 0) {
126
    fprintf(stderr,"Cannot initialize source, exiting");
127
    return;
128
  }
129
  while (!done) {
130
    int len, res;
131
    struct timeval tv, *ptv;
132
    int wait4fds[FDSSIZE], *pfds;
133

    
134
#ifdef HTTPIO
135
    memcpy(wait4fds, fds, sizeof(fds));
136
    res = wait4data(s, NULL, wait4fds);
137
#else
138
    if (fds[0] == -1) {
139
      tout_init(&tv);
140
      ptv = &tv;
141
      pfds = NULL;
142
    } else {
143
      memcpy(wait4fds, fds, sizeof(fds));
144
      pfds = wait4fds;
145
      ptv = NULL;
146
    }
147
    res = wait4data(s, ptv, pfds);
148
#endif
149
    if (res == 1) {
150
      struct nodeID *remote;
151

    
152
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
153
      if (len < 0) {
154
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
155
        nodeid_free(remote);
156
        continue;
157
      }
158
      dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
159
      switch (buff[0] /* Message Type */) {
160
                case MSG_TYPE_TMAN:
161
        case MSG_TYPE_STREAMER_TOPOLOGY:
162
        case MSG_TYPE_TOPOLOGY:
163
          fprintf(stderr, "Top Parse\n");
164
#ifdef HTTPIO_MHD
165
          pthread_mutex_lock(&cb_mutex);
166
#endif
167
          update_peers(remote, buff, len);
168
#ifdef HTTPIO_MHD
169
          pthread_mutex_unlock(&cb_mutex);
170
#endif
171
          break;
172
        case MSG_TYPE_CHUNK:
173
          fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
174
          break;
175
        case MSG_TYPE_SIGNALLING:
176
#ifdef HTTPIO_MHD
177
          pthread_mutex_lock(&cb_mutex);
178
#endif
179
          sigParseData(remote, buff, len);
180
#ifdef HTTPIO_MHD
181
          pthread_mutex_unlock(&cb_mutex);
182
#endif
183
          break;
184
        default:
185
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
186
      }
187
      nodeid_free(remote);
188
    } else if (res == 0 || res == 2) {        //timeout or data arrived from source
189
      int i;
190
      struct timeval tmp, d;
191
      struct chunk *c;
192

    
193
      d.tv_sec = 0;
194
      c = generated_chunk(&d.tv_usec);
195
      if (c) {
196
        add_chunk(c);
197
        for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
198
          send_chunk();
199
        }
200
        send_offer();
201
        if (cnt++ % 10 == 0) {
202
            update_peers(NULL, NULL, 0);
203
        }
204
      }
205
      timeradd(&tnext, &d, &tmp);
206
      tnext = tmp;
207
    }
208
  }
209
}