Statistics
| Branch: | Revision:

streamers / loop.c @ a34ed273

History | View | Annotate | Download (4.72 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14
#include <string.h>
15

    
16
#include <net_helper.h>
17
#include <grapes_msg_types.h>
18
#include <peerset.h>
19
#include <peer.h>
20

    
21
#include "chunk_signaling.h"
22
#include "streaming.h"
23
#include "topology.h"
24
#include "loop.h"
25
#include "dbg.h"
26

    
27
#define BUFFSIZE 512 * 1024
28
#define FDSSIZE 16
29
static struct timeval period = {0, 500000};
30
static struct timeval tnext;
31

    
32
#ifdef HTTPIO_MHD
33
extern pthread_mutex_t cb_mutex;
34
#endif
35

    
36
void tout_init(struct timeval *tv)
37
{
38
  struct timeval tnow;
39

    
40
  if (tnext.tv_sec == 0) {
41
    gettimeofday(&tnext, NULL);
42
  }
43
  gettimeofday(&tnow, NULL);
44
  if(timercmp(&tnow, &tnext, <)) {
45
    timersub(&tnext, &tnow, tv);
46
  } else {
47
    *tv = (struct timeval){0, 0};
48
  }
49
}
50

    
51
void loop(struct nodeID *s, int csize, int buff_size)
52
{
53
  int done = 0;
54
  static uint8_t buff[BUFFSIZE];
55
  int cnt = 0;
56
  
57
  period.tv_sec = csize / 1000000;
58
  period.tv_usec = csize % 1000000;
59
  
60
  peers_init();
61
  stream_init(buff_size, s);
62
  update_peers(NULL, NULL, 0);
63
  while (!done) {
64
    int len, res;
65
    struct timeval tv;
66

    
67
    tout_init(&tv);
68
    res = wait4data(s, &tv, NULL);
69
    if (res > 0) {
70
      struct nodeID *remote;
71

    
72
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
73
      if (len < 0) {
74
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
75
        nodeid_free(remote);
76
        continue;
77
      }
78
      switch (buff[0] /* Message Type */) {
79
                case MSG_TYPE_TMAN:
80
        case MSG_TYPE_TOPOLOGY:
81
          update_peers(remote, buff, len);
82
          break;
83
        case MSG_TYPE_CHUNK:
84
          dprintf("Chunk message received:\n");
85
          received_chunk(remote, buff, len);
86
          break;
87
        case MSG_TYPE_SIGNALLING:
88
          sigParseData(remote, buff, len);
89
          break;
90
        default:
91
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
92
      }
93
      nodeid_free(remote);
94
    } else {
95
      struct timeval tmp;
96
      //send_chunk();
97
      send_offer();
98
      if (cnt++ % 10 == 0) {
99
        update_peers(NULL, NULL, 0);
100
      }
101
      timeradd(&tnext, &period, &tmp);
102
      tnext = tmp;
103
    }
104
  }
105
}
106

    
107
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, bool loop)
108
{
109
  int done = 0;
110
  static uint8_t buff[BUFFSIZE];
111
  int cnt = 0;
112
  int fds[FDSSIZE];
113
  fds[0] = -1;
114

    
115
  period.tv_sec = csize  / 1000000;
116
  period.tv_usec = csize % 1000000;
117
  
118
  peers_init();
119

    
120
  if (source_init(fname, s, loop, fds, FDSSIZE) < 0) {
121
    fprintf(stderr,"Cannot initialize source, exiting");
122
    return;
123
  }
124
  while (!done) {
125
    int len, res;
126
    struct timeval tv, *ptv;
127
    int wait4fds[FDSSIZE], *pfds;
128

    
129
#ifdef HTTPIO
130
    memcpy(wait4fds, fds, sizeof(fds));
131
    res = wait4data(s, NULL, wait4fds);
132
#else
133
    if (fds[0] == -1) {
134
      tout_init(&tv);
135
      ptv = &tv;
136
      pfds = NULL;
137
    } else {
138
      memcpy(wait4fds, fds, sizeof(fds));
139
      pfds = wait4fds;
140
      ptv = NULL;
141
    }
142
    res = wait4data(s, ptv, pfds);
143
#endif
144
    if (res == 1) {
145
      struct nodeID *remote;
146

    
147
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
148
      if (len < 0) {
149
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
150
        nodeid_free(remote);
151
        continue;
152
      }
153
      dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
154
      switch (buff[0] /* Message Type */) {
155
                case MSG_TYPE_TMAN:
156
        case MSG_TYPE_TOPOLOGY:
157
          fprintf(stderr, "Top Parse\n");
158
#ifdef HTTPIO_MHD
159
          pthread_mutex_lock(&cb_mutex);
160
#endif
161
          update_peers(remote, buff, len);
162
#ifdef HTTPIO_MHD
163
          pthread_mutex_unlock(&cb_mutex);
164
#endif
165
          break;
166
        case MSG_TYPE_CHUNK:
167
          fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
168
          break;
169
        case MSG_TYPE_SIGNALLING:
170
#ifdef HTTPIO_MHD
171
          pthread_mutex_lock(&cb_mutex);
172
#endif
173
          sigParseData(remote, buff, len);
174
#ifdef HTTPIO_MHD
175
          pthread_mutex_unlock(&cb_mutex);
176
#endif
177
          break;
178
        default:
179
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
180
      }
181
      nodeid_free(remote);
182
    } else if (res == 0 || res == 2) {        //timeout or data arrived from source
183
      int i;
184
      struct timeval tmp, d;
185
      struct chunk *c;
186

    
187
      d.tv_sec = 0;
188
      c = generated_chunk(&d.tv_usec);
189
      if (c) {
190
        add_chunk(c);
191
        for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
192
          send_chunk();
193
        }
194
        if (cnt++ % 10 == 0) {
195
            update_peers(NULL, NULL, 0);
196
        }
197
      }
198
      timeradd(&tnext, &d, &tmp);
199
      tnext = tmp;
200
    }
201
  }
202
}