Statistics
| Branch: | Revision:

streamers / loop-mt.c @ a9d7dd3b

History | View | Annotate | Download (4.04 KB)

1
#include <sys/time.h>
2
#include <unistd.h>
3
#include <stdint.h>
4
#include <stdlib.h>
5
#include <stdio.h>
6
#include <pthread.h>
7

    
8
#include <net_helper.h>
9
#include <topmanager.h>
10
#include <msg_types.h>
11

    
12
#include "streaming.h"
13
#include "loop.h"
14

    
15
#define BUFFSIZE 64 * 1024
16
static int chunks_per_period = 1;
17
static int period = 500000;
18
static int done;
19
static pthread_mutex_t cb_mutex;
20
static pthread_mutex_t topology_mutex;
21
static struct nodeID *s;
22

    
23
static void *chunk_forging(void *dummy)
24
{
25
  suseconds_t d;
26

    
27
  while(!done) {
28
    pthread_mutex_lock(&cb_mutex);
29
    generated_chunk(&d);
30
    pthread_mutex_unlock(&cb_mutex);
31
    usleep(d);
32
  }
33

    
34
  return NULL;
35
}
36

    
37
static void *source_receive(void *dummy)
38
{
39
  while (!done) {
40
    int len;
41
    struct nodeID *remote;
42
  static uint8_t buff[BUFFSIZE];
43

    
44
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
45
    switch (buff[0] /* Message Type */) {
46
      case MSG_TYPE_TOPOLOGY:
47
        pthread_mutex_lock(&topology_mutex);
48
        topParseData(buff, len);
49
        pthread_mutex_unlock(&topology_mutex);
50
        break;
51
      case MSG_TYPE_CHUNK:
52
        fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
53
        break;
54
      default:
55
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
56
    }
57
    nodeid_free(remote);
58
  }
59

    
60
  return NULL;
61
}
62

    
63
static void *receive(void *dummy)
64
{
65
  while (!done) {
66
    int len;
67
    struct nodeID *remote;
68
  static uint8_t buff[BUFFSIZE];
69

    
70
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
71
    switch (buff[0] /* Message Type */) {
72
      case MSG_TYPE_TOPOLOGY:
73
        pthread_mutex_lock(&topology_mutex);
74
        topParseData(buff, len);
75
        pthread_mutex_unlock(&topology_mutex);
76
        break;
77
      case MSG_TYPE_CHUNK:
78
        pthread_mutex_lock(&cb_mutex);
79
        received_chunk(buff, len);
80
        pthread_mutex_unlock(&cb_mutex);
81
        break;
82
      default:
83
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
84
    }
85
    nodeid_free(remote);
86
  }
87

    
88
  return NULL;
89
}
90

    
91
static void *topology_sending(void *dummy)
92
{
93
  int gossiping_period = period * 10;
94

    
95
  pthread_mutex_lock(&topology_mutex);
96
  topParseData(NULL, 0);
97
  pthread_mutex_unlock(&topology_mutex);
98
  while(!done) {
99
    pthread_mutex_lock(&topology_mutex);
100
    topParseData(NULL, 0);
101
    pthread_mutex_unlock(&topology_mutex);
102
    usleep(gossiping_period);
103
  }
104

    
105
  return NULL;
106
}
107

    
108
static void *chunk_sending(void *dummy)
109
{
110
  int chunk_period = period / chunks_per_period;
111

    
112
  while(!done) {
113
    const struct nodeID **neighbours;
114
    int n;
115

    
116
    pthread_mutex_lock(&topology_mutex);
117
    neighbours = topGetNeighbourhood(&n);
118
    pthread_mutex_lock(&cb_mutex);
119
    send_chunk(neighbours, n);
120
    pthread_mutex_unlock(&cb_mutex);
121
    pthread_mutex_unlock(&topology_mutex);
122
    usleep(chunk_period);
123
  }
124

    
125
  return NULL;
126
}
127

    
128
void loop(struct nodeID *s1, int csize, int buff_size)
129
{
130
  pthread_t receive_thread, gossiping_thread, distributing_thread;
131
  
132
  period = csize;
133
  s = s1;
134
 
135
  stream_init(buff_size, s);
136
  pthread_mutex_init(&cb_mutex, NULL);
137
  pthread_mutex_init(&topology_mutex, NULL);
138
  pthread_create(&receive_thread, NULL, receive, NULL); 
139
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
140
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
141

    
142
  pthread_join(receive_thread, NULL);
143
  pthread_join(gossiping_thread, NULL);
144
  pthread_join(distributing_thread, NULL);
145
}
146

    
147
void source_loop(const char *fname, struct nodeID *s1, int csize, int chunks)
148
{
149
  pthread_t generate_thread, receive_thread, gossiping_thread, distributing_thread;
150
  
151
  period = csize;
152
  chunks_per_period = chunks;
153
  s = s1;
154
 
155
  source_init(fname, s);
156
  pthread_mutex_init(&cb_mutex, NULL);
157
  pthread_mutex_init(&topology_mutex, NULL);
158
  pthread_create(&receive_thread, NULL, source_receive, NULL); 
159
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
160
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
161
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
162

    
163
  pthread_join(generate_thread, NULL);
164
  pthread_join(receive_thread, NULL);
165
  pthread_join(gossiping_thread, NULL);
166
  pthread_join(distributing_thread, NULL);
167
}