streamers / loop.c @ 0a40460a
History | View | Annotate | Download (2.9 KB)
1 |
#include <sys/select.h> |
---|---|
2 |
#include <sys/time.h> |
3 |
#include <time.h> |
4 |
#include <stdint.h> |
5 |
#include <stdlib.h> |
6 |
#include <stdio.h> |
7 |
|
8 |
#include <net_helper.h> |
9 |
#include <topmanager.h> |
10 |
|
11 |
#include "streaming.h" |
12 |
#include "loop.h" |
13 |
|
14 |
static struct timeval period = {0, 500000}; |
15 |
static struct timeval tnext; |
16 |
|
17 |
void tout_init(struct timeval *tv) |
18 |
{ |
19 |
struct timeval tnow;
|
20 |
|
21 |
if (tnext.tv_sec == 0) { |
22 |
gettimeofday(&tnext, NULL);
|
23 |
} |
24 |
gettimeofday(&tnow, NULL);
|
25 |
if(timercmp(&tnow, &tnext, <)) {
|
26 |
timersub(&tnext, &tnow, tv); |
27 |
} else {
|
28 |
*tv = (struct timeval){0, 0}; |
29 |
} |
30 |
} |
31 |
|
32 |
static int wait4data(struct nodeID *s) |
33 |
{ |
34 |
fd_set fds; |
35 |
int res;
|
36 |
struct timeval tv;
|
37 |
int fd = getFD(s);
|
38 |
|
39 |
FD_ZERO(&fds); |
40 |
FD_SET(fd, &fds); |
41 |
tout_init(&tv); |
42 |
res = select(fd + 1, &fds, NULL, NULL, &tv); |
43 |
if (FD_ISSET(fd, &fds)) {
|
44 |
return fd;
|
45 |
} |
46 |
|
47 |
return -1; |
48 |
} |
49 |
|
50 |
void loop(struct nodeID *s, int csize, int buff_size) |
51 |
{ |
52 |
int done = 0; |
53 |
#define BUFFSIZE 1024 |
54 |
static uint8_t buff[BUFFSIZE];
|
55 |
int cnt = 0; |
56 |
|
57 |
period.tv_sec = csize / 1000000;
|
58 |
period.tv_usec = csize % 1000000;
|
59 |
|
60 |
topParseData(NULL, 0); |
61 |
stream_init(buff_size, s); |
62 |
while (!done) {
|
63 |
int len;
|
64 |
int fd;
|
65 |
|
66 |
fd = wait4data(s); |
67 |
if (fd > 0) { |
68 |
struct nodeID *remote;
|
69 |
|
70 |
len = recv_data(s, &remote, buff, BUFFSIZE); |
71 |
switch (buff[0] /* Message Type */) { |
72 |
case 0x10 /* NCAST_PROTO */: |
73 |
topParseData(buff, len); |
74 |
break;
|
75 |
case 12: |
76 |
received_chunk(buff, len); |
77 |
break;
|
78 |
default:
|
79 |
fprintf(stderr, "Unknown Message Type %x\n", buff[0]); |
80 |
} |
81 |
free(remote); |
82 |
} else {
|
83 |
const struct nodeID **neighbours; |
84 |
int n;
|
85 |
struct timeval tmp;
|
86 |
|
87 |
neighbours = topGetNeighbourhood(&n); |
88 |
send_chunk(neighbours, n); |
89 |
if (cnt++ % 10 == 0) { |
90 |
topParseData(NULL, 0); |
91 |
} |
92 |
timeradd(&tnext, &period, &tmp); |
93 |
tnext = tmp; |
94 |
} |
95 |
} |
96 |
} |
97 |
|
98 |
void source_loop(struct nodeID *s, int csize, int chunks) |
99 |
{ |
100 |
int done = 0; |
101 |
#define BUFFSIZE 1024 |
102 |
static uint8_t buff[BUFFSIZE];
|
103 |
int cnt = 0; |
104 |
|
105 |
period.tv_sec = csize / 1000000;
|
106 |
period.tv_usec = csize % 1000000;
|
107 |
|
108 |
stream_init(1, s);
|
109 |
while (!done) {
|
110 |
int len;
|
111 |
int fd;
|
112 |
|
113 |
fd = wait4data(s); |
114 |
if (fd > 0) { |
115 |
struct nodeID *remote;
|
116 |
|
117 |
len = recv_data(s, &remote, buff, BUFFSIZE); |
118 |
switch (buff[0] /* Message Type */) { |
119 |
case 0x10 /* NCAST_PROTO */: |
120 |
fprintf(stderr, "Top Parse\n");
|
121 |
topParseData(buff, len); |
122 |
break;
|
123 |
default:
|
124 |
fprintf(stderr, "Bad Message Type %x\n", buff[0]); |
125 |
} |
126 |
free(remote); |
127 |
} else {
|
128 |
const struct nodeID **neighbours; |
129 |
int i, n;
|
130 |
struct timeval tmp;
|
131 |
|
132 |
generated_chunk(); |
133 |
neighbours = topGetNeighbourhood(&n); |
134 |
for (i = 0; i < chunks; i++) { |
135 |
send_chunk(neighbours, n); |
136 |
} |
137 |
if (cnt++ % 10 == 0) { |
138 |
topParseData(NULL, 0); |
139 |
} |
140 |
timeradd(&tnext, &period, &tmp); |
141 |
tnext = tmp; |
142 |
} |
143 |
} |
144 |
} |