Statistics
| Branch: | Revision:

streamers / loop.c @ 2323102b

History | View | Annotate | Download (4.59 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14
#include <string.h>
15

    
16
#include <net_helper.h>
17
#include <grapes_msg_types.h>
18
#include <peerset.h>
19
#include <peer.h>
20

    
21
#include "chunk_signaling.h"
22
#include "streaming.h"
23
#include "topology.h"
24
#include "loop.h"
25
#include "dbg.h"
26

    
27
#define BUFFSIZE 512 * 1024
28
#define FDSSIZE 16
29
static struct timeval period = {0, 500000};
30
static struct timeval tnext;
31

    
32
#ifdef HTTPIO_MHD
33
extern pthread_mutex_t cb_mutex;
34
#endif
35

    
36
void tout_init(struct timeval *tv)
37
{
38
  struct timeval tnow;
39

    
40
  if (tnext.tv_sec == 0) {
41
    gettimeofday(&tnext, NULL);
42
  }
43
  gettimeofday(&tnow, NULL);
44
  if(timercmp(&tnow, &tnext, <)) {
45
    timersub(&tnext, &tnow, tv);
46
  } else {
47
    *tv = (struct timeval){0, 0};
48
  }
49
}
50

    
51
void loop(struct nodeID *s, int csize, int buff_size)
52
{
53
  int done = 0;
54
  static uint8_t buff[BUFFSIZE];
55
  int cnt = 0;
56
  
57
  period.tv_sec = csize / 1000000;
58
  period.tv_usec = csize % 1000000;
59
  
60
  peers_init();
61
  stream_init(buff_size, s);
62
  update_peers(NULL, NULL, 0);
63
  while (!done) {
64
    int len, res;
65
    struct timeval tv;
66

    
67
    tout_init(&tv);
68
    res = wait4data(s, &tv, NULL);
69
    if (res > 0) {
70
      struct nodeID *remote;
71

    
72
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
73
      if (len < 0) {
74
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
75
        nodeid_free(remote);
76
        continue;
77
      }
78
      switch (buff[0] /* Message Type */) {
79
                case MSG_TYPE_TMAN:
80
        case MSG_TYPE_TOPOLOGY:
81
          update_peers(remote, buff, len);
82
          break;
83
        case MSG_TYPE_CHUNK:
84
          dprintf("Chunk message received:\n");
85
          received_chunk(remote, buff, len);
86
          break;
87
        case MSG_TYPE_SIGNALLING:
88
          sigParseData(remote, buff, len);
89
          break;
90
        default:
91
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
92
      }
93
      nodeid_free(remote);
94
    } else {
95
      struct timeval tmp;
96
      //send_chunk();
97
      send_offer();
98
      if (cnt++ % 10 == 0) {
99
        update_peers(NULL, NULL, 0);
100
      }
101
      timeradd(&tnext, &period, &tmp);
102
      tnext = tmp;
103
    }
104
  }
105
}
106

    
107
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, bool loop)
108
{
109
  int done = 0;
110
  static uint8_t buff[BUFFSIZE];
111
  int cnt = 0;
112
  int fds[FDSSIZE];
113
  fds[0] = -1;
114

    
115
  period.tv_sec = csize  / 1000000;
116
  period.tv_usec = csize % 1000000;
117
  
118
  peers_init();
119

    
120
  if (source_init(fname, s, loop, fds, FDSSIZE) < 0) {
121
    fprintf(stderr,"Cannot initialize source, exiting");
122
    return;
123
  }
124
  while (!done) {
125
    int len, res;
126
    struct timeval tv;
127
    int wait4fds[FDSSIZE];
128

    
129
#ifdef HTTPIO
130
    memcpy(wait4fds, fds, sizeof(fds));
131
    res = wait4data(s, NULL, wait4fds);
132
#else
133
    tout_init(&tv);
134
    memcpy(wait4fds, fds, sizeof(fds));
135
    res = wait4data(s, &tv, wait4fds);
136
#endif
137
    if (res == 1) {
138
      struct nodeID *remote;
139

    
140
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
141
      if (len < 0) {
142
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
143
        nodeid_free(remote);
144
        continue;
145
      }
146
      dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
147
      switch (buff[0] /* Message Type */) {
148
                case MSG_TYPE_TMAN:
149
        case MSG_TYPE_TOPOLOGY:
150
          fprintf(stderr, "Top Parse\n");
151
#ifdef HTTPIO_MHD
152
          pthread_mutex_lock(&cb_mutex);
153
#endif
154
          update_peers(remote, buff, len);
155
#ifdef HTTPIO_MHD
156
          pthread_mutex_unlock(&cb_mutex);
157
#endif
158
          break;
159
        case MSG_TYPE_CHUNK:
160
          fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
161
          break;
162
        case MSG_TYPE_SIGNALLING:
163
#ifdef HTTPIO_MHD
164
          pthread_mutex_lock(&cb_mutex);
165
#endif
166
          sigParseData(remote, buff, len);
167
#ifdef HTTPIO_MHD
168
          pthread_mutex_unlock(&cb_mutex);
169
#endif
170
          break;
171
        default:
172
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
173
      }
174
      nodeid_free(remote);
175
    } else if (res == 0 || res == 2) {        //timeout or data arrived from source
176
      int i;
177
      struct timeval tmp, d;
178
      struct chunk *c;
179

    
180
      d.tv_sec = 0;
181
      c = generated_chunk(&d.tv_usec);
182
      if (c) {
183
        add_chunk(c);
184
        for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
185
          send_chunk();
186
        }
187
        if (cnt++ % 10 == 0) {
188
            update_peers(NULL, NULL, 0);
189
        }
190
      }
191
      timeradd(&tnext, &d, &tmp);
192
      tnext = tmp;
193
    }
194
  }
195
}