Statistics
| Branch: | Revision:

streamers / loop-mt.c @ 83c5c285

History | View | Annotate | Download (3.85 KB)

1
#include <unistd.h>
2
#include <stdint.h>
3
#include <stdlib.h>
4
#include <stdio.h>
5
#include <pthread.h>
6

    
7
#include <net_helper.h>
8
#include <topmanager.h>
9

    
10
#include "streaming.h"
11
#include "loop.h"
12

    
13
static int chunks_per_period = 1;
14
static int period = 500000;
15
static int done;
16
static pthread_mutex_t cb_mutex;
17
static pthread_mutex_t topology_mutex;
18
static struct nodeID *s;
19

    
20
static void *chunk_forging(void *dummy)
21
{
22
  int chunk_period = period;
23

    
24
  while(!done) {
25
    pthread_mutex_lock(&cb_mutex);
26
    generated_chunk();
27
    pthread_mutex_unlock(&cb_mutex);
28
    usleep(chunk_period);
29
  }
30

    
31
  return NULL;
32
}
33

    
34
static void *source_receive(void *dummy)
35
{
36
  while (!done) {
37
    int len;
38
    struct nodeID *remote;
39
#define BUFFSIZE 1024
40
  static uint8_t buff[BUFFSIZE];
41

    
42
    len = recv_data(s, &remote, buff, BUFFSIZE);
43
    switch (buff[0] /* Message Type */) {
44
      case 0x10 /* NCAST_PROTO */:
45
        pthread_mutex_lock(&topology_mutex);
46
        topParseData(buff, len);
47
        pthread_mutex_unlock(&topology_mutex);
48
        break;
49
      default:
50
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
51
    }
52
    free(remote);
53
  }
54

    
55
  return NULL;
56
}
57

    
58
static void *receive(void *dummy)
59
{
60
  while (!done) {
61
    int len;
62
    struct nodeID *remote;
63
#define BUFFSIZE 1024
64
  static uint8_t buff[BUFFSIZE];
65

    
66
    len = recv_data(s, &remote, buff, BUFFSIZE);
67
    switch (buff[0] /* Message Type */) {
68
      case 0x10 /* NCAST_PROTO */:
69
        pthread_mutex_lock(&topology_mutex);
70
        topParseData(buff, len);
71
        pthread_mutex_unlock(&topology_mutex);
72
        break;
73
      case 12:
74
        pthread_mutex_lock(&cb_mutex);
75
        received_chunk(buff, len);
76
        pthread_mutex_unlock(&cb_mutex);
77
      default:
78
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
79
    }
80
    free(remote);
81
  }
82

    
83
  return NULL;
84
}
85

    
86
static void *topology_sending(void *dummy)
87
{
88
  int gossiping_period = period * 10;
89

    
90
  pthread_mutex_lock(&topology_mutex);
91
  topParseData(NULL, 0);
92
  pthread_mutex_unlock(&topology_mutex);
93
  while(!done) {
94
    pthread_mutex_lock(&topology_mutex);
95
    topParseData(NULL, 0);
96
    pthread_mutex_unlock(&topology_mutex);
97
    usleep(gossiping_period);
98
  }
99

    
100
  return NULL;
101
}
102

    
103
static void *chunk_sending(void *dummy)
104
{
105
  int chunk_period = period / chunks_per_period;
106

    
107
  while(!done) {
108
    const struct nodeID **neighbours;
109
    int n;
110

    
111
    pthread_mutex_lock(&topology_mutex);
112
    neighbours = topGetNeighbourhood(&n);
113
    pthread_mutex_lock(&cb_mutex);
114
    send_chunk(neighbours, n);
115
    pthread_mutex_unlock(&cb_mutex);
116
    pthread_mutex_unlock(&topology_mutex);
117
    usleep(chunk_period);
118
  }
119

    
120
  return NULL;
121
}
122

    
123
void loop(struct nodeID *s1, int csize)
124
{
125
  pthread_t receive_thread, gossiping_thread, distributing_thread;
126
  
127
  period = csize;
128
  s = s1;
129
 
130
  stream_init(8);        // FIXME!
131
  pthread_mutex_init(&cb_mutex, NULL);
132
  pthread_mutex_init(&topology_mutex, NULL);
133
  pthread_create(&receive_thread, NULL, receive, NULL); 
134
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
135
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
136

    
137
  pthread_join(receive_thread, NULL);
138
  pthread_join(gossiping_thread, NULL);
139
  pthread_join(distributing_thread, NULL);
140
}
141

    
142
void source_loop(struct nodeID *s1, int csize, int chunks)
143
{
144
  pthread_t generate_thread, receive_thread, gossiping_thread, distributing_thread;
145
  
146
  period = csize;
147
  chunks_per_period = chunks;
148
  s = s1;
149
 
150
  stream_init(1);        // FIXME!
151
  pthread_mutex_init(&cb_mutex, NULL);
152
  pthread_mutex_init(&topology_mutex, NULL);
153
  pthread_create(&receive_thread, NULL, source_receive, NULL); 
154
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
155
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
156
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
157

    
158
  pthread_join(generate_thread, NULL);
159
  pthread_join(receive_thread, NULL);
160
  pthread_join(gossiping_thread, NULL);
161
  pthread_join(distributing_thread, NULL);
162
}