Statistics
| Branch: | Revision:

streamers / loop.c @ a9d7dd3b

History | View | Annotate | Download (2.97 KB)

1
#include <sys/select.h>
2
#include <sys/time.h>
3
#include <time.h>
4
#include <stdint.h>
5
#include <stdlib.h>
6
#include <stdio.h>
7

    
8
#include <net_helper.h>
9
#include <topmanager.h>
10
#include <msg_types.h>
11

    
12
#include "streaming.h"
13
#include "loop.h"
14
#include "dbg.h"
15

    
16
#define BUFFSIZE 64 * 1024
17
static struct timeval period = {0, 500000};
18
static struct timeval tnext;
19

    
20
void tout_init(struct timeval *tv)
21
{
22
  struct timeval tnow;
23

    
24
  if (tnext.tv_sec == 0) {
25
    gettimeofday(&tnext, NULL);
26
  }
27
  gettimeofday(&tnow, NULL);
28
  if(timercmp(&tnow, &tnext, <)) {
29
    timersub(&tnext, &tnow, tv);
30
  } else {
31
    *tv = (struct timeval){0, 0};
32
  }
33
}
34

    
35
void loop(struct nodeID *s, int csize, int buff_size)
36
{
37
  int done = 0;
38
  static uint8_t buff[BUFFSIZE];
39
  int cnt = 0;
40
  
41
  period.tv_sec = csize / 1000000;
42
  period.tv_usec = csize % 1000000;
43
  
44
  topParseData(NULL, 0);
45
  stream_init(buff_size, s);
46
  while (!done) {
47
    int len, res;
48
    struct timeval tv;
49

    
50
    tout_init(&tv);
51
    res = wait4data(s, tv);
52
    if (res > 0) {
53
      struct nodeID *remote;
54

    
55
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
56
      switch (buff[0] /* Message Type */) {
57
        case MSG_TYPE_TOPOLOGY:
58
          topParseData(buff, len);
59
          break;
60
        case MSG_TYPE_CHUNK:
61
          received_chunk(buff, len);
62
          break;
63
        default:
64
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
65
      }
66
      nodeid_free(remote);
67
    } else {
68
      const struct nodeID **neighbours;
69
      int n;
70
      struct timeval tmp;
71

    
72
      neighbours = topGetNeighbourhood(&n);
73
      send_chunk(neighbours, n);
74
      if (cnt++ % 10 == 0) {
75
        topParseData(NULL, 0);
76
      }
77
      timeradd(&tnext, &period, &tmp);
78
      tnext = tmp;
79
    }
80
  }
81
}
82

    
83
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks)
84
{
85
  int done = 0;
86
  static uint8_t buff[BUFFSIZE];
87
  int cnt = 0;
88

    
89
  source_init(fname, s);
90
  while (!done) {
91
    int len, res;
92
    struct timeval tv;
93

    
94
    tout_init(&tv);
95
    res = wait4data(s, tv);
96
    if (res > 0) {
97
      struct nodeID *remote;
98

    
99
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
100
      switch (buff[0] /* Message Type */) {
101
        case MSG_TYPE_TOPOLOGY:
102
          fprintf(stderr, "Top Parse\n");
103
          topParseData(buff, len);
104
          break;
105
        case MSG_TYPE_CHUNK:
106
          fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
107
          break;
108
        default:
109
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
110
      }
111
      nodeid_free(remote);
112
    } else {
113
      const struct nodeID **neighbours;
114
      int i, n, res;
115
      struct timeval tmp, d;
116

    
117
      d.tv_sec = 0;
118
      res = generated_chunk(&d.tv_usec);
119
      if (res) {
120
        neighbours = topGetNeighbourhood(&n);
121
        for (i = 0; i < chunks; i++) {
122
dprintf(" T: %lld\n", tnext.tv_sec * 1000 + tnext.tv_usec / 1000);
123
          send_chunk(neighbours, n);
124
        }
125
        if (cnt++ % 10 == 0) {
126
          topParseData(NULL, 0);
127
        }
128
      }
129
      timeradd(&tnext, &d, &tmp);
130
      tnext = tmp;
131
    }
132
  }
133
}