Statistics
| Branch: | Revision:

streamers / loop.c @ 74a5d4ae

History | View | Annotate | Download (4.13 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14

    
15
#include <net_helper.h>
16
#include <msg_types.h>
17
#include <peerset.h>
18
#include <peer.h>
19

    
20
#include "chunk_signaling.h"
21
#include "streaming.h"
22
#include "topology.h"
23
#include "loop.h"
24
#include "dbg.h"
25

    
26
#define BUFFSIZE 512 * 1024
27
static struct timeval period = {0, 500000};
28
static struct timeval tnext;
29

    
30
#ifdef HTTPIO
31
extern pthread_mutex_t cb_mutex;
32
#endif
33

    
34
void tout_init(struct timeval *tv)
35
{
36
  struct timeval tnow;
37

    
38
  if (tnext.tv_sec == 0) {
39
    gettimeofday(&tnext, NULL);
40
  }
41
  gettimeofday(&tnow, NULL);
42
  if(timercmp(&tnow, &tnext, <)) {
43
    timersub(&tnext, &tnow, tv);
44
  } else {
45
    *tv = (struct timeval){0, 0};
46
  }
47
}
48

    
49
void loop(struct nodeID *s, int csize, int buff_size)
50
{
51
  int done = 0;
52
  static uint8_t buff[BUFFSIZE];
53
  int cnt = 0;
54
  
55
  period.tv_sec = csize / 1000000;
56
  period.tv_usec = csize % 1000000;
57
  
58
  sigInit(s);
59
  peers_init();
60
  stream_init(buff_size, s);
61
  update_peers(NULL, NULL, 0);
62
  while (!done) {
63
    int len, res;
64
    struct timeval tv;
65

    
66
    tout_init(&tv);
67
    res = wait4data(s, &tv, NULL);
68
    if (res > 0) {
69
      struct nodeID *remote;
70

    
71
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
72
      if (len < 0) {
73
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
74
        nodeid_free(remote);
75
        continue;
76
      }
77
      switch (buff[0] /* Message Type */) {
78
        case MSG_TYPE_TOPOLOGY:
79
          update_peers(remote, buff, len);
80
          break;
81
        case MSG_TYPE_CHUNK:
82
          dprintf("Chunk message received:\n");
83
#ifdef HTTPIO
84
          pthread_mutex_lock(&cb_mutex);
85
#endif
86
          received_chunk(remote, buff, len);
87
#ifdef HTTPIO
88
          pthread_mutex_unlock(&cb_mutex);
89
#endif
90
          break;
91
        case MSG_TYPE_SIGNALLING:
92
          sigParseData(remote, buff, len);
93
          break;
94
        default:
95
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
96
      }
97
      nodeid_free(remote);
98
    } else {
99
      struct timeval tmp;
100

    
101
      //send_chunk();
102
      send_offer();
103
      if (cnt++ % 10 == 0) {
104
        update_peers(NULL, NULL, 0);
105
      }
106
      timeradd(&tnext, &period, &tmp);
107
      tnext = tmp;
108
    }
109
  }
110
}
111

    
112
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, bool loop)
113
{
114
  int done = 0;
115
  static uint8_t buff[BUFFSIZE];
116
  int cnt = 0;
117

    
118
  period.tv_sec = csize  / 1000000;
119
  period.tv_usec = csize % 1000000;
120
  
121
  sigInit(s);
122
  peers_init();
123
  if (source_init(fname, s, loop) < 0) {
124
    fprintf(stderr,"Cannot initialize source, exiting");
125
    return;
126
  }
127
  while (!done) {
128
    int len, res;
129
    struct timeval tv;
130

    
131
#ifdef HTTPIO
132
    res = wait4data(s, NULL, NULL);
133
#else
134
    tout_init(&tv);
135
    res = wait4data(s, &tv, NULL);
136
#endif
137
    if (res > 0) {
138
      struct nodeID *remote;
139

    
140
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
141
      if (len < 0) {
142
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
143
        nodeid_free(remote);
144
        continue;
145
      }
146
      dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
147
      switch (buff[0] /* Message Type */) {
148
        case MSG_TYPE_TOPOLOGY:
149
          fprintf(stderr, "Top Parse\n");
150
          update_peers(remote, buff, len);
151
          break;
152
        case MSG_TYPE_CHUNK:
153
          fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
154
          break;
155
        case MSG_TYPE_SIGNALLING:
156
          sigParseData(remote, buff, len);
157
          break;
158
        default:
159
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
160
      }
161
      nodeid_free(remote);
162
    } else {
163
      int i, res;
164
      struct timeval tmp, d;
165

    
166
      d.tv_sec = 0;
167
      res = generated_chunk(&d.tv_usec);
168
      if (res) {
169
        for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
170
          send_chunk();
171
        }
172
        if (cnt++ % 10 == 0) {
173
            update_peers(NULL, NULL, 0);
174
        }
175
      }
176
      timeradd(&tnext, &d, &tmp);
177
      tnext = tmp;
178
    }
179
  }
180
}