Statistics
| Branch: | Revision:

streamers / loop-mt.c @ c45cea07

History | View | Annotate | Download (5.15 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <unistd.h>
9
#include <stdint.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <pthread.h>
13
#include <net_helper.h>
14
#include <msg_types.h>
15
#include <peerset.h>
16
#include <peer.h>
17

    
18
#include "dbg.h"
19
#include "chunk_signaling.h"
20
#include "streaming.h"
21
#include "topology.h"
22
#include "loop.h"
23

    
24
#define BUFFSIZE 512 * 1024
25
static int chunks_per_period = 1;
26
static int period = 500000;
27
static int done;
28
static pthread_mutex_t cb_mutex;
29
static pthread_mutex_t topology_mutex;
30
static struct nodeID *s;
31

    
32
static void *chunk_forging(void *dummy)
33
{
34
  suseconds_t d;
35

    
36
  while(!done) {
37
    pthread_mutex_lock(&cb_mutex);
38
    generated_chunk(&d);
39
    pthread_mutex_unlock(&cb_mutex);
40
    usleep(d);
41
  }
42

    
43
  return NULL;
44
}
45

    
46
static void *source_receive(void *dummy)
47
{
48
  while (!done) {
49
    int len;
50
    struct nodeID *remote;
51
  static uint8_t buff[BUFFSIZE];
52

    
53
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
54
    if (len < 0) {
55
      fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
56
      nodeid_free(remote);
57
      continue;
58
    }
59
    switch (buff[0] /* Message Type */) {
60
      case MSG_TYPE_TOPOLOGY:
61
        pthread_mutex_lock(&topology_mutex);
62
        update_peers(remote, buff, len);
63
        pthread_mutex_unlock(&topology_mutex);
64
        break;
65
      case MSG_TYPE_CHUNK:
66
        fprintf(stderr, "Some dumb peer pushed a chunk to me! peer:%s\n",node_addr(remote));
67
        break;
68
      case MSG_TYPE_SIGNALLING:
69
        pthread_mutex_lock(&topology_mutex);
70
        sigParseData(remote, buff, len);
71
        pthread_mutex_unlock(&topology_mutex);
72
        break;
73
      default:
74
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
75
    }
76
    nodeid_free(remote);
77
  }
78

    
79
  return NULL;
80
}
81

    
82
static void *receive(void *dummy)
83
{
84
  while (!done) {
85
    int len;
86
    struct nodeID *remote;
87
  static uint8_t buff[BUFFSIZE];
88

    
89
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
90
    if (len < 0) {
91
      fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
92
      nodeid_free(remote);
93
      continue;
94
    }
95
    dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
96
    switch (buff[0] /* Message Type */) {
97
      case MSG_TYPE_TOPOLOGY:
98
        pthread_mutex_lock(&topology_mutex);
99
        update_peers(remote, buff, len);
100
        pthread_mutex_unlock(&topology_mutex);
101
        break;
102
      case MSG_TYPE_CHUNK:
103
        dprintf("Chunk message received:\n");
104
        pthread_mutex_lock(&cb_mutex);
105
        received_chunk(remote, buff, len);
106
        pthread_mutex_unlock(&cb_mutex);
107
        break;
108
      case MSG_TYPE_SIGNALLING:
109
        pthread_mutex_lock(&topology_mutex);
110
        sigParseData(remote, buff, len);
111
        pthread_mutex_unlock(&topology_mutex);
112
        break;
113
      default:
114
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
115
    }
116
    nodeid_free(remote);
117
  }
118

    
119
  return NULL;
120
}
121

    
122
static void *topology_sending(void *dummy)
123
{
124
  int gossiping_period = period * 10;
125

    
126
  pthread_mutex_lock(&topology_mutex);
127
  update_peers(NULL, NULL, 0);
128
  pthread_mutex_unlock(&topology_mutex);
129
  while(!done) {
130
    pthread_mutex_lock(&topology_mutex);
131
    update_peers(NULL, NULL, 0);
132
    pthread_mutex_unlock(&topology_mutex);
133
    usleep(gossiping_period);
134
  }
135

    
136
  return NULL;
137
}
138

    
139
static void *chunk_sending(void *dummy)
140
{
141
  int chunk_period = period / chunks_per_period;
142

    
143
  while(!done) {
144
    pthread_mutex_lock(&topology_mutex);
145
    pthread_mutex_lock(&cb_mutex);
146
    send_chunk();
147
    pthread_mutex_unlock(&cb_mutex);
148
    pthread_mutex_unlock(&topology_mutex);
149
    usleep(chunk_period);
150
  }
151

    
152
  return NULL;
153
}
154

    
155
void loop(struct nodeID *s1, int csize, int buff_size)
156
{
157
  pthread_t receive_thread, gossiping_thread, distributing_thread;
158
  
159
  period = csize;
160
  s = s1;
161
 
162
  sigInit(s);
163
  chunkSignalingInit(s);
164
  stream_init(buff_size, s);
165
  pthread_mutex_init(&cb_mutex, NULL);
166
  pthread_mutex_init(&topology_mutex, NULL);
167
  pthread_create(&receive_thread, NULL, receive, NULL); 
168
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
169
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
170

    
171
  pthread_join(receive_thread, NULL);
172
  pthread_join(gossiping_thread, NULL);
173
  pthread_join(distributing_thread, NULL);
174
}
175

    
176
void source_loop(const char *fname, struct nodeID *s1, int csize, int chunks, bool loop)
177
{
178
  pthread_t generate_thread, receive_thread, gossiping_thread, distributing_thread;
179
  
180
  period = csize;
181
  chunks_per_period = chunks;
182
  s = s1;
183
 
184
  sigInit(s);
185
  chunkSignalingInit(s);
186
  if (source_init(fname, s, loop) < 0) {
187
    fprintf(stderr,"Cannot initialize source, exiting");
188
    return;
189
  }
190
  pthread_mutex_init(&cb_mutex, NULL);
191
  pthread_mutex_init(&topology_mutex, NULL);
192
  pthread_create(&receive_thread, NULL, source_receive, NULL); 
193
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
194
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
195
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
196

    
197
  pthread_join(generate_thread, NULL);
198
  pthread_join(receive_thread, NULL);
199
  pthread_join(gossiping_thread, NULL);
200
  pthread_join(distributing_thread, NULL);
201
}