Statistics
| Branch: | Revision:

streamers / loop.c @ bc6a6205

History | View | Annotate | Download (4.79 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14
#include <string.h>
15

    
16
#include <net_helper.h>
17
#include <grapes_msg_types.h>
18
#include <peerset.h>
19
#include <peer.h>
20

    
21
#include "compatibility/timer.h"
22

    
23
#include "chunk_signaling.h"
24
#include "streaming.h"
25
#include "topology.h"
26
#include "loop.h"
27
#include "dbg.h"
28

    
29
#define BUFFSIZE 512 * 1024
30
#define FDSSIZE 16
31
static struct timeval period = {0, 500000};
32
static struct timeval tnext;
33
int fixed_offer_threads = 0;
34

    
35
void tout_init(struct timeval *tv)
36
{
37
  struct timeval tnow;
38

    
39
  if (tnext.tv_sec == 0) {
40
    gettimeofday(&tnext, NULL);
41
  }
42
  gettimeofday(&tnow, NULL);
43
  if(timercmp(&tnow, &tnext, <)) {
44
    timersub(&tnext, &tnow, tv);
45
  } else {
46
    *tv = (struct timeval){0, 0};
47
  }
48
}
49

    
50
void loop(struct nodeID *s, int polling_time, int buff_size, bool hrc, int offer_threads)
51
{
52
  int done = 0;
53
  static uint8_t buff[BUFFSIZE];
54
  int cnt = 0;
55
  
56
  period.tv_sec = polling_time / 1000000;
57
  period.tv_usec = polling_time % 1000000;
58
  
59
  peers_init();
60
  stream_init(buff_size, s);
61
  update_peers(NULL, NULL, 0);
62
  
63
  if (!hrc)
64
          fixed_offer_threads = offer_threads;
65
  else
66
        fixed_offer_threads = -1;  
67
  
68
  // Start all offer threads once forever
69
  send_offer();
70
  
71
  while (!done) {
72
    int len, res;
73
    struct timeval tv;
74

    
75
    tout_init(&tv);
76
    res = wait4data(s, &tv, NULL);
77
    if (res > 0) {
78
      struct nodeID *remote;
79

    
80
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
81
      if (len < 0) {
82
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
83
        nodeid_free(remote);
84
        continue;
85
      }
86
      switch (buff[0] /* Message Type */) {
87
        case MSG_TYPE_TMAN:
88
        case MSG_TYPE_STREAMER_TOPOLOGY:
89
        case MSG_TYPE_TOPOLOGY:
90
          dtprintf("Topo message received:\n");
91
          update_peers(remote, buff, len);
92
          break;
93
        case MSG_TYPE_CHUNK:
94
          dtprintf("Chunk message received:\n");
95
          received_chunk(remote, buff, len);
96
          break;
97
        case MSG_TYPE_SIGNALLING:
98
          dtprintf("Sign message received:\n");
99
          sigParseData(remote, buff, len);
100
          break;
101
        default:
102
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
103
      }
104
      nodeid_free(remote);
105
    } else {
106
      struct timeval tmp;
107
      send_offer();
108
      if (cnt++ % 10 == 0) {
109
        update_peers(NULL, NULL, 0);
110
      }
111
      timeradd(&tnext, &period, &tmp);
112
      tnext = tmp;
113
    }
114
  }
115
}
116

    
117
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, int buff_size)
118
{
119
  int done = 0;
120
  static uint8_t buff[BUFFSIZE];
121
  int cnt = 0;
122
  int fds[FDSSIZE];
123
  fds[0] = -1;
124

    
125
  period.tv_sec = csize  / 1000000;
126
  period.tv_usec = csize % 1000000;
127
  
128
  peers_init();
129

    
130
  if (source_init(fname, s, fds, FDSSIZE, buff_size) < 0) {
131
    fprintf(stderr,"Cannot initialize source, exiting");
132
    return;
133
  }
134
  while (!done) {
135
    int len, res;
136
    struct timeval tv, *ptv;
137
    int wait4fds[FDSSIZE], *pfds;
138

    
139
    if (fds[0] == -1) {
140
      tout_init(&tv);
141
      ptv = &tv;
142
      pfds = NULL;
143
    } else {
144
      memcpy(wait4fds, fds, sizeof(fds));
145
      pfds = wait4fds;
146
      ptv = NULL;
147
    }
148
    res = wait4data(s, ptv, pfds);
149

    
150
    if (res == 1) {
151
      struct nodeID *remote;
152

    
153
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
154
      if (len < 0) {
155
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
156
        nodeid_free(remote);
157
        continue;
158
      }
159
      dprintf("Received message (%d) from %s\n", buff[0], node_addr(remote));
160
      switch (buff[0] /* Message Type */) {
161
        case MSG_TYPE_TMAN:
162
        case MSG_TYPE_STREAMER_TOPOLOGY:
163
        case MSG_TYPE_TOPOLOGY:
164
          fprintf(stderr, "Top Parse\n");
165
          update_peers(remote, buff, len);
166
          break;
167
        case MSG_TYPE_CHUNK:
168
          fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
169
          break;
170
        case MSG_TYPE_SIGNALLING:
171
          sigParseData(remote, buff, len);
172
          break;
173
        default:
174
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
175
      }
176
      nodeid_free(remote);
177
    } else if (res == 0 || res == 2) {        //timeout or data arrived from source
178
      int i;
179
      struct timeval tmp, d;
180
      struct chunk *c;
181

    
182
      d.tv_sec = 0;
183
      c = generated_chunk(&d.tv_usec);
184
      if (c) {
185
        add_chunk(c);
186
        for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
187
          send_chunk();
188
        }
189
        if (cnt++ % 10 == 0) {
190
            update_peers(NULL, NULL, 0);
191
        }
192
      }
193
      timeradd(&tnext, &d, &tmp);
194
      tnext = tmp;
195
    }
196
  }
197
}
198

    
199
int get_fixed_offer_threads(void) {
200
        return fixed_offer_threads;
201
        }