Statistics
| Branch: | Revision:

streamers / loop.c @ 018c744a

History | View | Annotate | Download (3.74 KB)

1 89e893e2 Luca
#include <sys/select.h>
2
#include <sys/time.h>
3
#include <time.h>
4
#include <stdint.h>
5
#include <stdlib.h>
6
#include <stdio.h>
7
8
#include <net_helper.h>
9
#include <topmanager.h>
10 615e8354 Luca Abeni
#include <msg_types.h>
11 3df216c8 Csaba Kiraly
#include <peerset.h>
12 018c744a Csaba Kiraly
#include <peer.h>
13 89e893e2 Luca
14 e223dc99 Csaba Kiraly
#include "chunk_signaling.h"
15 89e893e2 Luca
#include "streaming.h"
16
#include "loop.h"
17
18 0759159a Luca
#define BUFFSIZE 64 * 1024
19 89e893e2 Luca
static struct timeval period = {0, 500000};
20
static struct timeval tnext;
21 018c744a Csaba Kiraly
static struct timeval tout_bmap = {3, 0};
22 89e893e2 Luca
23 e223dc99 Csaba Kiraly
24 89e893e2 Luca
void tout_init(struct timeval *tv)
25
{
26
  struct timeval tnow;
27
28
  if (tnext.tv_sec == 0) {
29
    gettimeofday(&tnext, NULL);
30
  }
31
  gettimeofday(&tnow, NULL);
32
  if(timercmp(&tnow, &tnext, <)) {
33
    timersub(&tnext, &tnow, tv);
34
  } else {
35
    *tv = (struct timeval){0, 0};
36
  }
37
}
38
39 3df216c8 Csaba Kiraly
// currently it just makes the peerset grow
40
void update_peers(struct peerset *pset, const uint8_t *buff, int len)
41
{
42 018c744a Csaba Kiraly
  int n_ids, i;
43 3df216c8 Csaba Kiraly
  const struct nodeID **ids;
44 018c744a Csaba Kiraly
  struct peer *peers;
45
  struct timeval tnow, told;
46
47
48 3df216c8 Csaba Kiraly
  topParseData(buff, len);
49
  ids = topGetNeighbourhood(&n_ids);
50
  peerset_add_peers(pset,ids,n_ids);
51 018c744a Csaba Kiraly
52
  gettimeofday(&tnow, NULL);
53
  timersub(&tnow, &tout_bmap, &told);
54
  peers = peerset_get_peers(pset);
55
  for (i = 0; i < peerset_size(pset); i++) {
56
    if (timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)) {
57
      peerset_remove_peer(pset, peers[i--].id);
58
    }
59
  }
60 3df216c8 Csaba Kiraly
}
61
62
63 0a40460a Luca
void loop(struct nodeID *s, int csize, int buff_size)
64 89e893e2 Luca
{
65
  int done = 0;
66
  static uint8_t buff[BUFFSIZE];
67
  int cnt = 0;
68 3df216c8 Csaba Kiraly
  struct peerset *pset = peerset_init(0);
69 89e893e2 Luca
  
70
  period.tv_sec = csize / 1000000;
71
  period.tv_usec = csize % 1000000;
72
  
73 e223dc99 Csaba Kiraly
  sigInit(s,pset);
74 3df216c8 Csaba Kiraly
  update_peers(pset, NULL, 0);
75 0a40460a Luca
  stream_init(buff_size, s);
76 89e893e2 Luca
  while (!done) {
77 4136911a Luca Abeni
    int len, res;
78
    struct timeval tv;
79 89e893e2 Luca
80 4136911a Luca Abeni
    tout_init(&tv);
81
    res = wait4data(s, tv);
82
    if (res > 0) {
83 89e893e2 Luca
      struct nodeID *remote;
84
85 4136911a Luca Abeni
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
86 89e893e2 Luca
      switch (buff[0] /* Message Type */) {
87 615e8354 Luca Abeni
        case MSG_TYPE_TOPOLOGY:
88 3df216c8 Csaba Kiraly
          update_peers(pset, buff, len);
89 89e893e2 Luca
          break;
90 615e8354 Luca Abeni
        case MSG_TYPE_CHUNK:
91 e98d8f50 Csaba Kiraly
          received_chunk(pset, remote, buff, len);
92 d2a239c6 Luca
          break;
93 e223dc99 Csaba Kiraly
        case MSG_TYPE_SIGNALLING:
94
          sigParseData(buff, len);
95
          break;
96 89e893e2 Luca
        default:
97
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
98
      }
99
      free(remote);
100
    } else {
101
      struct timeval tmp;
102
103 0f35d029 Csaba Kiraly
      send_chunk(pset);
104 89e893e2 Luca
      if (cnt++ % 10 == 0) {
105 3df216c8 Csaba Kiraly
        update_peers(pset,NULL, 0);
106 89e893e2 Luca
      }
107
      timeradd(&tnext, &period, &tmp);
108
      tnext = tmp;
109
    }
110
  }
111
}
112
113 7442ecb3 Luca
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks)
114 89e893e2 Luca
{
115
  int done = 0;
116
  static uint8_t buff[BUFFSIZE];
117
  int cnt = 0;
118 3df216c8 Csaba Kiraly
  struct peerset *pset = peerset_init(0);
119 89e893e2 Luca
120
  period.tv_sec = csize  / 1000000;
121
  period.tv_usec = csize % 1000000;
122
  
123 e223dc99 Csaba Kiraly
  sigInit(s,pset);
124 7442ecb3 Luca
  source_init(fname, s);
125 89e893e2 Luca
  while (!done) {
126 4136911a Luca Abeni
    int len, res;
127
    struct timeval tv;
128 89e893e2 Luca
129 4136911a Luca Abeni
    tout_init(&tv);
130
    res = wait4data(s, tv);
131
    if (res > 0) {
132 89e893e2 Luca
      struct nodeID *remote;
133
134 4136911a Luca Abeni
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
135 89e893e2 Luca
      switch (buff[0] /* Message Type */) {
136 615e8354 Luca Abeni
        case MSG_TYPE_TOPOLOGY:
137 89e893e2 Luca
          fprintf(stderr, "Top Parse\n");
138 3df216c8 Csaba Kiraly
          update_peers(pset, buff, len);
139 89e893e2 Luca
          break;
140 615e8354 Luca Abeni
        case MSG_TYPE_CHUNK:
141 10796872 Luca Abeni
          fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
142
          break;
143 e223dc99 Csaba Kiraly
        case MSG_TYPE_SIGNALLING:
144
          sigParseData(buff, len);
145
          break;
146 89e893e2 Luca
        default:
147
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
148
      }
149
      free(remote);
150
    } else {
151 0f35d029 Csaba Kiraly
      int i;
152 89e893e2 Luca
      struct timeval tmp;
153
154
      generated_chunk();
155 0f35d029 Csaba Kiraly
      for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
156
        send_chunk(pset);
157 89e893e2 Luca
      }
158
      if (cnt++ % 10 == 0) {
159 3df216c8 Csaba Kiraly
          update_peers(pset, NULL, 0);
160 89e893e2 Luca
      }
161
      timeradd(&tnext, &period, &tmp);
162
      tnext = tmp;
163
    }
164
  }
165
}