Statistics
| Branch: | Revision:

streamers / loop-mt.c @ 6abd9b83

History | View | Annotate | Download (4.18 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <unistd.h>
9
#include <stdint.h>
10
#include <stdlib.h>
11
#include <stdio.h>
12
#include <pthread.h>
13

    
14
#include <net_helper.h>
15
#include <topmanager.h>
16
#include <msg_types.h>
17

    
18
#include "streaming.h"
19
#include "loop.h"
20

    
21
#define BUFFSIZE 64 * 1024
22
static int chunks_per_period = 1;
23
static int period = 500000;
24
static int done;
25
static pthread_mutex_t cb_mutex;
26
static pthread_mutex_t topology_mutex;
27
static struct nodeID *s;
28

    
29
static void *chunk_forging(void *dummy)
30
{
31
  suseconds_t d;
32

    
33
  while(!done) {
34
    pthread_mutex_lock(&cb_mutex);
35
    generated_chunk(&d);
36
    pthread_mutex_unlock(&cb_mutex);
37
    usleep(d);
38
  }
39

    
40
  return NULL;
41
}
42

    
43
static void *source_receive(void *dummy)
44
{
45
  while (!done) {
46
    int len;
47
    struct nodeID *remote;
48
  static uint8_t buff[BUFFSIZE];
49

    
50
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
51
    switch (buff[0] /* Message Type */) {
52
      case MSG_TYPE_TOPOLOGY:
53
        pthread_mutex_lock(&topology_mutex);
54
        topParseData(buff, len);
55
        pthread_mutex_unlock(&topology_mutex);
56
        break;
57
      case MSG_TYPE_CHUNK:
58
        fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
59
        break;
60
      default:
61
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
62
    }
63
    nodeid_free(remote);
64
  }
65

    
66
  return NULL;
67
}
68

    
69
static void *receive(void *dummy)
70
{
71
  while (!done) {
72
    int len;
73
    struct nodeID *remote;
74
  static uint8_t buff[BUFFSIZE];
75

    
76
    len = recv_from_peer(s, &remote, buff, BUFFSIZE);
77
    switch (buff[0] /* Message Type */) {
78
      case MSG_TYPE_TOPOLOGY:
79
        pthread_mutex_lock(&topology_mutex);
80
        topParseData(buff, len);
81
        pthread_mutex_unlock(&topology_mutex);
82
        break;
83
      case MSG_TYPE_CHUNK:
84
        pthread_mutex_lock(&cb_mutex);
85
        received_chunk(buff, len);
86
        pthread_mutex_unlock(&cb_mutex);
87
        break;
88
      default:
89
        fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
90
    }
91
    nodeid_free(remote);
92
  }
93

    
94
  return NULL;
95
}
96

    
97
static void *topology_sending(void *dummy)
98
{
99
  int gossiping_period = period * 10;
100

    
101
  pthread_mutex_lock(&topology_mutex);
102
  topParseData(NULL, 0);
103
  pthread_mutex_unlock(&topology_mutex);
104
  while(!done) {
105
    pthread_mutex_lock(&topology_mutex);
106
    topParseData(NULL, 0);
107
    pthread_mutex_unlock(&topology_mutex);
108
    usleep(gossiping_period);
109
  }
110

    
111
  return NULL;
112
}
113

    
114
static void *chunk_sending(void *dummy)
115
{
116
  int chunk_period = period / chunks_per_period;
117

    
118
  while(!done) {
119
    const struct nodeID **neighbours;
120
    int n;
121

    
122
    pthread_mutex_lock(&topology_mutex);
123
    neighbours = topGetNeighbourhood(&n);
124
    pthread_mutex_lock(&cb_mutex);
125
    send_chunk(neighbours, n);
126
    pthread_mutex_unlock(&cb_mutex);
127
    pthread_mutex_unlock(&topology_mutex);
128
    usleep(chunk_period);
129
  }
130

    
131
  return NULL;
132
}
133

    
134
void loop(struct nodeID *s1, int csize, int buff_size)
135
{
136
  pthread_t receive_thread, gossiping_thread, distributing_thread;
137
  
138
  period = csize;
139
  s = s1;
140
 
141
  stream_init(buff_size, s);
142
  pthread_mutex_init(&cb_mutex, NULL);
143
  pthread_mutex_init(&topology_mutex, NULL);
144
  pthread_create(&receive_thread, NULL, receive, NULL); 
145
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
146
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
147

    
148
  pthread_join(receive_thread, NULL);
149
  pthread_join(gossiping_thread, NULL);
150
  pthread_join(distributing_thread, NULL);
151
}
152

    
153
void source_loop(const char *fname, struct nodeID *s1, int csize, int chunks, bool loop)
154
{
155
  pthread_t generate_thread, receive_thread, gossiping_thread, distributing_thread;
156
  
157
  period = csize;
158
  chunks_per_period = chunks;
159
  s = s1;
160
 
161
  source_init(fname, s, loop);
162
  pthread_mutex_init(&cb_mutex, NULL);
163
  pthread_mutex_init(&topology_mutex, NULL);
164
  pthread_create(&receive_thread, NULL, source_receive, NULL); 
165
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
166
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
167
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
168

    
169
  pthread_join(generate_thread, NULL);
170
  pthread_join(receive_thread, NULL);
171
  pthread_join(gossiping_thread, NULL);
172
  pthread_join(distributing_thread, NULL);
173
}