Statistics
| Branch: | Revision:

streamers / loop.c @ 5f0575fb

History | View | Annotate | Download (4.74 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14
#include <string.h>
15

    
16
#include <net_helper.h>
17
#include <grapes_msg_types.h>
18
#include <peerset.h>
19
#include <peer.h>
20

    
21
#include "compatibility/timer.h"
22

    
23
#include "chunk_signaling.h"
24
#include "streaming.h"
25
#include "topology.h"
26
#include "loop.h"
27
#include "dbg.h"
28

    
29
#define BUFFSIZE 512 * 1024
30
#define FDSSIZE 16
31
static struct timeval period = {0, 500000};
32
static struct timeval tnext;
33
int fixed_offer_threads = 0;
34

    
35
void tout_init(struct timeval *tv)
36
{
37
  struct timeval tnow;
38

    
39
  if (tnext.tv_sec == 0) {
40
    gettimeofday(&tnext, NULL);
41
  }
42
  gettimeofday(&tnow, NULL);
43
  if(timercmp(&tnow, &tnext, <)) {
44
    timersub(&tnext, &tnow, tv);
45
  } else {
46
    *tv = (struct timeval){0, 0};
47
  }
48
}
49

    
50
void loop(struct nodeID *s, int polling_time, int buff_size, int offer_threads)
51
{
52
  int done = 0;
53
  static uint8_t buff[BUFFSIZE];
54
  int cnt = 0;
55
  
56
  period.tv_sec = polling_time / 1000000;
57
  period.tv_usec = polling_time % 1000000;
58
  
59
  peers_init();
60
  stream_init(buff_size, s);
61
  update_peers(NULL, NULL, 0);
62
  
63
          fixed_offer_threads = offer_threads;
64
   
65
   // Start all offer threads once forever
66
   send_offer();
67
  
68
  while (!done) {
69
    int len, res;
70
    struct timeval tv;
71

    
72
    tout_init(&tv);
73
    res = wait4data(s, &tv, NULL);
74
    if (res > 0) {
75
      struct nodeID *remote;
76

    
77
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
78
      if (len < 0) {
79
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
80
        nodeid_free(remote);
81
        continue;
82
      }
83
      switch (buff[0] /* Message Type */) {
84
        case MSG_TYPE_TMAN:
85
        case MSG_TYPE_STREAMER_TOPOLOGY:
86
        case MSG_TYPE_TOPOLOGY:
87
          dtprintf("Topo message received:\n");
88
          update_peers(remote, buff, len);
89
          break;
90
        case MSG_TYPE_CHUNK:
91
          dtprintf("Chunk message received:\n");
92
          received_chunk(remote, buff, len);
93
          break;
94
        case MSG_TYPE_SIGNALLING:
95
          dtprintf("Sign message received:\n");
96
          sigParseData(remote, buff, len);
97
          break;
98
        default:
99
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
100
      }
101
      nodeid_free(remote);
102
    } else {
103
      struct timeval tmp;
104
      send_offer();
105
      if (cnt++ % 10 == 0) {
106
        update_peers(NULL, NULL, 0);
107
      }
108
      timeradd(&tnext, &period, &tmp);
109
      tnext = tmp;
110
    }
111
  }
112
}
113

    
114
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, int buff_size)
115
{
116
  int done = 0;
117
  static uint8_t buff[BUFFSIZE];
118
  int cnt = 0;
119
  int fds[FDSSIZE];
120
  fds[0] = -1;
121

    
122
  period.tv_sec = csize  / 1000000;
123
  period.tv_usec = csize % 1000000;
124
  
125
  peers_init();
126

    
127
  if (source_init(fname, s, fds, FDSSIZE, buff_size) < 0) {
128
    fprintf(stderr,"Cannot initialize source, exiting");
129
    return;
130
  }
131
  while (!done) {
132
    int len, res;
133
    struct timeval tv, *ptv;
134
    int wait4fds[FDSSIZE], *pfds;
135

    
136
    if (fds[0] == -1) {
137
      tout_init(&tv);
138
      ptv = &tv;
139
      pfds = NULL;
140
    } else {
141
      memcpy(wait4fds, fds, sizeof(fds));
142
      pfds = wait4fds;
143
      ptv = NULL;
144
    }
145
    res = wait4data(s, ptv, pfds);
146

    
147
    if (res == 1) {
148
      struct nodeID *remote;
149

    
150
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
151
      if (len < 0) {
152
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
153
        nodeid_free(remote);
154
        continue;
155
      }
156
      dprintf("Received message (%d) from %s\n", buff[0], node_addr(remote));
157
      switch (buff[0] /* Message Type */) {
158
        case MSG_TYPE_TMAN:
159
        case MSG_TYPE_STREAMER_TOPOLOGY:
160
        case MSG_TYPE_TOPOLOGY:
161
          fprintf(stderr, "Top Parse\n");
162
          update_peers(remote, buff, len);
163
          break;
164
        case MSG_TYPE_CHUNK:
165
          fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
166
          break;
167
        case MSG_TYPE_SIGNALLING:
168
          sigParseData(remote, buff, len);
169
          break;
170
        default:
171
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
172
      }
173
      nodeid_free(remote);
174
    } else if (res == 0 || res == 2) {        //timeout or data arrived from source
175
      int i;
176
      struct timeval tmp, d;
177
      struct chunk *c;
178

    
179
      d.tv_sec = 0;
180
      c = generated_chunk(&d.tv_usec);
181
      if (c) {
182
        add_chunk(c);
183
        for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
184
          send_chunk();
185
        }
186
        if (cnt++ % 10 == 0) {
187
            update_peers(NULL, NULL, 0);
188
        }
189
      }
190
      timeradd(&tnext, &d, &tmp);
191
      tnext = tmp;
192
    }
193
  }
194
}
195

    
196
int get_fixed_offer_threads(void) {
197
        return fixed_offer_threads;
198
        }