Statistics
| Branch: | Revision:

streamers / loop.c @ c45cea07

History | View | Annotate | Download (3.94 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14

    
15
#include <net_helper.h>
16
#include <msg_types.h>
17
#include <peerset.h>
18
#include <peer.h>
19

    
20
#include "chunk_signaling.h"
21
#include "streaming.h"
22
#include "topology.h"
23
#include "loop.h"
24
#include "dbg.h"
25

    
26
#define BUFFSIZE 512 * 1024
27
static struct timeval period = {0, 500000};
28
static struct timeval tnext;
29

    
30
void tout_init(struct timeval *tv)
31
{
32
  struct timeval tnow;
33

    
34
  if (tnext.tv_sec == 0) {
35
    gettimeofday(&tnext, NULL);
36
  }
37
  gettimeofday(&tnow, NULL);
38
  if(timercmp(&tnow, &tnext, <)) {
39
    timersub(&tnext, &tnow, tv);
40
  } else {
41
    *tv = (struct timeval){0, 0};
42
  }
43
}
44

    
45
void loop(struct nodeID *s, int csize, int buff_size)
46
{
47
  int done = 0;
48
  static uint8_t buff[BUFFSIZE];
49
  int cnt = 0;
50
  
51
  period.tv_sec = csize / 1000000;
52
  period.tv_usec = csize % 1000000;
53
  
54
  sigInit(s);
55
  chunkSignalingInit(s);
56
  peers_init();
57
  stream_init(buff_size, s);
58
  update_peers(NULL, NULL, 0);
59
  while (!done) {
60
    int len, res;
61
    struct timeval tv;
62

    
63
    tout_init(&tv);
64
    res = wait4data(s, &tv, NULL);
65
    if (res > 0) {
66
      struct nodeID *remote;
67

    
68
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
69
      if (len < 0) {
70
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
71
        nodeid_free(remote);
72
        continue;
73
      }
74
      switch (buff[0] /* Message Type */) {
75
        case MSG_TYPE_TOPOLOGY:
76
          update_peers(remote, buff, len);
77
          break;
78
        case MSG_TYPE_CHUNK:
79
          dprintf("Chunk message received:\n");
80
          received_chunk(remote, buff, len);
81
          break;
82
        case MSG_TYPE_SIGNALLING:
83
          sigParseData(remote, buff, len);
84
          break;
85
        default:
86
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
87
      }
88
      nodeid_free(remote);
89
    } else {
90
      struct timeval tmp;
91

    
92
      //send_chunk();
93
      send_offer();
94
      if (cnt++ % 10 == 0) {
95
        update_peers(NULL, NULL, 0);
96
      }
97
      timeradd(&tnext, &period, &tmp);
98
      tnext = tmp;
99
    }
100
  }
101
}
102

    
103
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, bool loop)
104
{
105
  int done = 0;
106
  static uint8_t buff[BUFFSIZE];
107
  int cnt = 0;
108

    
109
  period.tv_sec = csize  / 1000000;
110
  period.tv_usec = csize % 1000000;
111
  
112
  sigInit(s);
113
  chunkSignalingInit(s);
114
  peers_init();
115
  if (source_init(fname, s, loop) < 0) {
116
    fprintf(stderr,"Cannot initialize source, exiting");
117
    return;
118
  }
119
  while (!done) {
120
    int len, res;
121
    struct timeval tv;
122

    
123
    tout_init(&tv);
124
    res = wait4data(s, &tv, NULL);
125
    if (res > 0) {
126
      struct nodeID *remote;
127

    
128
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
129
      if (len < 0) {
130
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
131
        nodeid_free(remote);
132
        continue;
133
      }
134
      dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
135
      switch (buff[0] /* Message Type */) {
136
        case MSG_TYPE_TOPOLOGY:
137
          fprintf(stderr, "Top Parse\n");
138
          update_peers(remote, buff, len);
139
          break;
140
        case MSG_TYPE_CHUNK:
141
          fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
142
          break;
143
        case MSG_TYPE_SIGNALLING:
144
          sigParseData(remote, buff, len);
145
          break;
146
        default:
147
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
148
      }
149
      nodeid_free(remote);
150
    } else {
151
      int i, res;
152
      struct timeval tmp, d;
153

    
154
      d.tv_sec = 0;
155
      res = generated_chunk(&d.tv_usec);
156
      if (res) {
157
        for (i = 0; i < chunks; i++) {        // @TODO: why this cycle?
158
          send_chunk();
159
        }
160
        if (cnt++ % 10 == 0) {
161
            update_peers(NULL, NULL, 0);
162
        }
163
      }
164
      timeradd(&tnext, &d, &tmp);
165
      tnext = tmp;
166
    }
167
  }
168
}