Statistics
| Branch: | Revision:

streamers / loop.c @ c9370421

History | View | Annotate | Download (4.21 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14

    
15
#include <net_helper.h>
16
#include <msg_types.h>
17
#include <peerset.h>
18
#include <peer.h>
19

    
20
#include "chunk_signaling.h"
21
#include "streaming.h"
22
#include "topology.h"
23
#include "loop.h"
24
#include "dbg.h"
25

    
26
#define BUFFSIZE 512 * 1024
27
#define FDSSIZE 16
28
static struct timeval period = {0, 500000};
29
static struct timeval tnext;
30

    
31
#ifdef HTTPIO
32
extern pthread_mutex_t cb_mutex;
33
#endif
34

    
35
void tout_init(struct timeval *tv)
36
{
37
  struct timeval tnow;
38

    
39
  if (tnext.tv_sec == 0) {
40
    gettimeofday(&tnext, NULL);
41
  }
42
  gettimeofday(&tnow, NULL);
43
  if(timercmp(&tnow, &tnext, <)) {
44
    timersub(&tnext, &tnow, tv);
45
  } else {
46
    *tv = (struct timeval){0, 0};
47
  }
48
}
49

    
50
void loop(struct nodeID *s, int csize, int buff_size)
51
{
52
  int done = 0;
53
  static uint8_t buff[BUFFSIZE];
54
  int cnt = 0;
55
  
56
  period.tv_sec = csize / 1000000;
57
  period.tv_usec = csize % 1000000;
58
  
59
  peers_init();
60
  stream_init(buff_size, s);
61
  update_peers(NULL, NULL, 0);
62
  while (!done) {
63
    int len, res;
64
    struct timeval tv;
65

    
66
    tout_init(&tv);
67
    res = wait4data(s, &tv, NULL);
68
    if (res > 0) {
69
      struct nodeID *remote;
70

    
71
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
72
      if (len < 0) {
73
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
74
        nodeid_free(remote);
75
        continue;
76
      }
77
      switch (buff[0] /* Message Type */) {
78
        case MSG_TYPE_TOPOLOGY:
79
          update_peers(remote, buff, len);
80
          break;
81
        case MSG_TYPE_CHUNK:
82
          dprintf("Chunk message received:\n");
83
#ifdef HTTPIO
84
          pthread_mutex_lock(&cb_mutex);
85
#endif
86
          received_chunk(remote, buff, len);
87
#ifdef HTTPIO
88
          pthread_mutex_unlock(&cb_mutex);
89
#endif
90
          break;
91
        case MSG_TYPE_SIGNALLING:
92
          sigParseData(remote, buff, len);
93
          break;
94
        default:
95
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
96
      }
97
      nodeid_free(remote);
98
    } else {
99
      struct timeval tmp;
100

    
101
      //send_chunk();
102
      send_offer();
103
      if (cnt++ % 10 == 0) {
104
        update_peers(NULL, NULL, 0);
105
      }
106
      timeradd(&tnext, &period, &tmp);
107
      tnext = tmp;
108
    }
109
  }
110
}
111

    
112
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, bool loop)
113
{
114
  int done = 0;
115
  static uint8_t buff[BUFFSIZE];
116
  int cnt = 0;
117
  int fds[FDSSIZE];
118
  fds[0] = -1;
119

    
120
  period.tv_sec = csize  / 1000000;
121
  period.tv_usec = csize % 1000000;
122
  
123
  peers_init();
124

    
125
  if (source_init(fname, s, loop, fds, FDSSIZE) < 0) {
126
    fprintf(stderr,"Cannot initialize source, exiting");
127
    return;
128
  }
129
  while (!done) {
130
    int len, res;
131
    struct timeval tv;
132

    
133
#ifdef HTTPIO
134
    res = wait4data(s, NULL, NULL);
135
#else
136
    tout_init(&tv);
137
    res = wait4data(s, &tv, NULL);
138
#endif
139
    if (res > 0) {
140
      struct nodeID *remote;
141

    
142
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
143
      if (len < 0) {
144
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
145
        nodeid_free(remote);
146
        continue;
147
      }
148
      dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
149
      switch (buff[0] /* Message Type */) {
150
        case MSG_TYPE_TOPOLOGY:
151
          fprintf(stderr, "Top Parse\n");
152
          update_peers(remote, buff, len);
153
          break;
154
        case MSG_TYPE_CHUNK:
155
          fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
156
          break;
157
        case MSG_TYPE_SIGNALLING:
158
          sigParseData(remote, buff, len);
159
          break;
160
        default:
161
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
162
      }
163
      nodeid_free(remote);
164
    } else {
165
      int i;
166
      struct timeval tmp, d;
167
      struct chunk *c;
168

    
169
      d.tv_sec = 0;
170
      c = generated_chunk(&d.tv_usec);
171
      if (c) {
172
        add_chunk(c);
173
        for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
174
          send_chunk();
175
        }
176
        if (cnt++ % 10 == 0) {
177
            update_peers(NULL, NULL, 0);
178
        }
179
      }
180
      timeradd(&tnext, &d, &tmp);
181
      tnext = tmp;
182
    }
183
  }
184
}