Statistics
| Branch: | Revision:

streamers / loop.c @ 3df216c8

History | View | Annotate | Download (3.26 KB)

1
#include <sys/select.h>
2
#include <sys/time.h>
3
#include <time.h>
4
#include <stdint.h>
5
#include <stdlib.h>
6
#include <stdio.h>
7

    
8
#include <net_helper.h>
9
#include <topmanager.h>
10
#include <msg_types.h>
11
#include <peerset.h>
12

    
13
#include "streaming.h"
14
#include "loop.h"
15

    
16
#define BUFFSIZE 64 * 1024
17
static struct timeval period = {0, 500000};
18
static struct timeval tnext;
19

    
20
void tout_init(struct timeval *tv)
21
{
22
  struct timeval tnow;
23

    
24
  if (tnext.tv_sec == 0) {
25
    gettimeofday(&tnext, NULL);
26
  }
27
  gettimeofday(&tnow, NULL);
28
  if(timercmp(&tnow, &tnext, <)) {
29
    timersub(&tnext, &tnow, tv);
30
  } else {
31
    *tv = (struct timeval){0, 0};
32
  }
33
}
34

    
35
// currently it just makes the peerset grow
36
void update_peers(struct peerset *pset, const uint8_t *buff, int len)
37
{
38
  int n_ids;
39
  const struct nodeID **ids;
40
  topParseData(buff, len);
41
  ids = topGetNeighbourhood(&n_ids);
42
  peerset_add_peers(pset,ids,n_ids);
43
}
44

    
45

    
46
void loop(struct nodeID *s, int csize, int buff_size)
47
{
48
  int done = 0;
49
  static uint8_t buff[BUFFSIZE];
50
  int cnt = 0;
51
  struct peerset *pset = peerset_init(0);
52
  
53
  period.tv_sec = csize / 1000000;
54
  period.tv_usec = csize % 1000000;
55
  
56
  update_peers(pset, NULL, 0);
57
  stream_init(buff_size, s);
58
  while (!done) {
59
    int len, res;
60
    struct timeval tv;
61

    
62
    tout_init(&tv);
63
    res = wait4data(s, tv);
64
    if (res > 0) {
65
      struct nodeID *remote;
66

    
67
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
68
      switch (buff[0] /* Message Type */) {
69
        case MSG_TYPE_TOPOLOGY:
70
          update_peers(pset, buff, len);
71
          break;
72
        case MSG_TYPE_CHUNK:
73
          received_chunk(buff, len);
74
          break;
75
        default:
76
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
77
      }
78
      free(remote);
79
    } else {
80
      const struct nodeID **neighbours;
81
      int n;
82
      struct timeval tmp;
83

    
84
      neighbours = topGetNeighbourhood(&n);
85
      send_chunk(neighbours, n);
86
      if (cnt++ % 10 == 0) {
87
        update_peers(pset,NULL, 0);
88
      }
89
      timeradd(&tnext, &period, &tmp);
90
      tnext = tmp;
91
    }
92
  }
93
}
94

    
95
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks)
96
{
97
  int done = 0;
98
  static uint8_t buff[BUFFSIZE];
99
  int cnt = 0;
100
  struct peerset *pset = peerset_init(0);
101

    
102
  period.tv_sec = csize  / 1000000;
103
  period.tv_usec = csize % 1000000;
104
  
105
  source_init(fname, s);
106
  while (!done) {
107
    int len, res;
108
    struct timeval tv;
109

    
110
    tout_init(&tv);
111
    res = wait4data(s, tv);
112
    if (res > 0) {
113
      struct nodeID *remote;
114

    
115
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
116
      switch (buff[0] /* Message Type */) {
117
        case MSG_TYPE_TOPOLOGY:
118
          fprintf(stderr, "Top Parse\n");
119
          update_peers(pset, buff, len);
120
          break;
121
        case MSG_TYPE_CHUNK:
122
          fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
123
          break;
124
        default:
125
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
126
      }
127
      free(remote);
128
    } else {
129
      const struct nodeID **neighbours;
130
      int i, n;
131
      struct timeval tmp;
132

    
133
      generated_chunk();
134
      neighbours = topGetNeighbourhood(&n);
135
      for (i = 0; i < chunks; i++) {
136
        send_chunk(neighbours, n);
137
      }
138
      if (cnt++ % 10 == 0) {
139
          update_peers(pset, NULL, 0);
140
      }
141
      timeradd(&tnext, &period, &tmp);
142
      tnext = tmp;
143
    }
144
  }
145
}