Statistics
| Branch: | Revision:

streamers / loop-mt.c @ 9d6a53c3

History | View | Annotate | Download (4.62 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <unistd.h>
9
#include <stdint.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <pthread.h>
13

    
14
#include <net_helper.h>
15
#include <topmanager.h>
16
#include <msg_types.h>
17

    
18
#include "dbg.h"
19
#include "streaming.h"
20
#include "loop.h"
21

    
22
#define BUFFSIZE 512 * 1024
23
static int chunks_per_period = 1;
24
static int period = 500000;
25
static int done;
26
static pthread_mutex_t cb_mutex;
27
static pthread_mutex_t topology_mutex;
28
static struct nodeID *s;
29

    
30
static void *chunk_forging(void *dummy)
31
{
32
  suseconds_t d;
33

    
34
  while(!done) {
35
    pthread_mutex_lock(&cb_mutex);
36
    generated_chunk(&d);
37
    pthread_mutex_unlock(&cb_mutex);
38
    usleep(d);
39
  }
40

    
41
  return NULL;
42
}
43

    
44
static void *source_receive(void *dummy)
45
{
46
  while (!done) {
47
    int len;
48
    struct nodeID *remote;
49
  static uint8_t buff[BUFFSIZE];
50

    
51
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
52
    if (len < 0) {
53
      fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
54
      nodeid_free(remote);
55
      continue;
56
    }
57
    switch (buff[0] /* Message Type */) {
58
      case MSG_TYPE_TOPOLOGY:
59
        pthread_mutex_lock(&topology_mutex);
60
        topParseData(buff, len);
61
        pthread_mutex_unlock(&topology_mutex);
62
        break;
63
      case MSG_TYPE_CHUNK:
64
        fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
65
        break;
66
      default:
67
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
68
    }
69
    nodeid_free(remote);
70
  }
71

    
72
  return NULL;
73
}
74

    
75
static void *receive(void *dummy)
76
{
77
  while (!done) {
78
    int len;
79
    struct nodeID *remote;
80
  static uint8_t buff[BUFFSIZE];
81

    
82
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
83
    if (len < 0) {
84
      fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
85
      nodeid_free(remote);
86
      continue;
87
    }
88
    dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
89
    switch (buff[0] /* Message Type */) {
90
      case MSG_TYPE_TOPOLOGY:
91
        pthread_mutex_lock(&topology_mutex);
92
        topParseData(buff, len);
93
        pthread_mutex_unlock(&topology_mutex);
94
        break;
95
      case MSG_TYPE_CHUNK:
96
        dprintf("Chunk message received:\n");
97
        pthread_mutex_lock(&cb_mutex);
98
        received_chunk(buff, len);
99
        pthread_mutex_unlock(&cb_mutex);
100
        break;
101
      default:
102
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
103
    }
104
    nodeid_free(remote);
105
  }
106

    
107
  return NULL;
108
}
109

    
110
static void *topology_sending(void *dummy)
111
{
112
  int gossiping_period = period * 10;
113

    
114
  pthread_mutex_lock(&topology_mutex);
115
  topParseData(NULL, 0);
116
  pthread_mutex_unlock(&topology_mutex);
117
  while(!done) {
118
    pthread_mutex_lock(&topology_mutex);
119
    topParseData(NULL, 0);
120
    pthread_mutex_unlock(&topology_mutex);
121
    usleep(gossiping_period);
122
  }
123

    
124
  return NULL;
125
}
126

    
127
static void *chunk_sending(void *dummy)
128
{
129
  int chunk_period = period / chunks_per_period;
130

    
131
  while(!done) {
132
    const struct nodeID **neighbours;
133
    int n;
134

    
135
    pthread_mutex_lock(&topology_mutex);
136
    neighbours = topGetNeighbourhood(&n);
137
    pthread_mutex_lock(&cb_mutex);
138
    send_chunk(neighbours, n);
139
    pthread_mutex_unlock(&cb_mutex);
140
    pthread_mutex_unlock(&topology_mutex);
141
    usleep(chunk_period);
142
  }
143

    
144
  return NULL;
145
}
146

    
147
void loop(struct nodeID *s1, int csize, int buff_size)
148
{
149
  pthread_t receive_thread, gossiping_thread, distributing_thread;
150
  
151
  period = csize;
152
  s = s1;
153
 
154
  stream_init(buff_size, s);
155
  pthread_mutex_init(&cb_mutex, NULL);
156
  pthread_mutex_init(&topology_mutex, NULL);
157
  pthread_create(&receive_thread, NULL, receive, NULL); 
158
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
159
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
160

    
161
  pthread_join(receive_thread, NULL);
162
  pthread_join(gossiping_thread, NULL);
163
  pthread_join(distributing_thread, NULL);
164
}
165

    
166
void source_loop(const char *fname, struct nodeID *s1, int csize, int chunks, bool loop)
167
{
168
  pthread_t generate_thread, receive_thread, gossiping_thread, distributing_thread;
169
  
170
  period = csize;
171
  chunks_per_period = chunks;
172
  s = s1;
173
 
174
  source_init(fname, s, loop);
175
  pthread_mutex_init(&cb_mutex, NULL);
176
  pthread_mutex_init(&topology_mutex, NULL);
177
  pthread_create(&receive_thread, NULL, source_receive, NULL); 
178
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
179
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
180
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
181

    
182
  pthread_join(generate_thread, NULL);
183
  pthread_join(receive_thread, NULL);
184
  pthread_join(gossiping_thread, NULL);
185
  pthread_join(distributing_thread, NULL);
186
}