Statistics
| Branch: | Revision:

streamers / loop.c @ 018c744a

History | View | Annotate | Download (3.74 KB)

1
#include <sys/select.h>
2
#include <sys/time.h>
3
#include <time.h>
4
#include <stdint.h>
5
#include <stdlib.h>
6
#include <stdio.h>
7

    
8
#include <net_helper.h>
9
#include <topmanager.h>
10
#include <msg_types.h>
11
#include <peerset.h>
12
#include <peer.h>
13

    
14
#include "chunk_signaling.h"
15
#include "streaming.h"
16
#include "loop.h"
17

    
18
#define BUFFSIZE 64 * 1024
19
static struct timeval period = {0, 500000};
20
static struct timeval tnext;
21
static struct timeval tout_bmap = {3, 0};
22

    
23

    
24
void tout_init(struct timeval *tv)
25
{
26
  struct timeval tnow;
27

    
28
  if (tnext.tv_sec == 0) {
29
    gettimeofday(&tnext, NULL);
30
  }
31
  gettimeofday(&tnow, NULL);
32
  if(timercmp(&tnow, &tnext, <)) {
33
    timersub(&tnext, &tnow, tv);
34
  } else {
35
    *tv = (struct timeval){0, 0};
36
  }
37
}
38

    
39
// currently it just makes the peerset grow
40
void update_peers(struct peerset *pset, const uint8_t *buff, int len)
41
{
42
  int n_ids, i;
43
  const struct nodeID **ids;
44
  struct peer *peers;
45
  struct timeval tnow, told;
46

    
47

    
48
  topParseData(buff, len);
49
  ids = topGetNeighbourhood(&n_ids);
50
  peerset_add_peers(pset,ids,n_ids);
51

    
52
  gettimeofday(&tnow, NULL);
53
  timersub(&tnow, &tout_bmap, &told);
54
  peers = peerset_get_peers(pset);
55
  for (i = 0; i < peerset_size(pset); i++) {
56
    if (timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)) {
57
      peerset_remove_peer(pset, peers[i--].id);
58
    }
59
  }
60
}
61

    
62

    
63
void loop(struct nodeID *s, int csize, int buff_size)
64
{
65
  int done = 0;
66
  static uint8_t buff[BUFFSIZE];
67
  int cnt = 0;
68
  struct peerset *pset = peerset_init(0);
69
  
70
  period.tv_sec = csize / 1000000;
71
  period.tv_usec = csize % 1000000;
72
  
73
  sigInit(s,pset);
74
  update_peers(pset, NULL, 0);
75
  stream_init(buff_size, s);
76
  while (!done) {
77
    int len, res;
78
    struct timeval tv;
79

    
80
    tout_init(&tv);
81
    res = wait4data(s, tv);
82
    if (res > 0) {
83
      struct nodeID *remote;
84

    
85
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
86
      switch (buff[0] /* Message Type */) {
87
        case MSG_TYPE_TOPOLOGY:
88
          update_peers(pset, buff, len);
89
          break;
90
        case MSG_TYPE_CHUNK:
91
          received_chunk(pset, remote, buff, len);
92
          break;
93
        case MSG_TYPE_SIGNALLING:
94
          sigParseData(buff, len);
95
          break;
96
        default:
97
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
98
      }
99
      free(remote);
100
    } else {
101
      struct timeval tmp;
102

    
103
      send_chunk(pset);
104
      if (cnt++ % 10 == 0) {
105
        update_peers(pset,NULL, 0);
106
      }
107
      timeradd(&tnext, &period, &tmp);
108
      tnext = tmp;
109
    }
110
  }
111
}
112

    
113
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks)
114
{
115
  int done = 0;
116
  static uint8_t buff[BUFFSIZE];
117
  int cnt = 0;
118
  struct peerset *pset = peerset_init(0);
119

    
120
  period.tv_sec = csize  / 1000000;
121
  period.tv_usec = csize % 1000000;
122
  
123
  sigInit(s,pset);
124
  source_init(fname, s);
125
  while (!done) {
126
    int len, res;
127
    struct timeval tv;
128

    
129
    tout_init(&tv);
130
    res = wait4data(s, tv);
131
    if (res > 0) {
132
      struct nodeID *remote;
133

    
134
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
135
      switch (buff[0] /* Message Type */) {
136
        case MSG_TYPE_TOPOLOGY:
137
          fprintf(stderr, "Top Parse\n");
138
          update_peers(pset, buff, len);
139
          break;
140
        case MSG_TYPE_CHUNK:
141
          fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
142
          break;
143
        case MSG_TYPE_SIGNALLING:
144
          sigParseData(buff, len);
145
          break;
146
        default:
147
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
148
      }
149
      free(remote);
150
    } else {
151
      int i;
152
      struct timeval tmp;
153

    
154
      generated_chunk();
155
      for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
156
        send_chunk(pset);
157
      }
158
      if (cnt++ % 10 == 0) {
159
          update_peers(pset, NULL, 0);
160
      }
161
      timeradd(&tnext, &period, &tmp);
162
      tnext = tmp;
163
    }
164
  }
165
}