Statistics
| Branch: | Revision:

streamers / loop.c @ 63ebb93d

History | View | Annotate | Download (4.75 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14
#include <string.h>
15

    
16
#include <net_helper.h>
17
#include <grapes_msg_types.h>
18
#include <peerset.h>
19
#include <peer.h>
20

    
21
#include "compatibility/timer.h"
22

    
23
#include "chunk_signaling.h"
24
#include "streaming.h"
25
#include "topology.h"
26
#include "loop.h"
27
#include "dbg.h"
28

    
29
#define BUFFSIZE 512 * 1024
30
#define FDSSIZE 16
31
static struct timeval period = {0, 500000};
32
static struct timeval tnext;
33

    
34
#ifdef HTTPIO_MHD
35
extern pthread_mutex_t cb_mutex;
36
#endif
37

    
38
void tout_init(struct timeval *tv)
39
{
40
  struct timeval tnow;
41

    
42
  if (tnext.tv_sec == 0) {
43
    gettimeofday(&tnext, NULL);
44
  }
45
  gettimeofday(&tnow, NULL);
46
  if(timercmp(&tnow, &tnext, <)) {
47
    timersub(&tnext, &tnow, tv);
48
  } else {
49
    *tv = (struct timeval){0, 0};
50
  }
51
}
52

    
53
void loop(struct nodeID *s, int csize, int buff_size)
54
{
55
  int done = 0;
56
  static uint8_t buff[BUFFSIZE];
57
  int cnt = 0;
58
  
59
  period.tv_sec = csize / 1000000;
60
  period.tv_usec = csize % 1000000;
61
  
62
  peers_init();
63
  stream_init(buff_size, s);
64
  update_peers(NULL, NULL, 0);
65
  while (!done) {
66
    int len, res;
67
    struct timeval tv;
68

    
69
    tout_init(&tv);
70
    res = wait4data(s, &tv, NULL);
71
    if (res > 0) {
72
      struct nodeID *remote;
73

    
74
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
75
      if (len < 0) {
76
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
77
        nodeid_free(remote);
78
        continue;
79
      }
80
      switch (buff[0] /* Message Type */) {
81
                case MSG_TYPE_TMAN:
82
        case MSG_TYPE_TOPOLOGY:
83
          update_peers(remote, buff, len);
84
          break;
85
        case MSG_TYPE_CHUNK:
86
          dprintf("Chunk message received:\n");
87
          received_chunk(remote, buff, len);
88
          break;
89
        case MSG_TYPE_SIGNALLING:
90
          sigParseData(remote, buff, len);
91
          break;
92
        default:
93
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
94
      }
95
      nodeid_free(remote);
96
    } else {
97
      struct timeval tmp;
98
      //send_chunk();
99
      send_offer();
100
      if (cnt++ % 10 == 0) {
101
        update_peers(NULL, NULL, 0);
102
      }
103
      timeradd(&tnext, &period, &tmp);
104
      tnext = tmp;
105
    }
106
  }
107
}
108

    
109
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, bool loop)
110
{
111
  int done = 0;
112
  static uint8_t buff[BUFFSIZE];
113
  int cnt = 0;
114
  int fds[FDSSIZE];
115
  fds[0] = -1;
116

    
117
  period.tv_sec = csize  / 1000000;
118
  period.tv_usec = csize % 1000000;
119
  
120
  peers_init();
121

    
122
  if (source_init(fname, s, loop, fds, FDSSIZE) < 0) {
123
    fprintf(stderr,"Cannot initialize source, exiting");
124
    return;
125
  }
126
  while (!done) {
127
    int len, res;
128
    struct timeval tv, *ptv;
129
    int wait4fds[FDSSIZE], *pfds;
130

    
131
#ifdef HTTPIO
132
    memcpy(wait4fds, fds, sizeof(fds));
133
    res = wait4data(s, NULL, wait4fds);
134
#else
135
    if (fds[0] == -1) {
136
      tout_init(&tv);
137
      ptv = &tv;
138
      pfds = NULL;
139
    } else {
140
      memcpy(wait4fds, fds, sizeof(fds));
141
      pfds = wait4fds;
142
      ptv = NULL;
143
    }
144
    res = wait4data(s, ptv, pfds);
145
#endif
146
    if (res == 1) {
147
      struct nodeID *remote;
148

    
149
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
150
      if (len < 0) {
151
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
152
        nodeid_free(remote);
153
        continue;
154
      }
155
      dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
156
      switch (buff[0] /* Message Type */) {
157
                case MSG_TYPE_TMAN:
158
        case MSG_TYPE_TOPOLOGY:
159
          fprintf(stderr, "Top Parse\n");
160
#ifdef HTTPIO_MHD
161
          pthread_mutex_lock(&cb_mutex);
162
#endif
163
          update_peers(remote, buff, len);
164
#ifdef HTTPIO_MHD
165
          pthread_mutex_unlock(&cb_mutex);
166
#endif
167
          break;
168
        case MSG_TYPE_CHUNK:
169
          fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
170
          break;
171
        case MSG_TYPE_SIGNALLING:
172
#ifdef HTTPIO_MHD
173
          pthread_mutex_lock(&cb_mutex);
174
#endif
175
          sigParseData(remote, buff, len);
176
#ifdef HTTPIO_MHD
177
          pthread_mutex_unlock(&cb_mutex);
178
#endif
179
          break;
180
        default:
181
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
182
      }
183
      nodeid_free(remote);
184
    } else if (res == 0 || res == 2) {        //timeout or data arrived from source
185
      int i;
186
      struct timeval tmp, d;
187
      struct chunk *c;
188

    
189
      d.tv_sec = 0;
190
      c = generated_chunk(&d.tv_usec);
191
      if (c) {
192
        add_chunk(c);
193
        for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
194
          send_chunk();
195
        }
196
        if (cnt++ % 10 == 0) {
197
            update_peers(NULL, NULL, 0);
198
        }
199
      }
200
      timeradd(&tnext, &d, &tmp);
201
      tnext = tmp;
202
    }
203
  }
204
}