Statistics
| Branch: | Revision:

streamers / loop.c @ 4136911a

History | View | Annotate | Download (2.87 KB)

1
#include <sys/select.h>
2
#include <sys/time.h>
3
#include <time.h>
4
#include <stdint.h>
5
#include <stdlib.h>
6
#include <stdio.h>
7

    
8
#include <net_helper.h>
9
#include <topmanager.h>
10
#include <msg_types.h>
11

    
12
#include "streaming.h"
13
#include "loop.h"
14

    
15
#define BUFFSIZE 64 * 1024
16
static struct timeval period = {0, 500000};
17
static struct timeval tnext;
18

    
19
void tout_init(struct timeval *tv)
20
{
21
  struct timeval tnow;
22

    
23
  if (tnext.tv_sec == 0) {
24
    gettimeofday(&tnext, NULL);
25
  }
26
  gettimeofday(&tnow, NULL);
27
  if(timercmp(&tnow, &tnext, <)) {
28
    timersub(&tnext, &tnow, tv);
29
  } else {
30
    *tv = (struct timeval){0, 0};
31
  }
32
}
33

    
34
void loop(struct nodeID *s, int csize, int buff_size)
35
{
36
  int done = 0;
37
  static uint8_t buff[BUFFSIZE];
38
  int cnt = 0;
39
  
40
  period.tv_sec = csize / 1000000;
41
  period.tv_usec = csize % 1000000;
42
  
43
  topParseData(NULL, 0);
44
  stream_init(buff_size, s);
45
  while (!done) {
46
    int len, res;
47
    struct timeval tv;
48

    
49
    tout_init(&tv);
50
    res = wait4data(s, tv);
51
    if (res > 0) {
52
      struct nodeID *remote;
53

    
54
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
55
      switch (buff[0] /* Message Type */) {
56
        case MSG_TYPE_TOPOLOGY:
57
          topParseData(buff, len);
58
          break;
59
        case MSG_TYPE_CHUNK:
60
          received_chunk(buff, len);
61
          break;
62
        default:
63
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
64
      }
65
      free(remote);
66
    } else {
67
      const struct nodeID **neighbours;
68
      int n;
69
      struct timeval tmp;
70

    
71
      neighbours = topGetNeighbourhood(&n);
72
      send_chunk(neighbours, n);
73
      if (cnt++ % 10 == 0) {
74
        topParseData(NULL, 0);
75
      }
76
      timeradd(&tnext, &period, &tmp);
77
      tnext = tmp;
78
    }
79
  }
80
}
81

    
82
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks)
83
{
84
  int done = 0;
85
  static uint8_t buff[BUFFSIZE];
86
  int cnt = 0;
87

    
88
  period.tv_sec = csize  / 1000000;
89
  period.tv_usec = csize % 1000000;
90
  
91
  source_init(fname, s);
92
  while (!done) {
93
    int len, res;
94
    struct timeval tv;
95

    
96
    tout_init(&tv);
97
    res = wait4data(s, tv);
98
    if (res > 0) {
99
      struct nodeID *remote;
100

    
101
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
102
      switch (buff[0] /* Message Type */) {
103
        case MSG_TYPE_TOPOLOGY:
104
          fprintf(stderr, "Top Parse\n");
105
          topParseData(buff, len);
106
          break;
107
        case MSG_TYPE_CHUNK:
108
          fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
109
          break;
110
        default:
111
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
112
      }
113
      free(remote);
114
    } else {
115
      const struct nodeID **neighbours;
116
      int i, n;
117
      struct timeval tmp;
118

    
119
      generated_chunk();
120
      neighbours = topGetNeighbourhood(&n);
121
      for (i = 0; i < chunks; i++) {
122
        send_chunk(neighbours, n);
123
      }
124
      if (cnt++ % 10 == 0) {
125
        topParseData(NULL, 0);
126
      }
127
      timeradd(&tnext, &period, &tmp);
128
      tnext = tmp;
129
    }
130
  }
131
}