Statistics
| Branch: | Revision:

streamers / loop.c @ 5805c339

History | View | Annotate | Download (4.55 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14
#include <string.h>
15

    
16
#include <net_helper.h>
17
#include <grapes_msg_types.h>
18
#include <peerset.h>
19
#include <peer.h>
20

    
21
#include "chunk_signaling.h"
22
#include "streaming.h"
23
#include "topology.h"
24
#include "loop.h"
25
#include "dbg.h"
26

    
27
#define BUFFSIZE 512 * 1024
28
#define FDSSIZE 16
29
static struct timeval period = {0, 500000};
30
static struct timeval tnext;
31

    
32
#ifdef HTTPIO_MHD
33
extern pthread_mutex_t cb_mutex;
34
#endif
35

    
36
void tout_init(struct timeval *tv)
37
{
38
  struct timeval tnow;
39

    
40
  if (tnext.tv_sec == 0) {
41
    gettimeofday(&tnext, NULL);
42
  }
43
  gettimeofday(&tnow, NULL);
44
  if(timercmp(&tnow, &tnext, <)) {
45
    timersub(&tnext, &tnow, tv);
46
  } else {
47
    *tv = (struct timeval){0, 0};
48
  }
49
}
50

    
51
void loop(struct nodeID *s, int csize, int buff_size)
52
{
53
  int done = 0;
54
  static uint8_t buff[BUFFSIZE];
55
  int cnt = 0;
56
  
57
  period.tv_sec = csize / 1000000;
58
  period.tv_usec = csize % 1000000;
59
  
60
  peers_init();
61
  stream_init(buff_size, s);
62
  update_peers(NULL, NULL, 0);
63
  while (!done) {
64
    int len, res;
65
    struct timeval tv;
66

    
67
    tout_init(&tv);
68
    res = wait4data(s, &tv, NULL);
69
    if (res > 0) {
70
      struct nodeID *remote;
71

    
72
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
73
      if (len < 0) {
74
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
75
        nodeid_free(remote);
76
        continue;
77
      }
78
      switch (buff[0] /* Message Type */) {
79
        case MSG_TYPE_TOPOLOGY:
80
          update_peers(remote, buff, len);
81
          break;
82
        case MSG_TYPE_CHUNK:
83
          dprintf("Chunk message received:\n");
84
          received_chunk(remote, buff, len);
85
          break;
86
        case MSG_TYPE_SIGNALLING:
87
          sigParseData(remote, buff, len);
88
          break;
89
        default:
90
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
91
      }
92
      nodeid_free(remote);
93
    } else {
94
      struct timeval tmp;
95
      //send_chunk();
96
      send_offer();
97
      if (cnt++ % 10 == 0) {
98
        update_peers(NULL, NULL, 0);
99
      }
100
      timeradd(&tnext, &period, &tmp);
101
      tnext = tmp;
102
    }
103
  }
104
}
105

    
106
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, bool loop)
107
{
108
  int done = 0;
109
  static uint8_t buff[BUFFSIZE];
110
  int cnt = 0;
111
  int fds[FDSSIZE];
112
  fds[0] = -1;
113

    
114
  period.tv_sec = csize  / 1000000;
115
  period.tv_usec = csize % 1000000;
116
  
117
  peers_init();
118

    
119
  if (source_init(fname, s, loop, fds, FDSSIZE) < 0) {
120
    fprintf(stderr,"Cannot initialize source, exiting");
121
    return;
122
  }
123
  while (!done) {
124
    int len, res;
125
    struct timeval tv;
126
    int wait4fds[FDSSIZE];
127

    
128
#ifdef HTTPIO
129
    memcpy(wait4fds, fds, sizeof(fds));
130
    res = wait4data(s, NULL, wait4fds);
131
#else
132
    tout_init(&tv);
133
    memcpy(wait4fds, fds, sizeof(fds));
134
    res = wait4data(s, &tv, wait4fds);
135
#endif
136
    if (res == 1) {
137
      struct nodeID *remote;
138

    
139
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
140
      if (len < 0) {
141
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
142
        nodeid_free(remote);
143
        continue;
144
      }
145
      dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
146
      switch (buff[0] /* Message Type */) {
147
        case MSG_TYPE_TOPOLOGY:
148
          fprintf(stderr, "Top Parse\n");
149
#ifdef HTTPIO_MHD
150
          pthread_mutex_lock(&cb_mutex);
151
#endif
152
          update_peers(remote, buff, len);
153
#ifdef HTTPIO_MHD
154
          pthread_mutex_unlock(&cb_mutex);
155
#endif
156
          break;
157
        case MSG_TYPE_CHUNK:
158
          fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
159
          break;
160
        case MSG_TYPE_SIGNALLING:
161
#ifdef HTTPIO_MHD
162
          pthread_mutex_lock(&cb_mutex);
163
#endif
164
          sigParseData(remote, buff, len);
165
#ifdef HTTPIO_MHD
166
          pthread_mutex_unlock(&cb_mutex);
167
#endif
168
          break;
169
        default:
170
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
171
      }
172
      nodeid_free(remote);
173
    } else if (res == 0 || res == 2) {        //timeout or data arrived from source
174
      int i;
175
      struct timeval tmp, d;
176
      struct chunk *c;
177

    
178
      d.tv_sec = 0;
179
      c = generated_chunk(&d.tv_usec);
180
      if (c) {
181
        add_chunk(c);
182
        for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
183
          send_chunk();
184
        }
185
        if (cnt++ % 10 == 0) {
186
            update_peers(NULL, NULL, 0);
187
        }
188
      }
189
      timeradd(&tnext, &d, &tmp);
190
      tnext = tmp;
191
    }
192
  }
193
}