Statistics
| Branch: | Revision:

streamers / loop.c @ 6920fdab

History | View | Annotate | Download (2.86 KB)

1 89e893e2 Luca
#include <sys/select.h>
2
#include <sys/time.h>
3
#include <time.h>
4
#include <stdint.h>
5
#include <stdlib.h>
6
#include <stdio.h>
7
8
#include <net_helper.h>
9
#include <topmanager.h>
10
11
#include "streaming.h"
12
#include "loop.h"
13
14
static struct timeval period = {0, 500000};
15
static struct timeval tnext;
16
17
void tout_init(struct timeval *tv)
18
{
19
  struct timeval tnow;
20
21
  if (tnext.tv_sec == 0) {
22
    gettimeofday(&tnext, NULL);
23
  }
24
  gettimeofday(&tnow, NULL);
25
  if(timercmp(&tnow, &tnext, <)) {
26
    timersub(&tnext, &tnow, tv);
27
  } else {
28
    *tv = (struct timeval){0, 0};
29
  }
30
}
31
32
static int wait4data(struct nodeID *s)
33
{
34
  fd_set fds;
35
  int res;
36
  struct timeval tv;
37
  int fd = getFD(s);
38
39
  FD_ZERO(&fds);
40
  FD_SET(fd, &fds);
41
  tout_init(&tv);
42
  res = select(fd + 1, &fds, NULL, NULL, &tv);
43
  if (FD_ISSET(fd, &fds)) {
44
    return fd;
45
  }
46
47
  return -1;
48
}
49
50
void loop(struct nodeID *s, int csize)
51
{
52
  int done = 0;
53
#define BUFFSIZE 1024
54
  static uint8_t buff[BUFFSIZE];
55
  int cnt = 0;
56
  
57
  period.tv_sec = csize / 1000000;
58
  period.tv_usec = csize % 1000000;
59
  
60
  topParseData(NULL, 0);
61 6920fdab Luca
  stream_init(8, s);
62 89e893e2 Luca
  while (!done) {
63
    int len;
64
    int fd;
65
66
    fd = wait4data(s);
67
    if (fd > 0) {
68
      struct nodeID *remote;
69
70
      len = recv_data(s, &remote, buff, BUFFSIZE);
71
      switch (buff[0] /* Message Type */) {
72
        case 0x10 /* NCAST_PROTO */:
73
          topParseData(buff, len);
74
          break;
75
        case 12:
76
          received_chunk(buff, len);
77
        default:
78
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
79
      }
80
      free(remote);
81
    } else {
82
      const struct nodeID **neighbours;
83
      int n;
84
      struct timeval tmp;
85
86
      neighbours = topGetNeighbourhood(&n);
87
      send_chunk(neighbours, n);
88
      if (cnt++ % 10 == 0) {
89
        topParseData(NULL, 0);
90
      }
91
      timeradd(&tnext, &period, &tmp);
92
      tnext = tmp;
93
    }
94
  }
95
}
96
97
void source_loop(struct nodeID *s, int csize, int chunks)
98
{
99
  int done = 0;
100
#define BUFFSIZE 1024
101
  static uint8_t buff[BUFFSIZE];
102
  int cnt = 0;
103
104
  period.tv_sec = csize  / 1000000;
105
  period.tv_usec = csize % 1000000;
106
  
107 6920fdab Luca
  stream_init(1, s);
108 89e893e2 Luca
  while (!done) {
109
    int len;
110
    int fd;
111
112
    fd = wait4data(s);
113
    if (fd > 0) {
114
      struct nodeID *remote;
115
116
      len = recv_data(s, &remote, buff, BUFFSIZE);
117
      switch (buff[0] /* Message Type */) {
118
        case 0x10 /* NCAST_PROTO */:
119
          fprintf(stderr, "Top Parse\n");
120
          topParseData(buff, len);
121
          break;
122
        default:
123
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
124
      }
125
      free(remote);
126
    } else {
127
      const struct nodeID **neighbours;
128
      int i, n;
129
      struct timeval tmp;
130
131
      generated_chunk();
132
      neighbours = topGetNeighbourhood(&n);
133
      for (i = 0; i < chunks; i++) {
134
        send_chunk(neighbours, n);
135
      }
136
      if (cnt++ % 10 == 0) {
137
        topParseData(NULL, 0);
138
      }
139
      timeradd(&tnext, &period, &tmp);
140
      tnext = tmp;
141
    }
142
  }
143
}