Statistics
| Branch: | Revision:

streamers / loop.c @ fbeb4f32

History | View | Annotate | Download (3.33 KB)

1
#include <sys/select.h>
2
#include <sys/time.h>
3
#include <time.h>
4
#include <stdint.h>
5
#include <stdlib.h>
6
#include <stdio.h>
7

    
8
#include <net_helper.h>
9
#include <msg_types.h>
10
#include <peerset.h>
11
#include <peer.h>
12

    
13
#include "chunk_signaling.h"
14
#include "streaming.h"
15
#include "topology.h"
16
#include "loop.h"
17
#include "dbg.h"
18

    
19
#define BUFFSIZE 64 * 1024
20
static struct timeval period = {0, 500000};
21
static struct timeval tnext;
22

    
23
void tout_init(struct timeval *tv)
24
{
25
  struct timeval tnow;
26

    
27
  if (tnext.tv_sec == 0) {
28
    gettimeofday(&tnext, NULL);
29
  }
30
  gettimeofday(&tnow, NULL);
31
  if(timercmp(&tnow, &tnext, <)) {
32
    timersub(&tnext, &tnow, tv);
33
  } else {
34
    *tv = (struct timeval){0, 0};
35
  }
36
}
37

    
38
void loop(struct nodeID *s, int csize, int buff_size)
39
{
40
  int done = 0;
41
  static uint8_t buff[BUFFSIZE];
42
  int cnt = 0;
43
  struct peerset *pset = peerset_init(0);
44
  
45
  period.tv_sec = csize / 1000000;
46
  period.tv_usec = csize % 1000000;
47
  
48
  sigInit(s,pset);
49
  update_peers(pset, NULL, NULL, 0);
50
  stream_init(buff_size, s);
51
  while (!done) {
52
    int len, res;
53
    struct timeval tv;
54

    
55
    tout_init(&tv);
56
    res = wait4data(s, tv);
57
    if (res > 0) {
58
      struct nodeID *remote;
59

    
60
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
61
      switch (buff[0] /* Message Type */) {
62
        case MSG_TYPE_TOPOLOGY:
63
          update_peers(pset, remote, buff, len);
64
          break;
65
        case MSG_TYPE_CHUNK:
66
          received_chunk(pset, remote, buff, len);
67
          break;
68
        case MSG_TYPE_SIGNALLING:
69
          sigParseData(remote, buff, len);
70
          break;
71
        default:
72
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
73
      }
74
      nodeID_free(remote);
75
    } else {
76
      struct timeval tmp;
77

    
78
      //send_chunk(pset);
79
      send_offer(pset);
80
      if (cnt++ % 10 == 0) {
81
        update_peers(pset, NULL, NULL, 0);
82
      }
83
      timeradd(&tnext, &period, &tmp);
84
      tnext = tmp;
85
    }
86
  }
87
}
88

    
89
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks)
90
{
91
  int done = 0;
92
  static uint8_t buff[BUFFSIZE];
93
  int cnt = 0;
94
  struct peerset *pset = peerset_init(0);
95

    
96
  period.tv_sec = csize  / 1000000;
97
  period.tv_usec = csize % 1000000;
98
  
99
  sigInit(s,pset);
100
  source_init(fname, s);
101
  while (!done) {
102
    int len, res;
103
    struct timeval tv;
104

    
105
    tout_init(&tv);
106
    res = wait4data(s, tv);
107
    if (res > 0) {
108
      struct nodeID *remote;
109

    
110
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
111
      dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
112
      switch (buff[0] /* Message Type */) {
113
        case MSG_TYPE_TOPOLOGY:
114
          fprintf(stderr, "Top Parse\n");
115
          update_peers(pset, remote, buff, len);
116
          break;
117
        case MSG_TYPE_CHUNK:
118
          fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
119
          break;
120
        case MSG_TYPE_SIGNALLING:
121
          sigParseData(remote, buff, len);
122
          break;
123
        default:
124
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
125
      }
126
      nodeID_free(remote);
127
    } else {
128
      int i, res;
129
      struct timeval tmp, d;
130

    
131
      res = generated_chunk(&d.tv_usec);
132
      if (res) {
133
        for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
134
          send_chunk(pset);
135
        }
136
        if (cnt++ % 10 == 0) {
137
            update_peers(pset, NULL, NULL, 0);
138
        }
139
      }
140
      timeradd(&tnext, &d, &tmp);
141
      tnext = tmp;
142
    }
143
  }
144
}