Revision 81b601c1

View differences:

input-http.c
19 19
#include "input.h"
20 20

  
21 21
extern struct chunk_buffer *cb;
22

  
23
#ifdef THREADS
24
extern pthread_mutex_t cb_mutex;
25
extern pthread_mutex_t topology_mutex;
26
#else
22 27
pthread_mutex_t cb_mutex;
28
#endif
23 29

  
24 30
struct input_desc {
25 31
  int dummy;
......
42 48

  
43 49
  res->dummy = 0;
44 50
dprintf("BEFORE INIT! %d\n", res->dummy);
51
#ifndef THREADS
45 52
  pthread_mutex_init(&cb_mutex, NULL);
53
#endif
46 54
  //this daemon will listen the network for incoming chunks from a streaming source
47 55
  //on the following path and port
48 56
  httpd = initChunkPuller(UL_DEFAULT_CHUNKBUFFER_PATH, UL_DEFAULT_CHUNKBUFFER_PORT);
......
86 94
	}
87 95

  
88 96
  if(cb) {
97
#ifdef THREADS
98
  	pthread_mutex_lock(&topology_mutex);
99
#endif
89 100
  	pthread_mutex_lock(&cb_mutex);
90
  	res = cb_add_chunk(cb, gchunk);
91
  	free(gchunk);
92
  	pthread_mutex_unlock(&cb_mutex);
93
  }
94
  if (res < 0) { //chunk sequence is older than previous chunk (SHOULD SEND ANYWAY!!!)
95
    free(gchunk->data);
96
    free(gchunk->attributes);
97
    fprintf(stderr, "Chunk %d of %d bytes FAIL res %d\n", gchunk->id, gchunk->size, res);
98
  }
99
  else {
100
    pthread_mutex_lock(&cb_mutex);
101
    send_chunk(); //push it
101
  	res = add_chunk(gchunk);
102
//  	free(gchunk);
103
//  	pthread_mutex_unlock(&cb_mutex);
104
//  }
105
//  if (res < 0) { //chunk sequence is older than previous chunk (SHOULD SEND ANYWAY!!!)
106
//    free(gchunk->data);
107
//    free(gchunk->attributes);
108
//    fprintf(stderr, "Chunk %d of %d bytes FAIL res %d\n", gchunk->id, gchunk->size, res);
109
//  }
110
//  else {
111
//    pthread_mutex_lock(&cb_mutex);
112
    if(res) send_chunk(); //push it
102 113
    pthread_mutex_unlock(&cb_mutex);
114
#ifdef THREADS
115
  	pthread_mutex_unlock(&topology_mutex);
116
#endif
103 117
  }
104 118

  
105 119
  return 0;
loop-mt.c
22 22
#include "loop.h"
23 23

  
24 24
#define BUFFSIZE 512 * 1024
25
#define FDSSIZE 16
25 26
static int chunks_per_period = 1;
26 27
static int period = 500000;
27 28
static int done;
28
static pthread_mutex_t cb_mutex;
29
static pthread_mutex_t topology_mutex;
29
pthread_mutex_t cb_mutex;
30
pthread_mutex_t topology_mutex;
30 31
static struct nodeID *s;
31 32

  
32 33
static void *chunk_forging(void *dummy)
......
184 185
  chunks_per_period = chunks;
185 186
  s = s1;
186 187
 
187
  sigInit(s);
188
  int fds[FDSSIZE];
189
  fds[0] = -1;
190

  
191
//  sigInit(s);
188 192
  peers_init();
189
  if (source_init(fname, s, loop) < 0) {
193
  if (source_init(fname, s, loop, fds, FDSSIZE) < 0) {
190 194
    fprintf(stderr,"Cannot initialize source, exiting");
191 195
    return;
192 196
  }
......
194 198
  pthread_mutex_init(&topology_mutex, NULL);
195 199
  pthread_create(&receive_thread, NULL, source_receive, NULL); 
196 200
  pthread_create(&gossiping_thread, NULL, topology_sending, NULL); 
197
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
201
#ifndef HTTPIO
198 202
  pthread_create(&distributing_thread, NULL, chunk_sending, NULL); 
203
  pthread_create(&generate_thread, NULL, chunk_forging, NULL); 
204
#endif
199 205

  
206
#ifndef HTTPIO
200 207
  pthread_join(generate_thread, NULL);
208
  pthread_join(distributing_thread, NULL);
209
#endif
201 210
  pthread_join(receive_thread, NULL);
202 211
  pthread_join(gossiping_thread, NULL);
203
  pthread_join(distributing_thread, NULL);
204 212
}

Also available in: Unified diff