Statistics
| Branch: | Revision:

streamers / loop.c @ 89e893e2

History | View | Annotate | Download (2.95 KB)

1
#include <sys/select.h>
2
#include <sys/time.h>
3
#include <time.h>
4
#include <stdint.h>
5
#include <stdlib.h>
6
#include <stdio.h>
7

    
8
#include <net_helper.h>
9
#include <topmanager.h>
10

    
11
#include "streaming.h"
12
#include "loop.h"
13

    
14
static struct timeval period = {0, 500000};
15
static struct timeval tnext;
16

    
17
void tout_init(struct timeval *tv)
18
{
19
  struct timeval tnow;
20

    
21
  if (tnext.tv_sec == 0) {
22
    gettimeofday(&tnext, NULL);
23
  }
24
  gettimeofday(&tnow, NULL);
25
  if(timercmp(&tnow, &tnext, <)) {
26
    timersub(&tnext, &tnow, tv);
27
  } else {
28
    *tv = (struct timeval){0, 0};
29
  }
30
}
31

    
32
static int wait4data(struct nodeID *s)
33
{
34
  fd_set fds;
35
  int res;
36
  struct timeval tv;
37
  int fd = getFD(s);
38

    
39
  FD_ZERO(&fds);
40
  FD_SET(fd, &fds);
41
  tout_init(&tv);
42
  res = select(fd + 1, &fds, NULL, NULL, &tv);
43
  if (FD_ISSET(fd, &fds)) {
44
    return fd;
45
  }
46

    
47
  return -1;
48
}
49

    
50
void loop(struct nodeID *s, int csize)
51
{
52
  int done = 0;
53
#define BUFFSIZE 1024
54
  static uint8_t buff[BUFFSIZE];
55
  int cnt = 0;
56
  
57
  period.tv_sec = csize / 1000000;
58
  period.tv_usec = csize % 1000000;
59
  
60
  topParseData(NULL, 0);
61
  stream_init(8);        // FIXME!
62
  while (!done) {
63
    int len;
64
    int fd;
65

    
66
    fd = wait4data(s);
67
    if (fd > 0) {
68
      struct nodeID *remote;
69

    
70
      len = recv_data(s, &remote, buff, BUFFSIZE);
71
      switch (buff[0] /* Message Type */) {
72
        case 0x10 /* NCAST_PROTO */:
73
          topParseData(buff, len);
74
          break;
75
        case 12:
76
          received_chunk(buff, len);
77
        default:
78
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
79
      }
80
      free(remote);
81
    } else {
82
      const struct nodeID **neighbours;
83
      int n;
84
      struct timeval tmp;
85

    
86
      neighbours = topGetNeighbourhood(&n);
87
      send_chunk(neighbours, n);
88
      if (cnt++ % 10 == 0) {
89
        topParseData(NULL, 0);
90
      }
91
      timeradd(&tnext, &period, &tmp);
92
      tnext = tmp;
93
    }
94
  }
95
}
96

    
97
void source_loop(struct nodeID *s, int csize, int chunks)
98
{
99
  int done = 0;
100
#define BUFFSIZE 1024
101
  static uint8_t buff[BUFFSIZE];
102
  int cnt = 0;
103

    
104
  period.tv_sec = csize  / 1000000;
105
  period.tv_usec = csize % 1000000;
106
  
107
  stream_init(1);
108
  while (!done) {
109
    int len;
110
    int fd;
111

    
112
    fd = wait4data(s);
113
    if (fd > 0) {
114
      struct nodeID *remote;
115

    
116
      len = recv_data(s, &remote, buff, BUFFSIZE);
117
      switch (buff[0] /* Message Type */) {
118
        case 0x10 /* NCAST_PROTO */:
119
          fprintf(stderr, "Top Parse\n");
120
          topParseData(buff, len);
121
          break;
122
        default:
123
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
124
      }
125
      free(remote);
126
    } else {
127
      const struct nodeID **neighbours;
128
      int i, n;
129
      struct timeval tmp;
130

    
131
      fprintf(stderr, "Generate Chunk\n");
132
      generated_chunk();
133
      neighbours = topGetNeighbourhood(&n);
134
      for (i = 0; i < chunks; i++) {
135
        fprintf(stderr, "Send Chunk\n");
136
        send_chunk(neighbours, n);
137
      }
138
      if (cnt++ % 10 == 0) {
139
        topParseData(NULL, 0);
140
      }
141
      timeradd(&tnext, &period, &tmp);
142
      tnext = tmp;
143
    }
144
  }
145
}