Statistics
| Branch: | Revision:

streamers / loop.c @ e223dc99

History | View | Annotate | Download (3.33 KB)

1
#include <sys/select.h>
2
#include <sys/time.h>
3
#include <time.h>
4
#include <stdint.h>
5
#include <stdlib.h>
6
#include <stdio.h>
7

    
8
#include <net_helper.h>
9
#include <topmanager.h>
10
#include <msg_types.h>
11
#include <peerset.h>
12

    
13
#include "chunk_signaling.h"
14
#include "streaming.h"
15
#include "loop.h"
16

    
17
#define BUFFSIZE 64 * 1024
18
static struct timeval period = {0, 500000};
19
static struct timeval tnext;
20

    
21

    
22
void tout_init(struct timeval *tv)
23
{
24
  struct timeval tnow;
25

    
26
  if (tnext.tv_sec == 0) {
27
    gettimeofday(&tnext, NULL);
28
  }
29
  gettimeofday(&tnow, NULL);
30
  if(timercmp(&tnow, &tnext, <)) {
31
    timersub(&tnext, &tnow, tv);
32
  } else {
33
    *tv = (struct timeval){0, 0};
34
  }
35
}
36

    
37
// currently it just makes the peerset grow
38
void update_peers(struct peerset *pset, const uint8_t *buff, int len)
39
{
40
  int n_ids;
41
  const struct nodeID **ids;
42
  topParseData(buff, len);
43
  ids = topGetNeighbourhood(&n_ids);
44
  peerset_add_peers(pset,ids,n_ids);
45
}
46

    
47

    
48
void loop(struct nodeID *s, int csize, int buff_size)
49
{
50
  int done = 0;
51
  static uint8_t buff[BUFFSIZE];
52
  int cnt = 0;
53
  struct peerset *pset = peerset_init(0);
54
  
55
  period.tv_sec = csize / 1000000;
56
  period.tv_usec = csize % 1000000;
57
  
58
  sigInit(s,pset);
59
  update_peers(pset, NULL, 0);
60
  stream_init(buff_size, s);
61
  while (!done) {
62
    int len, res;
63
    struct timeval tv;
64

    
65
    tout_init(&tv);
66
    res = wait4data(s, tv);
67
    if (res > 0) {
68
      struct nodeID *remote;
69

    
70
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
71
      switch (buff[0] /* Message Type */) {
72
        case MSG_TYPE_TOPOLOGY:
73
          update_peers(pset, buff, len);
74
          break;
75
        case MSG_TYPE_CHUNK:
76
          received_chunk(pset, remote, buff, len);
77
          break;
78
        case MSG_TYPE_SIGNALLING:
79
          sigParseData(buff, len);
80
          break;
81
        default:
82
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
83
      }
84
      free(remote);
85
    } else {
86
      struct timeval tmp;
87

    
88
      send_chunk(pset);
89
      if (cnt++ % 10 == 0) {
90
        update_peers(pset,NULL, 0);
91
      }
92
      timeradd(&tnext, &period, &tmp);
93
      tnext = tmp;
94
    }
95
  }
96
}
97

    
98
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks)
99
{
100
  int done = 0;
101
  static uint8_t buff[BUFFSIZE];
102
  int cnt = 0;
103
  struct peerset *pset = peerset_init(0);
104

    
105
  period.tv_sec = csize  / 1000000;
106
  period.tv_usec = csize % 1000000;
107
  
108
  sigInit(s,pset);
109
  source_init(fname, s);
110
  while (!done) {
111
    int len, res;
112
    struct timeval tv;
113

    
114
    tout_init(&tv);
115
    res = wait4data(s, tv);
116
    if (res > 0) {
117
      struct nodeID *remote;
118

    
119
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
120
      switch (buff[0] /* Message Type */) {
121
        case MSG_TYPE_TOPOLOGY:
122
          fprintf(stderr, "Top Parse\n");
123
          update_peers(pset, buff, len);
124
          break;
125
        case MSG_TYPE_CHUNK:
126
          fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
127
          break;
128
        case MSG_TYPE_SIGNALLING:
129
          sigParseData(buff, len);
130
          break;
131
        default:
132
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
133
      }
134
      free(remote);
135
    } else {
136
      int i;
137
      struct timeval tmp;
138

    
139
      generated_chunk();
140
      for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
141
        send_chunk(pset);
142
      }
143
      if (cnt++ % 10 == 0) {
144
          update_peers(pset, NULL, 0);
145
      }
146
      timeradd(&tnext, &period, &tmp);
147
      tnext = tmp;
148
    }
149
  }
150
}