Statistics
| Branch: | Revision:

streamers / loop-mt.c @ da25233b

History | View | Annotate | Download (5.66 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <unistd.h>
9
#include <stdint.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <pthread.h>
13
#include <net_helper.h>
14
#include <grapes_msg_types.h>
15
#include <peerset.h>
16
#include <peer.h>
17

    
18
#include "dbg.h"
19
#include "chunk_signaling.h"
20
#include "streaming.h"
21
#include "topology.h"
22
#include "loop.h"
23

    
24
#define BUFFSIZE 512 * 1024
25
#define FDSSIZE 16
26
static int chunks_per_period = 1;
27
static int period = 500000;
28
static int done;
29
pthread_mutex_t cb_mutex;
30
pthread_mutex_t topology_mutex;
31
static struct nodeID *s;
32

    
33
static void *chunk_forging(void *dummy)
34
{
35
  suseconds_t d;
36
  struct chunk *c;
37

    
38
  while(!done) {
39
    c = generated_chunk(&d);
40
    if (c) {
41
      pthread_mutex_lock(&cb_mutex);
42
      add_chunk(c);
43
      pthread_mutex_unlock(&cb_mutex);
44
    }
45
    usleep(d);
46
  }
47

    
48
  return NULL;
49
}
50

    
51
static void *source_receive(void *dummy)
52
{
53
  while (!done) {
54
    int len;
55
    struct nodeID *remote;
56
  static uint8_t buff[BUFFSIZE];
57

    
58
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
59
    if (len < 0) {
60
      fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
61
      nodeid_free(remote);
62
      continue;
63
    }
64
    switch (buff[0] /* Message Type */) {
65
          case MSG_TYPE_TMAN:
66
      case MSG_TYPE_TOPOLOGY:
67
        pthread_mutex_lock(&topology_mutex);
68
        update_peers(remote, buff, len);
69
        pthread_mutex_unlock(&topology_mutex);
70
        break;
71
      case MSG_TYPE_CHUNK:
72
        fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
73
        break;
74
      case MSG_TYPE_SIGNALLING:
75
        pthread_mutex_lock(&topology_mutex);
76
        sigParseData(remote, buff, len);
77
        pthread_mutex_unlock(&topology_mutex);
78
        break;
79
      default:
80
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
81
    }
82
    nodeid_free(remote);
83
  }
84

    
85
  return NULL;
86
}
87

    
88
static void *receive(void *dummy)
89
{
90
  while (!done) {
91
    int len;
92
    struct nodeID *remote;
93
  static uint8_t buff[BUFFSIZE];
94

    
95
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
96
    if (len < 0) {
97
      fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
98
      nodeid_free(remote);
99
      continue;
100
    }
101
    dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
102
    switch (buff[0] /* Message Type */) {
103
          case MSG_TYPE_TMAN:
104
      case MSG_TYPE_TOPOLOGY:
105
        pthread_mutex_lock(&topology_mutex);
106
        update_peers(remote, buff, len);
107
        pthread_mutex_unlock(&topology_mutex);
108
        break;
109
      case MSG_TYPE_CHUNK:
110
        dprintf("Chunk message received:\n");
111
        pthread_mutex_lock(&cb_mutex);
112
        received_chunk(remote, buff, len);
113
        pthread_mutex_unlock(&cb_mutex);
114
        break;
115
      case MSG_TYPE_SIGNALLING:
116
        pthread_mutex_lock(&topology_mutex);
117
        sigParseData(remote, buff, len);
118
        pthread_mutex_unlock(&topology_mutex);
119
        break;
120
      default:
121
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
122
    }
123
    nodeid_free(remote);
124
  }
125

    
126
  return NULL;
127
}
128

    
129
static void *topology_sending(void *dummy)
130
{
131
  int gossiping_period = period * 10;
132

    
133
  pthread_mutex_lock(&topology_mutex);
134
  update_peers(NULL, NULL, 0);
135
  pthread_mutex_unlock(&topology_mutex);
136
  while(!done) {
137
    pthread_mutex_lock(&topology_mutex);
138
    update_peers(NULL, NULL, 0);
139
    pthread_mutex_unlock(&topology_mutex);
140
    usleep(gossiping_period);
141
  }
142

    
143
  return NULL;
144
}
145

    
146
static void *chunk_sending(void *dummy)
147
{
148
  int chunk_period = period / chunks_per_period;
149

    
150
  while(!done) {
151
    pthread_mutex_lock(&topology_mutex);
152
    pthread_mutex_lock(&cb_mutex);
153
    send_chunk();
154
    pthread_mutex_unlock(&cb_mutex);
155
    pthread_mutex_unlock(&topology_mutex);
156
    usleep(chunk_period);
157
  }
158

    
159
  return NULL;
160
}
161

    
162
static void *chunk_trading(void *dummy)
163
{
164
  int chunk_period = period / chunks_per_period;
165

    
166
  while(!done) {
167
    pthread_mutex_lock(&topology_mutex);
168
    pthread_mutex_lock(&cb_mutex);
169
    send_offer();
170
    pthread_mutex_unlock(&cb_mutex);
171
    pthread_mutex_unlock(&topology_mutex);
172
    usleep(chunk_period);
173
  }
174

    
175
  return NULL;
176
}
177

    
178
void loop(struct nodeID *s1, int csize, int buff_size)
179
{
180
  pthread_t receive_thread, gossiping_thread, distributing_thread;
181
  
182
  period = csize;
183
  s = s1;
184
 
185
  peers_init();
186
  stream_init(buff_size, s);
187
  pthread_mutex_init(&cb_mutex, NULL);
188
  pthread_mutex_init(&topology_mutex, NULL);
189
  pthread_create(&receive_thread, NULL, receive, NULL); 
190
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
191
  pthread_create(&distributing_thread, NULL, chunk_trading, NULL); 
192

    
193
  pthread_join(receive_thread, NULL);
194
  pthread_join(gossiping_thread, NULL);
195
  pthread_join(distributing_thread, NULL);
196
}
197

    
198
void source_loop(const char *fname, struct nodeID *s1, int csize, int chunks, bool loop)
199
{
200
  pthread_t generate_thread, receive_thread, gossiping_thread, distributing_thread;
201
  
202
  period = csize;
203
  chunks_per_period = chunks;
204
  s = s1;
205
 
206
  int fds[FDSSIZE];
207
  fds[0] = -1;
208

    
209
//  sigInit(s);
210
  peers_init();
211
  if (source_init(fname, s, loop, fds, FDSSIZE) < 0) {
212
    fprintf(stderr,"Cannot initialize source, exiting");
213
    return;
214
  }
215
  pthread_mutex_init(&cb_mutex, NULL);
216
  pthread_mutex_init(&topology_mutex, NULL);
217
  pthread_create(&receive_thread, NULL, source_receive, NULL); 
218
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
219
#ifndef HTTPIO
220
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
221
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
222
#endif
223

    
224
#ifndef HTTPIO
225
  pthread_join(generate_thread, NULL);
226
  pthread_join(distributing_thread, NULL);
227
#endif
228
  pthread_join(receive_thread, NULL);
229
  pthread_join(gossiping_thread, NULL);
230
}