Statistics
| Branch: | Revision:

streamers / loop.c @ 03de31e0

History | View | Annotate | Download (4.97 KB)

1 8fed7779 CsabaKiraly
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7 89e893e2 Luca
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13 30a6e902 Csaba Kiraly
#include <stdbool.h>
14 c86d8685 CsabaKiraly
#include <string.h>
15 89e893e2 Luca
16
#include <net_helper.h>
17 e28a1487 CsabaKiraly
#include <grapes_msg_types.h>
18 3df216c8 Csaba Kiraly
#include <peerset.h>
19 018c744a Csaba Kiraly
#include <peer.h>
20 89e893e2 Luca
21 63ebb93d CsabaKiraly
#include "compatibility/timer.h"
22
23 e223dc99 Csaba Kiraly
#include "chunk_signaling.h"
24 ada339a0 ArpadBakay
#include "streaming.h"
25 74a5d4ae CsabaKiraly
#include "topology.h"
26 89e893e2 Luca
#include "loop.h"
27 ce80b058 Luca Abeni
#include "dbg.h"
28 89e893e2 Luca
29 625dbc20 Csaba Kiraly
#define BUFFSIZE 512 * 1024
30 c9370421 CsabaKiraly
#define FDSSIZE 16
31 89e893e2 Luca
static struct timeval period = {0, 500000};
32
static struct timeval tnext;
33 e223dc99 Csaba Kiraly
34 5805c339 GiuseppeTropea
#ifdef HTTPIO_MHD
35 03dca3bf ArpadBakay
extern pthread_mutex_t cb_mutex;
36
#endif
37
38 89e893e2 Luca
void tout_init(struct timeval *tv)
39
{
40
  struct timeval tnow;
41
42
  if (tnext.tv_sec == 0) {
43
    gettimeofday(&tnext, NULL);
44
  }
45
  gettimeofday(&tnow, NULL);
46
  if(timercmp(&tnow, &tnext, <)) {
47
    timersub(&tnext, &tnow, tv);
48
  } else {
49
    *tv = (struct timeval){0, 0};
50
  }
51
}
52
53 0a40460a Luca
void loop(struct nodeID *s, int csize, int buff_size)
54 89e893e2 Luca
{
55
  int done = 0;
56
  static uint8_t buff[BUFFSIZE];
57
  int cnt = 0;
58
  
59
  period.tv_sec = csize / 1000000;
60
  period.tv_usec = csize % 1000000;
61
  
62 fcb5c29b Csaba Kiraly
  peers_init();
63 0a40460a Luca
  stream_init(buff_size, s);
64 cf820c66 Csaba Kiraly
  update_peers(NULL, NULL, 0);
65 89e893e2 Luca
  while (!done) {
66 4136911a Luca Abeni
    int len, res;
67
    struct timeval tv;
68 89e893e2 Luca
69 4136911a Luca Abeni
    tout_init(&tv);
70 08495d05 Csaba Kiraly
    res = wait4data(s, &tv, NULL);
71 4136911a Luca Abeni
    if (res > 0) {
72 74a5d4ae CsabaKiraly
      struct nodeID *remote;
73 89e893e2 Luca
74 4136911a Luca Abeni
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
75 525bb486 Csaba Kiraly
      if (len < 0) {
76
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
77
        nodeid_free(remote);
78
        continue;
79
      }
80 89e893e2 Luca
      switch (buff[0] /* Message Type */) {
81 9ea73763 Csaba Kiraly
        case MSG_TYPE_TMAN:
82 9176d3d1 Csaba Kiraly
        case MSG_TYPE_STREAMER_TOPOLOGY:
83 615e8354 Luca Abeni
        case MSG_TYPE_TOPOLOGY:
84 7e65d1f7 Csaba Kiraly
          dtprintf("Topo message received:\n");
85 fcb5c29b Csaba Kiraly
          update_peers(remote, buff, len);
86 89e893e2 Luca
          break;
87 615e8354 Luca Abeni
        case MSG_TYPE_CHUNK:
88 7e65d1f7 Csaba Kiraly
          dtprintf("Chunk message received:\n");
89 fcb5c29b Csaba Kiraly
          received_chunk(remote, buff, len);
90 d2a239c6 Luca
          break;
91 e223dc99 Csaba Kiraly
        case MSG_TYPE_SIGNALLING:
92 7e65d1f7 Csaba Kiraly
          dtprintf("Sign message received:\n");
93 d310076d Csaba Kiraly
          sigParseData(remote, buff, len);
94 e223dc99 Csaba Kiraly
          break;
95 89e893e2 Luca
        default:
96
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
97
      }
98 a9d7dd3b Luca Abeni
      nodeid_free(remote);
99 89e893e2 Luca
    } else {
100
      struct timeval tmp;
101 fcb5c29b Csaba Kiraly
      //send_chunk();
102
      send_offer();
103 89e893e2 Luca
      if (cnt++ % 10 == 0) {
104 fcb5c29b Csaba Kiraly
        update_peers(NULL, NULL, 0);
105 89e893e2 Luca
      }
106
      timeradd(&tnext, &period, &tmp);
107
      tnext = tmp;
108
    }
109
  }
110
}
111
112 03de31e0 Csaba Kiraly
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, int buff_size)
113 89e893e2 Luca
{
114
  int done = 0;
115
  static uint8_t buff[BUFFSIZE];
116
  int cnt = 0;
117 c9370421 CsabaKiraly
  int fds[FDSSIZE];
118
  fds[0] = -1;
119 89e893e2 Luca
120
  period.tv_sec = csize  / 1000000;
121
  period.tv_usec = csize % 1000000;
122
  
123 fcb5c29b Csaba Kiraly
  peers_init();
124 c9370421 CsabaKiraly
125 03de31e0 Csaba Kiraly
  if (source_init(fname, s, fds, FDSSIZE, buff_size) < 0) {
126 54f4d42f Csaba Kiraly
    fprintf(stderr,"Cannot initialize source, exiting");
127
    return;
128
  }
129 89e893e2 Luca
  while (!done) {
130 4136911a Luca Abeni
    int len, res;
131 a34ed273 Luca Abeni
    struct timeval tv, *ptv;
132
    int wait4fds[FDSSIZE], *pfds;
133 89e893e2 Luca
134 03dca3bf ArpadBakay
#ifdef HTTPIO
135 c86d8685 CsabaKiraly
    memcpy(wait4fds, fds, sizeof(fds));
136
    res = wait4data(s, NULL, wait4fds);
137 03dca3bf ArpadBakay
#else
138 a34ed273 Luca Abeni
    if (fds[0] == -1) {
139
      tout_init(&tv);
140
      ptv = &tv;
141
      pfds = NULL;
142
    } else {
143
      memcpy(wait4fds, fds, sizeof(fds));
144
      pfds = wait4fds;
145
      ptv = NULL;
146
    }
147
    res = wait4data(s, ptv, pfds);
148 03dca3bf ArpadBakay
#endif
149 653129d8 CsabaKiraly
    if (res == 1) {
150 74a5d4ae CsabaKiraly
      struct nodeID *remote;
151 89e893e2 Luca
152 4136911a Luca Abeni
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
153 9d6a53c3 Csaba Kiraly
      if (len < 0) {
154
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
155
        nodeid_free(remote);
156
        continue;
157
      }
158 fbeb4f32 Csaba Kiraly
      dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
159 89e893e2 Luca
      switch (buff[0] /* Message Type */) {
160 2323102b MarcoBiazzini
                case MSG_TYPE_TMAN:
161 9176d3d1 Csaba Kiraly
        case MSG_TYPE_STREAMER_TOPOLOGY:
162 615e8354 Luca Abeni
        case MSG_TYPE_TOPOLOGY:
163 89e893e2 Luca
          fprintf(stderr, "Top Parse\n");
164 5805c339 GiuseppeTropea
#ifdef HTTPIO_MHD
165 b8d7817f GiuseppeTropea
          pthread_mutex_lock(&cb_mutex);
166
#endif
167 fcb5c29b Csaba Kiraly
          update_peers(remote, buff, len);
168 5805c339 GiuseppeTropea
#ifdef HTTPIO_MHD
169 b8d7817f GiuseppeTropea
          pthread_mutex_unlock(&cb_mutex);
170
#endif
171 89e893e2 Luca
          break;
172 615e8354 Luca Abeni
        case MSG_TYPE_CHUNK:
173 b07667ee Csaba Kiraly
          fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
174 10796872 Luca Abeni
          break;
175 e223dc99 Csaba Kiraly
        case MSG_TYPE_SIGNALLING:
176 5805c339 GiuseppeTropea
#ifdef HTTPIO_MHD
177 b8d7817f GiuseppeTropea
          pthread_mutex_lock(&cb_mutex);
178
#endif
179 d310076d Csaba Kiraly
          sigParseData(remote, buff, len);
180 5805c339 GiuseppeTropea
#ifdef HTTPIO_MHD
181 b8d7817f GiuseppeTropea
          pthread_mutex_unlock(&cb_mutex);
182
#endif
183 e223dc99 Csaba Kiraly
          break;
184 89e893e2 Luca
        default:
185
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
186
      }
187 a9d7dd3b Luca Abeni
      nodeid_free(remote);
188 653129d8 CsabaKiraly
    } else if (res == 0 || res == 2) {        //timeout or data arrived from source
189 7eee10e4 CsabaKiraly
      int i;
190 ce80b058 Luca Abeni
      struct timeval tmp, d;
191 7eee10e4 CsabaKiraly
      struct chunk *c;
192 89e893e2 Luca
193 7abbc9e7 Csaba Kiraly
      d.tv_sec = 0;
194 7eee10e4 CsabaKiraly
      c = generated_chunk(&d.tv_usec);
195
      if (c) {
196
        add_chunk(c);
197 6fe7eade Luca Abeni
        for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
198 fcb5c29b Csaba Kiraly
          send_chunk();
199 afdc8db4 Luca Abeni
        }
200 feaec543 CsabaKiraly
        send_offer();
201 afdc8db4 Luca Abeni
        if (cnt++ % 10 == 0) {
202 fcb5c29b Csaba Kiraly
            update_peers(NULL, NULL, 0);
203 afdc8db4 Luca Abeni
        }
204 89e893e2 Luca
      }
205 ce80b058 Luca Abeni
      timeradd(&tnext, &d, &tmp);
206 89e893e2 Luca
      tnext = tmp;
207
    }
208
  }
209
}