Statistics
| Branch: | Revision:

streamers / loop-mt.c @ 6920fdab

History | View | Annotate | Download (3.85 KB)

1 d92a24a6 Luca Abeni
#include <unistd.h>
2
#include <stdint.h>
3
#include <stdlib.h>
4
#include <stdio.h>
5
#include <pthread.h>
6
7
#include <net_helper.h>
8
#include <topmanager.h>
9
10
#include "streaming.h"
11
#include "loop.h"
12
13
static int chunks_per_period = 1;
14
static int period = 500000;
15
static int done;
16
static pthread_mutex_t cb_mutex;
17
static pthread_mutex_t topology_mutex;
18
static struct nodeID *s;
19
20
static void *chunk_forging(void *dummy)
21
{
22 83c5c285 Luca Abeni
  int chunk_period = period;
23 d92a24a6 Luca Abeni
24
  while(!done) {
25
    pthread_mutex_lock(&cb_mutex);
26
    generated_chunk();
27
    pthread_mutex_unlock(&cb_mutex);
28 83c5c285 Luca Abeni
    usleep(chunk_period);
29 d92a24a6 Luca Abeni
  }
30
31
  return NULL;
32
}
33
34
static void *source_receive(void *dummy)
35
{
36
  while (!done) {
37
    int len;
38
    struct nodeID *remote;
39
#define BUFFSIZE 1024
40
  static uint8_t buff[BUFFSIZE];
41
42
    len = recv_data(s, &remote, buff, BUFFSIZE);
43
    switch (buff[0] /* Message Type */) {
44
      case 0x10 /* NCAST_PROTO */:
45
        pthread_mutex_lock(&topology_mutex);
46
        topParseData(buff, len);
47
        pthread_mutex_unlock(&topology_mutex);
48
        break;
49
      default:
50
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
51
    }
52
    free(remote);
53
  }
54
55
  return NULL;
56
}
57
58
static void *receive(void *dummy)
59
{
60
  while (!done) {
61
    int len;
62
    struct nodeID *remote;
63
#define BUFFSIZE 1024
64
  static uint8_t buff[BUFFSIZE];
65
66
    len = recv_data(s, &remote, buff, BUFFSIZE);
67
    switch (buff[0] /* Message Type */) {
68
      case 0x10 /* NCAST_PROTO */:
69
        pthread_mutex_lock(&topology_mutex);
70
        topParseData(buff, len);
71
        pthread_mutex_unlock(&topology_mutex);
72
        break;
73
      case 12:
74
        pthread_mutex_lock(&cb_mutex);
75
        received_chunk(buff, len);
76
        pthread_mutex_unlock(&cb_mutex);
77
      default:
78
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
79
    }
80
    free(remote);
81
  }
82
83
  return NULL;
84
}
85
86
static void *topology_sending(void *dummy)
87
{
88
  int gossiping_period = period * 10;
89
90
  pthread_mutex_lock(&topology_mutex);
91
  topParseData(NULL, 0);
92
  pthread_mutex_unlock(&topology_mutex);
93
  while(!done) {
94
    pthread_mutex_lock(&topology_mutex);
95
    topParseData(NULL, 0);
96
    pthread_mutex_unlock(&topology_mutex);
97 83c5c285 Luca Abeni
    usleep(gossiping_period);
98 d92a24a6 Luca Abeni
  }
99
100
  return NULL;
101
}
102
103
static void *chunk_sending(void *dummy)
104
{
105 83c5c285 Luca Abeni
  int chunk_period = period / chunks_per_period;
106 d92a24a6 Luca Abeni
107
  while(!done) {
108
    const struct nodeID **neighbours;
109
    int n;
110
111
    pthread_mutex_lock(&topology_mutex);
112
    neighbours = topGetNeighbourhood(&n);
113
    pthread_mutex_lock(&cb_mutex);
114
    send_chunk(neighbours, n);
115
    pthread_mutex_unlock(&cb_mutex);
116
    pthread_mutex_unlock(&topology_mutex);
117 83c5c285 Luca Abeni
    usleep(chunk_period);
118 d92a24a6 Luca Abeni
  }
119
120
  return NULL;
121
}
122
123
void loop(struct nodeID *s1, int csize)
124
{
125
  pthread_t receive_thread, gossiping_thread, distributing_thread;
126
  
127
  period = csize;
128
  s = s1;
129
 
130 6920fdab Luca
  stream_init(8, s);        // FIXME!
131 d92a24a6 Luca Abeni
  pthread_mutex_init(&cb_mutex, NULL);
132
  pthread_mutex_init(&topology_mutex, NULL);
133
  pthread_create(&receive_thread, NULL, receive, NULL); 
134
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
135
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
136
137
  pthread_join(receive_thread, NULL);
138
  pthread_join(gossiping_thread, NULL);
139
  pthread_join(distributing_thread, NULL);
140
}
141
142
void source_loop(struct nodeID *s1, int csize, int chunks)
143
{
144
  pthread_t generate_thread, receive_thread, gossiping_thread, distributing_thread;
145
  
146
  period = csize;
147
  chunks_per_period = chunks;
148
  s = s1;
149
 
150 6920fdab Luca
  stream_init(1, s);
151 d92a24a6 Luca Abeni
  pthread_mutex_init(&cb_mutex, NULL);
152
  pthread_mutex_init(&topology_mutex, NULL);
153
  pthread_create(&receive_thread, NULL, source_receive, NULL); 
154
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
155
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
156
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
157
158
  pthread_join(generate_thread, NULL);
159
  pthread_join(receive_thread, NULL);
160
  pthread_join(gossiping_thread, NULL);
161
  pthread_join(distributing_thread, NULL);
162
}