Statistics
| Branch: | Revision:

streamers / loop-mt.c @ 03de31e0

History | View | Annotate | Download (5.74 KB)

1 8fed7779 CsabaKiraly
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7 ee908119 Luca Abeni
#include <sys/time.h>
8 d92a24a6 Luca Abeni
#include <unistd.h>
9
#include <stdint.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <pthread.h>
13
#include <net_helper.h>
14 e28a1487 CsabaKiraly
#include <grapes_msg_types.h>
15 0e858c17 Csaba Kiraly
#include <peerset.h>
16
#include <peer.h>
17 d92a24a6 Luca Abeni
18 cab9a9fb Csaba Kiraly
#include "dbg.h"
19 0e858c17 Csaba Kiraly
#include "chunk_signaling.h"
20 d92a24a6 Luca Abeni
#include "streaming.h"
21 0e858c17 Csaba Kiraly
#include "topology.h"
22 d92a24a6 Luca Abeni
#include "loop.h"
23
24 c386efc6 Csaba Kiraly
#define BUFFSIZE 512 * 1024
25 81b601c1 GiuseppeTropea
#define FDSSIZE 16
26 d92a24a6 Luca Abeni
static int chunks_per_period = 1;
27
static int period = 500000;
28
static int done;
29 81b601c1 GiuseppeTropea
pthread_mutex_t cb_mutex;
30
pthread_mutex_t topology_mutex;
31 d92a24a6 Luca Abeni
static struct nodeID *s;
32
33
static void *chunk_forging(void *dummy)
34
{
35 ee908119 Luca Abeni
  suseconds_t d;
36 7eee10e4 CsabaKiraly
  struct chunk *c;
37 d92a24a6 Luca Abeni
38
  while(!done) {
39 7eee10e4 CsabaKiraly
    c = generated_chunk(&d);
40
    if (c) {
41
      pthread_mutex_lock(&cb_mutex);
42
      add_chunk(c);
43
      pthread_mutex_unlock(&cb_mutex);
44
    }
45 ce80b058 Luca Abeni
    usleep(d);
46 d92a24a6 Luca Abeni
  }
47
48
  return NULL;
49
}
50
51
static void *source_receive(void *dummy)
52
{
53
  while (!done) {
54
    int len;
55
    struct nodeID *remote;
56
  static uint8_t buff[BUFFSIZE];
57
58 4136911a Luca Abeni
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
59 9d6a53c3 Csaba Kiraly
    if (len < 0) {
60
      fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
61
      nodeid_free(remote);
62
      continue;
63
    }
64 d92a24a6 Luca Abeni
    switch (buff[0] /* Message Type */) {
65 f0b37018 MarcoBiazzini
          case MSG_TYPE_TMAN:
66 9176d3d1 Csaba Kiraly
      case MSG_TYPE_STREAMER_TOPOLOGY:
67 615e8354 Luca Abeni
      case MSG_TYPE_TOPOLOGY:
68 d92a24a6 Luca Abeni
        pthread_mutex_lock(&topology_mutex);
69 fcb5c29b Csaba Kiraly
        update_peers(remote, buff, len);
70 d92a24a6 Luca Abeni
        pthread_mutex_unlock(&topology_mutex);
71
        break;
72 615e8354 Luca Abeni
      case MSG_TYPE_CHUNK:
73 b07667ee Csaba Kiraly
        fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
74 10796872 Luca Abeni
        break;
75 7e3653fa Csaba Kiraly
      case MSG_TYPE_SIGNALLING:
76
        pthread_mutex_lock(&topology_mutex);
77
        sigParseData(remote, buff, len);
78
        pthread_mutex_unlock(&topology_mutex);
79
        break;
80 d92a24a6 Luca Abeni
      default:
81
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
82
    }
83 a9d7dd3b Luca Abeni
    nodeid_free(remote);
84 d92a24a6 Luca Abeni
  }
85
86
  return NULL;
87
}
88
89
static void *receive(void *dummy)
90
{
91
  while (!done) {
92
    int len;
93
    struct nodeID *remote;
94
  static uint8_t buff[BUFFSIZE];
95
96 4136911a Luca Abeni
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
97 9d6a53c3 Csaba Kiraly
    if (len < 0) {
98
      fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
99
      nodeid_free(remote);
100
      continue;
101
    }
102 cab9a9fb Csaba Kiraly
    dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
103 d92a24a6 Luca Abeni
    switch (buff[0] /* Message Type */) {
104 f0b37018 MarcoBiazzini
          case MSG_TYPE_TMAN:
105 9176d3d1 Csaba Kiraly
      case MSG_TYPE_STREAMER_TOPOLOGY:
106 615e8354 Luca Abeni
      case MSG_TYPE_TOPOLOGY:
107 d92a24a6 Luca Abeni
        pthread_mutex_lock(&topology_mutex);
108 fcb5c29b Csaba Kiraly
        update_peers(remote, buff, len);
109 d92a24a6 Luca Abeni
        pthread_mutex_unlock(&topology_mutex);
110
        break;
111 615e8354 Luca Abeni
      case MSG_TYPE_CHUNK:
112 cab9a9fb Csaba Kiraly
        dprintf("Chunk message received:\n");
113 d92a24a6 Luca Abeni
        pthread_mutex_lock(&cb_mutex);
114 fcb5c29b Csaba Kiraly
        received_chunk(remote, buff, len);
115 d92a24a6 Luca Abeni
        pthread_mutex_unlock(&cb_mutex);
116 d2a239c6 Luca
        break;
117 0e858c17 Csaba Kiraly
      case MSG_TYPE_SIGNALLING:
118
        pthread_mutex_lock(&topology_mutex);
119 d310076d Csaba Kiraly
        sigParseData(remote, buff, len);
120 0e858c17 Csaba Kiraly
        pthread_mutex_unlock(&topology_mutex);
121
        break;
122 d92a24a6 Luca Abeni
      default:
123
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
124
    }
125 a9d7dd3b Luca Abeni
    nodeid_free(remote);
126 d92a24a6 Luca Abeni
  }
127
128
  return NULL;
129
}
130
131
static void *topology_sending(void *dummy)
132
{
133
  int gossiping_period = period * 10;
134
135
  pthread_mutex_lock(&topology_mutex);
136 fcb5c29b Csaba Kiraly
  update_peers(NULL, NULL, 0);
137 d92a24a6 Luca Abeni
  pthread_mutex_unlock(&topology_mutex);
138
  while(!done) {
139
    pthread_mutex_lock(&topology_mutex);
140 fcb5c29b Csaba Kiraly
    update_peers(NULL, NULL, 0);
141 d92a24a6 Luca Abeni
    pthread_mutex_unlock(&topology_mutex);
142 83c5c285 Luca Abeni
    usleep(gossiping_period);
143 d92a24a6 Luca Abeni
  }
144
145
  return NULL;
146
}
147
148
static void *chunk_sending(void *dummy)
149
{
150 83c5c285 Luca Abeni
  int chunk_period = period / chunks_per_period;
151 d92a24a6 Luca Abeni
152
  while(!done) {
153
    pthread_mutex_lock(&topology_mutex);
154
    pthread_mutex_lock(&cb_mutex);
155 fcb5c29b Csaba Kiraly
    send_chunk();
156 d92a24a6 Luca Abeni
    pthread_mutex_unlock(&cb_mutex);
157
    pthread_mutex_unlock(&topology_mutex);
158 83c5c285 Luca Abeni
    usleep(chunk_period);
159 d92a24a6 Luca Abeni
  }
160
161
  return NULL;
162
}
163
164 12a14c43 CsabaKiraly
static void *chunk_trading(void *dummy)
165
{
166
  int chunk_period = period / chunks_per_period;
167
168
  while(!done) {
169
    pthread_mutex_lock(&topology_mutex);
170
    pthread_mutex_lock(&cb_mutex);
171
    send_offer();
172
    pthread_mutex_unlock(&cb_mutex);
173
    pthread_mutex_unlock(&topology_mutex);
174
    usleep(chunk_period);
175
  }
176
177
  return NULL;
178
}
179
180 0a40460a Luca
void loop(struct nodeID *s1, int csize, int buff_size)
181 d92a24a6 Luca Abeni
{
182
  pthread_t receive_thread, gossiping_thread, distributing_thread;
183
  
184
  period = csize;
185
  s = s1;
186
 
187 9e72961f CsabaKiraly
  peers_init();
188 0a40460a Luca
  stream_init(buff_size, s);
189 d92a24a6 Luca Abeni
  pthread_mutex_init(&cb_mutex, NULL);
190
  pthread_mutex_init(&topology_mutex, NULL);
191
  pthread_create(&receive_thread, NULL, receive, NULL); 
192
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
193 12a14c43 CsabaKiraly
  pthread_create(&distributing_thread, NULL, chunk_trading, NULL); 
194 d92a24a6 Luca Abeni
195
  pthread_join(receive_thread, NULL);
196
  pthread_join(gossiping_thread, NULL);
197
  pthread_join(distributing_thread, NULL);
198
}
199
200 03de31e0 Csaba Kiraly
void source_loop(const char *fname, struct nodeID *s1, int csize, int chunks, int buff_size)
201 d92a24a6 Luca Abeni
{
202
  pthread_t generate_thread, receive_thread, gossiping_thread, distributing_thread;
203
  
204
  period = csize;
205
  chunks_per_period = chunks;
206
  s = s1;
207
 
208 81b601c1 GiuseppeTropea
  int fds[FDSSIZE];
209
  fds[0] = -1;
210
211
//  sigInit(s);
212 9e72961f CsabaKiraly
  peers_init();
213 03de31e0 Csaba Kiraly
  if (source_init(fname, s, fds, FDSSIZE, buff_size) < 0) {
214 54f4d42f Csaba Kiraly
    fprintf(stderr,"Cannot initialize source, exiting");
215
    return;
216
  }
217 d92a24a6 Luca Abeni
  pthread_mutex_init(&cb_mutex, NULL);
218
  pthread_mutex_init(&topology_mutex, NULL);
219
  pthread_create(&receive_thread, NULL, source_receive, NULL); 
220
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
221 81b601c1 GiuseppeTropea
#ifndef HTTPIO
222 d92a24a6 Luca Abeni
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
223 81b601c1 GiuseppeTropea
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
224
#endif
225 d92a24a6 Luca Abeni
226 81b601c1 GiuseppeTropea
#ifndef HTTPIO
227 d92a24a6 Luca Abeni
  pthread_join(generate_thread, NULL);
228 81b601c1 GiuseppeTropea
  pthread_join(distributing_thread, NULL);
229
#endif
230 d92a24a6 Luca Abeni
  pthread_join(receive_thread, NULL);
231
  pthread_join(gossiping_thread, NULL);
232
}