Statistics
| Branch: | Revision:

streamers / loop-mt.c @ 7e3653fa

History | View | Annotate | Download (5.09 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <unistd.h>
9
#include <stdint.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <pthread.h>
13
#include <net_helper.h>
14
#include <msg_types.h>
15
#include <peerset.h>
16
#include <peer.h>
17

    
18
#include "dbg.h"
19
#include "chunk_signaling.h"
20
#include "streaming.h"
21
#include "topology.h"
22
#include "loop.h"
23

    
24
#define BUFFSIZE 512 * 1024
25
static int chunks_per_period = 1;
26
static int period = 500000;
27
static int done;
28
static pthread_mutex_t cb_mutex;
29
static pthread_mutex_t topology_mutex;
30
static struct nodeID *s;
31
static struct peerset *pset;
32

    
33
static void *chunk_forging(void *dummy)
34
{
35
  suseconds_t d;
36

    
37
  while(!done) {
38
    pthread_mutex_lock(&cb_mutex);
39
    generated_chunk(&d);
40
    pthread_mutex_unlock(&cb_mutex);
41
    usleep(d);
42
  }
43

    
44
  return NULL;
45
}
46

    
47
static void *source_receive(void *dummy)
48
{
49
  while (!done) {
50
    int len;
51
    struct nodeID *remote;
52
  static uint8_t buff[BUFFSIZE];
53

    
54
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
55
    if (len < 0) {
56
      fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
57
      nodeid_free(remote);
58
      continue;
59
    }
60
    switch (buff[0] /* Message Type */) {
61
      case MSG_TYPE_TOPOLOGY:
62
        pthread_mutex_lock(&topology_mutex);
63
        update_peers(pset, buff, len);
64
        pthread_mutex_unlock(&topology_mutex);
65
        break;
66
      case MSG_TYPE_CHUNK:
67
        fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
68
        break;
69
      case MSG_TYPE_SIGNALLING:
70
        pthread_mutex_lock(&topology_mutex);
71
        sigParseData(remote, buff, len);
72
        pthread_mutex_unlock(&topology_mutex);
73
        break;
74
      default:
75
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
76
    }
77
    nodeid_free(remote);
78
  }
79

    
80
  return NULL;
81
}
82

    
83
static void *receive(void *dummy)
84
{
85
  while (!done) {
86
    int len;
87
    struct nodeID *remote;
88
  static uint8_t buff[BUFFSIZE];
89

    
90
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
91
    if (len < 0) {
92
      fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
93
      nodeid_free(remote);
94
      continue;
95
    }
96
    dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
97
    switch (buff[0] /* Message Type */) {
98
      case MSG_TYPE_TOPOLOGY:
99
        pthread_mutex_lock(&topology_mutex);
100
        update_peers(pset, buff, len);
101
        pthread_mutex_unlock(&topology_mutex);
102
        break;
103
      case MSG_TYPE_CHUNK:
104
        dprintf("Chunk message received:\n");
105
        pthread_mutex_lock(&cb_mutex);
106
        received_chunk(pset, remote, buff, len);
107
        pthread_mutex_unlock(&cb_mutex);
108
        break;
109
      case MSG_TYPE_SIGNALLING:
110
        pthread_mutex_lock(&topology_mutex);
111
        sigParseData(remote, buff, len);
112
        pthread_mutex_unlock(&topology_mutex);
113
        break;
114
      default:
115
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
116
    }
117
    nodeid_free(remote);
118
  }
119

    
120
  return NULL;
121
}
122

    
123
static void *topology_sending(void *dummy)
124
{
125
  int gossiping_period = period * 10;
126

    
127
  pthread_mutex_lock(&topology_mutex);
128
  update_peers(pset, NULL, 0);
129
  pthread_mutex_unlock(&topology_mutex);
130
  while(!done) {
131
    pthread_mutex_lock(&topology_mutex);
132
    update_peers(pset, NULL, 0);
133
    pthread_mutex_unlock(&topology_mutex);
134
    usleep(gossiping_period);
135
  }
136

    
137
  return NULL;
138
}
139

    
140
static void *chunk_sending(void *dummy)
141
{
142
  int chunk_period = period / chunks_per_period;
143

    
144
  while(!done) {
145
    pthread_mutex_lock(&topology_mutex);
146
    pthread_mutex_lock(&cb_mutex);
147
    send_chunk(pset);
148
    pthread_mutex_unlock(&cb_mutex);
149
    pthread_mutex_unlock(&topology_mutex);
150
    usleep(chunk_period);
151
  }
152

    
153
  return NULL;
154
}
155

    
156
void loop(struct nodeID *s1, int csize, int buff_size)
157
{
158
  pthread_t receive_thread, gossiping_thread, distributing_thread;
159
  
160
  period = csize;
161
  s = s1;
162
 
163
  pset = peerset_init(0);
164
  sigInit(s,pset);
165
  stream_init(buff_size, s);
166
  pthread_mutex_init(&cb_mutex, NULL);
167
  pthread_mutex_init(&topology_mutex, NULL);
168
  pthread_create(&receive_thread, NULL, receive, NULL); 
169
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
170
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
171

    
172
  pthread_join(receive_thread, NULL);
173
  pthread_join(gossiping_thread, NULL);
174
  pthread_join(distributing_thread, NULL);
175
}
176

    
177
void source_loop(const char *fname, struct nodeID *s1, int csize, int chunks, bool loop)
178
{
179
  pthread_t generate_thread, receive_thread, gossiping_thread, distributing_thread;
180
  
181
  period = csize;
182
  chunks_per_period = chunks;
183
  s = s1;
184
 
185
  pset = peerset_init(0);
186
  sigInit(s,pset);
187
  source_init(fname, s, loop);
188
  pthread_mutex_init(&cb_mutex, NULL);
189
  pthread_mutex_init(&topology_mutex, NULL);
190
  pthread_create(&receive_thread, NULL, source_receive, NULL); 
191
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
192
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
193
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
194

    
195
  pthread_join(generate_thread, NULL);
196
  pthread_join(receive_thread, NULL);
197
  pthread_join(gossiping_thread, NULL);
198
  pthread_join(distributing_thread, NULL);
199
}