Statistics
| Branch: | Revision:

streamers / loop.c @ 03dca3bf

History | View | Annotate | Download (4.14 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14

    
15
#include <net_helper.h>
16

    
17
#include <msg_types.h>
18
#include <peerset.h>
19
#include <peer.h>
20

    
21
#include "chunk_signaling.h"
22
#include "topology.h"
23
#include "streaming.h"
24
#include "loop.h"
25
#include "dbg.h"
26

    
27
#define BUFFSIZE 512 * 1024
28
static struct timeval period = {0, 500000};
29
static struct timeval tnext;
30

    
31
#ifdef HTTPIO
32
extern pthread_mutex_t cb_mutex;
33
#endif
34

    
35
void tout_init(struct timeval *tv)
36
{
37
  struct timeval tnow;
38

    
39
  if (tnext.tv_sec == 0) {
40
    gettimeofday(&tnext, NULL);
41
  }
42
  gettimeofday(&tnow, NULL);
43
  if(timercmp(&tnow, &tnext, <)) {
44
    timersub(&tnext, &tnow, tv);
45
  } else {
46
    *tv = (struct timeval){0, 0};
47
  }
48
}
49

    
50
void loop(struct nodeID *s, int csize, int buff_size)
51
{
52
  int done = 0;
53
  static uint8_t buff[BUFFSIZE];
54
  int cnt = 0;
55
  
56
  period.tv_sec = csize / 1000000;
57
  period.tv_usec = csize % 1000000;
58
  
59
  sigInit(s);
60
  peers_init();
61
  stream_init(buff_size, s);
62
  update_peers(NULL, NULL, 0);
63
  while (!done) {
64
    int len, res;
65
    struct timeval tv;
66

    
67
    tout_init(&tv);
68
    res = wait4data(s, &tv, NULL);
69
    if (res > 0) {
70
      const struct nodeID *remote;
71

    
72
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
73
      if (len < 0) {
74
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
75
        nodeid_free(remote);
76
        continue;
77
      }
78
      switch (buff[0] /* Message Type */) {
79
        case MSG_TYPE_TOPOLOGY:
80
          update_peers(remote, buff, len);
81
          break;
82
        case MSG_TYPE_CHUNK:
83
          dprintf("Chunk message received:\n");
84
#ifdef HTTPIO
85
          pthread_mutex_lock(&cb_mutex);
86
#endif
87
          received_chunk(remote, buff, len);
88
#ifdef HTTPIO
89
          pthread_mutex_unlock(&cb_mutex);
90
#endif
91
          break;
92
        case MSG_TYPE_SIGNALLING:
93
          sigParseData(remote, buff, len);
94
          break;
95
        default:
96
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
97
      }
98
      nodeid_free(remote);
99
    } else {
100
      struct timeval tmp;
101

    
102
      //send_chunk();
103
      send_offer();
104
      if (cnt++ % 10 == 0) {
105
        update_peers(NULL, NULL, 0);
106
      }
107
      timeradd(&tnext, &period, &tmp);
108
      tnext = tmp;
109
    }
110
  }
111
}
112

    
113
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, bool loop)
114
{
115
  int done = 0;
116
  static uint8_t buff[BUFFSIZE];
117
  int cnt = 0;
118

    
119
  period.tv_sec = csize  / 1000000;
120
  period.tv_usec = csize % 1000000;
121
  
122
  sigInit(s);
123
  peers_init();
124
  if (source_init(fname, s, loop) < 0) {
125
    fprintf(stderr,"Cannot initialize source, exiting");
126
    return;
127
  }
128
  while (!done) {
129
    int len, res;
130
    struct timeval tv;
131

    
132
#ifdef HTTPIO
133
    res = wait4data(s, NULL, NULL);
134
#else
135
    tout_init(&tv);
136
    res = wait4data(s, &tv, NULL);
137
#endif
138
    if (res > 0) {
139
      const struct nodeID *remote;
140

    
141
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
142
      if (len < 0) {
143
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
144
        nodeid_free(remote);
145
        continue;
146
      }
147
      dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
148
      switch (buff[0] /* Message Type */) {
149
        case MSG_TYPE_TOPOLOGY:
150
          fprintf(stderr, "Top Parse\n");
151
          update_peers(remote, buff, len);
152
          break;
153
        case MSG_TYPE_CHUNK:
154
          fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
155
          break;
156
        case MSG_TYPE_SIGNALLING:
157
          sigParseData(remote, buff, len);
158
          break;
159
        default:
160
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
161
      }
162
      nodeid_free(remote);
163
    } else {
164
      int i, res;
165
      struct timeval tmp, d;
166

    
167
      d.tv_sec = 0;
168
      res = generated_chunk(&d.tv_usec);
169
      if (res) {
170
        for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
171
          send_chunk();
172
        }
173
        if (cnt++ % 10 == 0) {
174
            update_peers(NULL, NULL, 0);
175
        }
176
      }
177
      timeradd(&tnext, &d, &tmp);
178
      tnext = tmp;
179
    }
180
  }
181
}