Statistics
| Branch: | Revision:

streamers / loop-mt.c @ 4136911a

History | View | Annotate | Download (4.02 KB)

1
#include <unistd.h>
2
#include <stdint.h>
3
#include <stdlib.h>
4
#include <stdio.h>
5
#include <pthread.h>
6

    
7
#include <net_helper.h>
8
#include <topmanager.h>
9
#include <msg_types.h>
10

    
11
#include "streaming.h"
12
#include "loop.h"
13

    
14
#define BUFFSIZE 64 * 1024
15
static int chunks_per_period = 1;
16
static int period = 500000;
17
static int done;
18
static pthread_mutex_t cb_mutex;
19
static pthread_mutex_t topology_mutex;
20
static struct nodeID *s;
21

    
22
static void *chunk_forging(void *dummy)
23
{
24
  int chunk_period = period;
25

    
26
  while(!done) {
27
    pthread_mutex_lock(&cb_mutex);
28
    generated_chunk();
29
    pthread_mutex_unlock(&cb_mutex);
30
    usleep(chunk_period);
31
  }
32

    
33
  return NULL;
34
}
35

    
36
static void *source_receive(void *dummy)
37
{
38
  while (!done) {
39
    int len;
40
    struct nodeID *remote;
41
  static uint8_t buff[BUFFSIZE];
42

    
43
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
44
    switch (buff[0] /* Message Type */) {
45
      case MSG_TYPE_TOPOLOGY:
46
        pthread_mutex_lock(&topology_mutex);
47
        topParseData(buff, len);
48
        pthread_mutex_unlock(&topology_mutex);
49
        break;
50
      case MSG_TYPE_CHUNK:
51
        fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
52
        break;
53
      default:
54
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
55
    }
56
    free(remote);
57
  }
58

    
59
  return NULL;
60
}
61

    
62
static void *receive(void *dummy)
63
{
64
  while (!done) {
65
    int len;
66
    struct nodeID *remote;
67
  static uint8_t buff[BUFFSIZE];
68

    
69
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
70
    switch (buff[0] /* Message Type */) {
71
      case MSG_TYPE_TOPOLOGY:
72
        pthread_mutex_lock(&topology_mutex);
73
        topParseData(buff, len);
74
        pthread_mutex_unlock(&topology_mutex);
75
        break;
76
      case MSG_TYPE_CHUNK:
77
        pthread_mutex_lock(&cb_mutex);
78
        received_chunk(buff, len);
79
        pthread_mutex_unlock(&cb_mutex);
80
        break;
81
      default:
82
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
83
    }
84
    free(remote);
85
  }
86

    
87
  return NULL;
88
}
89

    
90
static void *topology_sending(void *dummy)
91
{
92
  int gossiping_period = period * 10;
93

    
94
  pthread_mutex_lock(&topology_mutex);
95
  topParseData(NULL, 0);
96
  pthread_mutex_unlock(&topology_mutex);
97
  while(!done) {
98
    pthread_mutex_lock(&topology_mutex);
99
    topParseData(NULL, 0);
100
    pthread_mutex_unlock(&topology_mutex);
101
    usleep(gossiping_period);
102
  }
103

    
104
  return NULL;
105
}
106

    
107
static void *chunk_sending(void *dummy)
108
{
109
  int chunk_period = period / chunks_per_period;
110

    
111
  while(!done) {
112
    const struct nodeID **neighbours;
113
    int n;
114

    
115
    pthread_mutex_lock(&topology_mutex);
116
    neighbours = topGetNeighbourhood(&n);
117
    pthread_mutex_lock(&cb_mutex);
118
    send_chunk(neighbours, n);
119
    pthread_mutex_unlock(&cb_mutex);
120
    pthread_mutex_unlock(&topology_mutex);
121
    usleep(chunk_period);
122
  }
123

    
124
  return NULL;
125
}
126

    
127
void loop(struct nodeID *s1, int csize, int buff_size)
128
{
129
  pthread_t receive_thread, gossiping_thread, distributing_thread;
130
  
131
  period = csize;
132
  s = s1;
133
 
134
  stream_init(buff_size, s);
135
  pthread_mutex_init(&cb_mutex, NULL);
136
  pthread_mutex_init(&topology_mutex, NULL);
137
  pthread_create(&receive_thread, NULL, receive, NULL); 
138
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
139
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
140

    
141
  pthread_join(receive_thread, NULL);
142
  pthread_join(gossiping_thread, NULL);
143
  pthread_join(distributing_thread, NULL);
144
}
145

    
146
void source_loop(const char *fname, struct nodeID *s1, int csize, int chunks)
147
{
148
  pthread_t generate_thread, receive_thread, gossiping_thread, distributing_thread;
149
  
150
  period = csize;
151
  chunks_per_period = chunks;
152
  s = s1;
153
 
154
  source_init(fname, s);
155
  pthread_mutex_init(&cb_mutex, NULL);
156
  pthread_mutex_init(&topology_mutex, NULL);
157
  pthread_create(&receive_thread, NULL, source_receive, NULL); 
158
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
159
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
160
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
161

    
162
  pthread_join(generate_thread, NULL);
163
  pthread_join(receive_thread, NULL);
164
  pthread_join(gossiping_thread, NULL);
165
  pthread_join(distributing_thread, NULL);
166
}