Statistics
| Branch: | Revision:

streamers / loop-mt.c @ 0a40460a

History | View | Annotate | Download (3.88 KB)

1
#include <unistd.h>
2
#include <stdint.h>
3
#include <stdlib.h>
4
#include <stdio.h>
5
#include <pthread.h>
6

    
7
#include <net_helper.h>
8
#include <topmanager.h>
9

    
10
#include "streaming.h"
11
#include "loop.h"
12

    
13
static int chunks_per_period = 1;
14
static int period = 500000;
15
static int done;
16
static pthread_mutex_t cb_mutex;
17
static pthread_mutex_t topology_mutex;
18
static struct nodeID *s;
19

    
20
static void *chunk_forging(void *dummy)
21
{
22
  int chunk_period = period;
23

    
24
  while(!done) {
25
    pthread_mutex_lock(&cb_mutex);
26
    generated_chunk();
27
    pthread_mutex_unlock(&cb_mutex);
28
    usleep(chunk_period);
29
  }
30

    
31
  return NULL;
32
}
33

    
34
static void *source_receive(void *dummy)
35
{
36
  while (!done) {
37
    int len;
38
    struct nodeID *remote;
39
#define BUFFSIZE 1024
40
  static uint8_t buff[BUFFSIZE];
41

    
42
    len = recv_data(s, &remote, buff, BUFFSIZE);
43
    switch (buff[0] /* Message Type */) {
44
      case 0x10 /* NCAST_PROTO */:
45
        pthread_mutex_lock(&topology_mutex);
46
        topParseData(buff, len);
47
        pthread_mutex_unlock(&topology_mutex);
48
        break;
49
      default:
50
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
51
    }
52
    free(remote);
53
  }
54

    
55
  return NULL;
56
}
57

    
58
static void *receive(void *dummy)
59
{
60
  while (!done) {
61
    int len;
62
    struct nodeID *remote;
63
#define BUFFSIZE 1024
64
  static uint8_t buff[BUFFSIZE];
65

    
66
    len = recv_data(s, &remote, buff, BUFFSIZE);
67
    switch (buff[0] /* Message Type */) {
68
      case 0x10 /* NCAST_PROTO */:
69
        pthread_mutex_lock(&topology_mutex);
70
        topParseData(buff, len);
71
        pthread_mutex_unlock(&topology_mutex);
72
        break;
73
      case 12:
74
        pthread_mutex_lock(&cb_mutex);
75
        received_chunk(buff, len);
76
        pthread_mutex_unlock(&cb_mutex);
77
        break;
78
      default:
79
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
80
    }
81
    free(remote);
82
  }
83

    
84
  return NULL;
85
}
86

    
87
static void *topology_sending(void *dummy)
88
{
89
  int gossiping_period = period * 10;
90

    
91
  pthread_mutex_lock(&topology_mutex);
92
  topParseData(NULL, 0);
93
  pthread_mutex_unlock(&topology_mutex);
94
  while(!done) {
95
    pthread_mutex_lock(&topology_mutex);
96
    topParseData(NULL, 0);
97
    pthread_mutex_unlock(&topology_mutex);
98
    usleep(gossiping_period);
99
  }
100

    
101
  return NULL;
102
}
103

    
104
static void *chunk_sending(void *dummy)
105
{
106
  int chunk_period = period / chunks_per_period;
107

    
108
  while(!done) {
109
    const struct nodeID **neighbours;
110
    int n;
111

    
112
    pthread_mutex_lock(&topology_mutex);
113
    neighbours = topGetNeighbourhood(&n);
114
    pthread_mutex_lock(&cb_mutex);
115
    send_chunk(neighbours, n);
116
    pthread_mutex_unlock(&cb_mutex);
117
    pthread_mutex_unlock(&topology_mutex);
118
    usleep(chunk_period);
119
  }
120

    
121
  return NULL;
122
}
123

    
124
void loop(struct nodeID *s1, int csize, int buff_size)
125
{
126
  pthread_t receive_thread, gossiping_thread, distributing_thread;
127
  
128
  period = csize;
129
  s = s1;
130
 
131
  stream_init(buff_size, s);
132
  pthread_mutex_init(&cb_mutex, NULL);
133
  pthread_mutex_init(&topology_mutex, NULL);
134
  pthread_create(&receive_thread, NULL, receive, NULL); 
135
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
136
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
137

    
138
  pthread_join(receive_thread, NULL);
139
  pthread_join(gossiping_thread, NULL);
140
  pthread_join(distributing_thread, NULL);
141
}
142

    
143
void source_loop(struct nodeID *s1, int csize, int chunks)
144
{
145
  pthread_t generate_thread, receive_thread, gossiping_thread, distributing_thread;
146
  
147
  period = csize;
148
  chunks_per_period = chunks;
149
  s = s1;
150
 
151
  stream_init(1, s);
152
  pthread_mutex_init(&cb_mutex, NULL);
153
  pthread_mutex_init(&topology_mutex, NULL);
154
  pthread_create(&receive_thread, NULL, source_receive, NULL); 
155
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
156
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
157
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
158

    
159
  pthread_join(generate_thread, NULL);
160
  pthread_join(receive_thread, NULL);
161
  pthread_join(gossiping_thread, NULL);
162
  pthread_join(distributing_thread, NULL);
163
}