Statistics
| Branch: | Revision:

streamers / loop.c @ d2a239c6

History | View | Annotate | Download (2.88 KB)

1
#include <sys/select.h>
2
#include <sys/time.h>
3
#include <time.h>
4
#include <stdint.h>
5
#include <stdlib.h>
6
#include <stdio.h>
7

    
8
#include <net_helper.h>
9
#include <topmanager.h>
10

    
11
#include "streaming.h"
12
#include "loop.h"
13

    
14
static struct timeval period = {0, 500000};
15
static struct timeval tnext;
16

    
17
void tout_init(struct timeval *tv)
18
{
19
  struct timeval tnow;
20

    
21
  if (tnext.tv_sec == 0) {
22
    gettimeofday(&tnext, NULL);
23
  }
24
  gettimeofday(&tnow, NULL);
25
  if(timercmp(&tnow, &tnext, <)) {
26
    timersub(&tnext, &tnow, tv);
27
  } else {
28
    *tv = (struct timeval){0, 0};
29
  }
30
}
31

    
32
static int wait4data(struct nodeID *s)
33
{
34
  fd_set fds;
35
  int res;
36
  struct timeval tv;
37
  int fd = getFD(s);
38

    
39
  FD_ZERO(&fds);
40
  FD_SET(fd, &fds);
41
  tout_init(&tv);
42
  res = select(fd + 1, &fds, NULL, NULL, &tv);
43
  if (FD_ISSET(fd, &fds)) {
44
    return fd;
45
  }
46

    
47
  return -1;
48
}
49

    
50
void loop(struct nodeID *s, int csize)
51
{
52
  int done = 0;
53
#define BUFFSIZE 1024
54
  static uint8_t buff[BUFFSIZE];
55
  int cnt = 0;
56
  
57
  period.tv_sec = csize / 1000000;
58
  period.tv_usec = csize % 1000000;
59
  
60
  topParseData(NULL, 0);
61
  stream_init(8, s);
62
  while (!done) {
63
    int len;
64
    int fd;
65

    
66
    fd = wait4data(s);
67
    if (fd > 0) {
68
      struct nodeID *remote;
69

    
70
      len = recv_data(s, &remote, buff, BUFFSIZE);
71
      switch (buff[0] /* Message Type */) {
72
        case 0x10 /* NCAST_PROTO */:
73
          topParseData(buff, len);
74
          break;
75
        case 12:
76
          received_chunk(buff, len);
77
          break;
78
        default:
79
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
80
      }
81
      free(remote);
82
    } else {
83
      const struct nodeID **neighbours;
84
      int n;
85
      struct timeval tmp;
86

    
87
      neighbours = topGetNeighbourhood(&n);
88
      send_chunk(neighbours, n);
89
      if (cnt++ % 10 == 0) {
90
        topParseData(NULL, 0);
91
      }
92
      timeradd(&tnext, &period, &tmp);
93
      tnext = tmp;
94
    }
95
  }
96
}
97

    
98
void source_loop(struct nodeID *s, int csize, int chunks)
99
{
100
  int done = 0;
101
#define BUFFSIZE 1024
102
  static uint8_t buff[BUFFSIZE];
103
  int cnt = 0;
104

    
105
  period.tv_sec = csize  / 1000000;
106
  period.tv_usec = csize % 1000000;
107
  
108
  stream_init(1, s);
109
  while (!done) {
110
    int len;
111
    int fd;
112

    
113
    fd = wait4data(s);
114
    if (fd > 0) {
115
      struct nodeID *remote;
116

    
117
      len = recv_data(s, &remote, buff, BUFFSIZE);
118
      switch (buff[0] /* Message Type */) {
119
        case 0x10 /* NCAST_PROTO */:
120
          fprintf(stderr, "Top Parse\n");
121
          topParseData(buff, len);
122
          break;
123
        default:
124
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
125
      }
126
      free(remote);
127
    } else {
128
      const struct nodeID **neighbours;
129
      int i, n;
130
      struct timeval tmp;
131

    
132
      generated_chunk();
133
      neighbours = topGetNeighbourhood(&n);
134
      for (i = 0; i < chunks; i++) {
135
        send_chunk(neighbours, n);
136
      }
137
      if (cnt++ % 10 == 0) {
138
        topParseData(NULL, 0);
139
      }
140
      timeradd(&tnext, &period, &tmp);
141
      tnext = tmp;
142
    }
143
  }
144
}