Statistics
| Branch: | Revision:

streamers / loop-mt.c @ fbeb4f32

History | View | Annotate | Download (4.36 KB)

1
#include <sys/time.h>
2
#include <unistd.h>
3
#include <stdint.h>
4
#include <stdlib.h>
5
#include <stdio.h>
6
#include <pthread.h>
7
#include <net_helper.h>
8
#include <msg_types.h>
9
#include <peerset.h>
10
#include <peer.h>
11

    
12
#include "chunk_signaling.h"
13
#include "streaming.h"
14
#include "topology.h"
15
#include "loop.h"
16

    
17
#define BUFFSIZE 64 * 1024
18
static int chunks_per_period = 1;
19
static int period = 500000;
20
static int done;
21
static pthread_mutex_t cb_mutex;
22
static pthread_mutex_t topology_mutex;
23
static struct nodeID *s;
24
static struct peerset *pset;
25

    
26
static void *chunk_forging(void *dummy)
27
{
28
  suseconds_t d;
29

    
30
  while(!done) {
31
    pthread_mutex_lock(&cb_mutex);
32
    generated_chunk(&d);
33
    pthread_mutex_unlock(&cb_mutex);
34
    usleep(d);
35
  }
36

    
37
  return NULL;
38
}
39

    
40
static void *source_receive(void *dummy)
41
{
42
  while (!done) {
43
    int len;
44
    struct nodeID *remote;
45
  static uint8_t buff[BUFFSIZE];
46

    
47
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
48
    switch (buff[0] /* Message Type */) {
49
      case MSG_TYPE_TOPOLOGY:
50
        pthread_mutex_lock(&topology_mutex);
51
        update_peers(pset, remote, buff, len);
52
        pthread_mutex_unlock(&topology_mutex);
53
        break;
54
      case MSG_TYPE_CHUNK:
55
        fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
56
        break;
57
      default:
58
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
59
    }
60
    nodeID_free(remote);
61
  }
62

    
63
  return NULL;
64
}
65

    
66
static void *receive(void *dummy)
67
{
68
  while (!done) {
69
    int len;
70
    struct nodeID *remote;
71
  static uint8_t buff[BUFFSIZE];
72

    
73
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
74
    switch (buff[0] /* Message Type */) {
75
      case MSG_TYPE_TOPOLOGY:
76
        pthread_mutex_lock(&topology_mutex);
77
        update_peers(pset, remote, buff, len);
78
        pthread_mutex_unlock(&topology_mutex);
79
        break;
80
      case MSG_TYPE_CHUNK:
81
        pthread_mutex_lock(&cb_mutex);
82
        received_chunk(pset, remote, buff, len);
83
        pthread_mutex_unlock(&cb_mutex);
84
        break;
85
      case MSG_TYPE_SIGNALLING:
86
        pthread_mutex_lock(&topology_mutex);
87
        sigParseData(remote, buff, len);
88
        pthread_mutex_unlock(&topology_mutex);
89
        break;
90
      default:
91
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
92
    }
93
    nodeID_free(remote);
94
  }
95

    
96
  return NULL;
97
}
98

    
99
static void *topology_sending(void *dummy)
100
{
101
  int gossiping_period = period * 10;
102

    
103
  pthread_mutex_lock(&topology_mutex);
104
  update_peers(pset, NULL, NULL, 0);
105
  pthread_mutex_unlock(&topology_mutex);
106
  while(!done) {
107
    pthread_mutex_lock(&topology_mutex);
108
    update_peers(pset, NULL, NULL, 0);
109
    pthread_mutex_unlock(&topology_mutex);
110
    usleep(gossiping_period);
111
  }
112

    
113
  return NULL;
114
}
115

    
116
static void *chunk_sending(void *dummy)
117
{
118
  int chunk_period = period / chunks_per_period;
119

    
120
  while(!done) {
121
    pthread_mutex_lock(&topology_mutex);
122
    pthread_mutex_lock(&cb_mutex);
123
    send_chunk(pset);
124
    pthread_mutex_unlock(&cb_mutex);
125
    pthread_mutex_unlock(&topology_mutex);
126
    usleep(chunk_period);
127
  }
128

    
129
  return NULL;
130
}
131

    
132
void loop(struct nodeID *s1, int csize, int buff_size)
133
{
134
  pthread_t receive_thread, gossiping_thread, distributing_thread;
135
  
136
  period = csize;
137
  s = s1;
138
 
139
  pset = peerset_init(0);
140
  sigInit(s,pset);
141
  stream_init(buff_size, s);
142
  pthread_mutex_init(&cb_mutex, NULL);
143
  pthread_mutex_init(&topology_mutex, NULL);
144
  pthread_create(&receive_thread, NULL, receive, NULL); 
145
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
146
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
147

    
148
  pthread_join(receive_thread, NULL);
149
  pthread_join(gossiping_thread, NULL);
150
  pthread_join(distributing_thread, NULL);
151
}
152

    
153
void source_loop(const char *fname, struct nodeID *s1, int csize, int chunks)
154
{
155
  pthread_t generate_thread, receive_thread, gossiping_thread, distributing_thread;
156
  
157
  period = csize;
158
  chunks_per_period = chunks;
159
  s = s1;
160
 
161
  pset = peerset_init(0);
162
  sigInit(s,pset);
163
  source_init(fname, s);
164
  pthread_mutex_init(&cb_mutex, NULL);
165
  pthread_mutex_init(&topology_mutex, NULL);
166
  pthread_create(&receive_thread, NULL, source_receive, NULL); 
167
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
168
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
169
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
170

    
171
  pthread_join(generate_thread, NULL);
172
  pthread_join(receive_thread, NULL);
173
  pthread_join(gossiping_thread, NULL);
174
  pthread_join(distributing_thread, NULL);
175
}