Revision 74a5d4ae

View differences:

Chunkiser/input-stream-avs.c
4 4
 *  This is free software; see gpl-3.0.txt
5 5
 */
6 6

  
7

  
8
#define __STDC_CONSTANT_MACROS 1
9
#include <stdint.h>
10
#include <stdbool.h>
11

  
12

  
13
#ifdef __cplusplus
14
extern "C" {
15
#endif
16 7
#include <libavformat/avformat.h>
17
#ifdef __cplusplus
18
}
19
#endif
20

  
8
#include <stdbool.h>
21 9

  
22 10
#include "../dbg.h"
23 11
#include "../input-stream.h"
......
123 111
struct input_stream *input_stream_open(const char *fname, int *period, uint16_t flags)
124 112
{
125 113
  struct input_stream *desc;
126
  size_t i;
127
  int res;
114
  int i, res;
128 115

  
129 116
  avcodec_register_all();
130 117
  av_register_all();
131 118

  
132
  desc = (struct input_stream *)malloc(sizeof(struct input_stream));
119
  desc = malloc(sizeof(struct input_stream));
133 120
  if (desc == NULL) {
134 121
    return NULL;
135 122
  }
......
298 285
      }
299 286
    }
300 287
    *size = pkt.size + s->s->streams[pkt.stream_index]->codec->extradata_size * header_out + header_size + FRAME_HEADER_SIZE;
301
    data = (uint8_t *)malloc(*size);
288
    data = malloc(*size);
302 289
    if (data == NULL) {
303 290
      *size = -1;
304 291
      av_free_packet(&pkt);
Makefile
37 37
LDLIBS += -lgrapes
38 38
ifdef ML
39 39
LDFLAGS += -L$(NAPA)/ml -L$(LIBEVENT_DIR)/lib
40
LDLIBS += -lml
40
LDLIBS += -lml -lm
41 41
CPPFLAGS += -I$(NAPA)/ml/include -I$(LIBEVENT_DIR)/include
42 42
ifdef MONL
43 43
LDFLAGS += -L$(NAPA)/dclog -L$(NAPA)/rep -L$(NAPA)/monl -L$(NAPA)/common
44
LDLIBS += -lmon -lrep -ldclog -lcommon
44
LDLIBS += -lstdc++ -lmon -lrep -ldclog -lcommon
45 45
CPPFLAGS += -DMONL
46 46
ifdef STATIC
47 47
CC=g++
48
else
49
LDLIBS_EXTRA = -lm -lstdc++
50 48
endif
51 49
endif
52 50
LDLIBS += -Wl,-static -levent $(if $(STATIC), , -Wl,-Bdynamic) -lrt
53 51
#LDLIBS += -levent -lrt
52
LDLIBS += $(LIBEVENT_DIR)/lib/libevent.a -lrt
54 53
endif
55 54

  
56 55
OBJS += streaming.o
......
86 85
CPPFLAGS += -I$(FFMPEG_DIR)/include
87 86
LDFLAGS += -L$(FFMPEG_DIR)/lib
88 87
LDLIBS += -lavformat -lavcodec -lavutil
89
LDLIBS_EXTRA += -lm
88
LDLIBS += -lm
90 89
LDLIBS += $(call ld-option, -lz)
91 90
LDLIBS += $(call ld-option, -lbz2)
92 91
else
......
111 110
OBJS += input-stream-dummy.o out-stream-dummy.o
112 111
endif
113 112

  
114
LDLIBS += $(LDLIBS_EXTRA)
115

  
116 113
EXECTARGET = offerstreamer
117 114
ifdef ML
118 115
EXECTARGET := $(EXECTARGET)-ml
channel.c
10 10

  
11 11
static char * chname = NULL;
12 12

  
13
void channel_set_name(const char *ch)
13
void channel_set_name(char *ch)
14 14
{
15 15
  free(chname);
16 16
  chname = strdup(ch);
channel.h
6 6
#ifndef CHANNEL_H
7 7
#define CHANNEL_H
8 8

  
9
void channel_set_name(const char *);
9
void channel_set_name(char *);
10 10
const char *channel_get_name();
11 11

  
12 12
#endif
chunk_signaling.c
17 17
#include <errno.h>
18 18
#include <assert.h>
19 19
#include <string.h>
20

  
21 20
#include "peer.h"
22 21
#include "peerset.h"
23 22
#include "chunkidset.h"
24
#include "trade_sig_ha.h"
25 23
#include "trade_sig_la.h"
24
#include "chunk_signaling.h"
26 25
#include "msg_types.h"
27 26
#include "net_helper.h"
28 27

  
29
#include "chunk_signaling.h"
30

  
31 28
#include "streaming.h"
32 29
#include "topology.h"
33 30
#include "dbg.h"
......
202 199
        case MSG_SIG_BMOFF:
203 200
        {
204 201
          int dummy;
205
          const struct nodeID *ownerid = nodeid_undump(&(signal->third_peer),&dummy);
202
          struct nodeID *ownerid = nodeid_undump(&(signal->third_peer),&dummy);
206 203
          bmap_received(fromid, ownerid, c_set, signal->cb_size, signal->trans_id);
207 204
          nodeid_free(ownerid);
208 205
          break;
chunkbuffer_helper.h
17 17
  return chbAddChunk(cb, c);
18 18
}
19 19

  
20
inline struct chunk *cb_get_chunks(const struct chunk_buffer *cb, size_t *n) {
20
inline struct chunk *cb_get_chunks(const struct chunk_buffer *cb, int *n) {
21 21
 return chbGetChunks(cb, n);
22 22
}
23 23

  
chunklock.c
19 19

  
20 20
struct lock {
21 21
  int chunkid;
22
  const struct nodeID *peer;
22
  struct nodeID *peer;
23 23
  struct timeval timestamp;
24 24
};
25 25

  
......
31 31
{
32 32
  if (!locks) {
33 33
    lsize = LSIZE_INCREMENT;
34
    locks = (struct lock *)malloc(sizeof(struct lock) * lsize);
34
    locks = malloc(sizeof(struct lock) * lsize);
35 35
    lcount = 0;
36 36
  }
37 37

  
38 38
  if (lcount == lsize) {
39 39
    lsize += LSIZE_INCREMENT;
40
    locks = (struct lock *)realloc(locks , sizeof(struct lock) * lsize);
40
    locks = realloc(locks , sizeof(struct lock) * lsize);
41 41
  }
42 42

  
43 43
  if (!locks) {
......
80 80
}
81 81

  
82 82
void chunk_unlock(int chunkid){
83
  size_t i;
83
  int i;
84 84

  
85 85
  for (i=0; i<lcount; i++) {
86 86
    if (locks[i].chunkid == chunkid) {
......
91 91
}
92 92

  
93 93
int chunk_islocked(int chunkid){
94
  size_t i;
94
  int i;
95 95

  
96 96
  chunk_locks_cleanup();
97 97

  
input.c
30 30
  struct input_desc *res;
31 31
  struct timeval tv;
32 32

  
33
  res = (struct input_desc *)malloc(sizeof(struct input_desc));
33
  res = malloc(sizeof(struct input_desc));
34 34
  if (res == NULL) {
35 35
    return NULL;
36 36
  }
input.h
10 10
#define INPUT_LOOP 0x0001
11 11

  
12 12
struct input_desc;
13
struct chunk;
14 13

  
15 14
struct input_desc *input_open(const char *fname, uint16_t flags);
16 15
void input_close(struct input_desc *s);
loop.c
13 13
#include <stdbool.h>
14 14

  
15 15
#include <net_helper.h>
16

  
17 16
#include <msg_types.h>
18 17
#include <peerset.h>
19 18
#include <peer.h>
20 19

  
21 20
#include "chunk_signaling.h"
22
#include "topology.h"
23 21
#include "streaming.h"
22
#include "topology.h"
24 23
#include "loop.h"
25 24
#include "dbg.h"
26 25

  
......
67 66
    tout_init(&tv);
68 67
    res = wait4data(s, &tv, NULL);
69 68
    if (res > 0) {
70
      const struct nodeID *remote;
69
      struct nodeID *remote;
71 70

  
72 71
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
73 72
      if (len < 0) {
......
136 135
    res = wait4data(s, &tv, NULL);
137 136
#endif
138 137
    if (res > 0) {
139
      const struct nodeID *remote;
138
      struct nodeID *remote;
140 139

  
141 140
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
142 141
      if (len < 0) {
measures-monl.c
246 246
/*
247 247
 * Helper to retrieve a measure
248 248
*/
249
double get_measure(const struct nodeID *id, int j, enum stat_types st)
249
double get_measure(struct nodeID *id, int j, enum stat_types st)
250 250
{
251 251
	return (id->n_mhs > j) ? monRetrieveResult(id->mhs[j], st) : NAN;
252 252
}
......
254 254
/*
255 255
 * Hopcount to a given peer
256 256
*/
257
int get_hopcount(const struct nodeID *id){
257
int get_hopcount(struct nodeID *id){
258 258
	double r = get_measure(id, 0, LAST);
259 259
	return isnan(r) ? -1 : (int) r;
260 260
}
......
262 262
/*
263 263
 * RTT to a given peer in seconds
264 264
*/
265
double get_rtt(const struct nodeID *id){
265
double get_rtt(struct nodeID *id){
266 266
	return get_measure(id, 1, WIN_AVG);
267 267
}
268 268

  
269 269
/*
270 270
 * average RTT to a set of peers in seconds
271 271
*/
272
double get_average_rtt(const struct nodeID **ids, int len){
272
double get_average_rtt(struct nodeID **ids, int len){
273 273
	int i;
274 274
	int n = 0;
275 275
	double sum = 0;
......
287 287
/*
288 288
 * loss ratio from a given peer as 0..1
289 289
*/
290
double get_lossrate(const struct nodeID *id){
290
double get_lossrate(struct nodeID *id){
291 291
	return get_measure(id, 2, WIN_AVG);
292 292
}
293 293

  
294 294
/*
295 295
 * average loss ratio from a set of peers as 0..1
296 296
*/
297
double get_average_lossrate(const struct nodeID **ids, int len){
297
double get_average_lossrate(struct nodeID **ids, int len){
298 298
	int i;
299 299
	int n = 0;
300 300
	double sum = 0;
measures.h
22 22
void reg_offer_accept(bool b);
23 23

  
24 24
#ifdef MONL
25
double get_rtt(const struct nodeID *id);
26
double get_lossrate(const struct nodeID *id);
27
double get_average_lossrate(const struct  nodeID**id, int len);
28
int get_hopcount(const struct nodeID *id);
25
double get_rtt(struct nodeID *id);
26
double get_lossrate(struct nodeID *id);
27
double get_average_lossrate(struct  nodeID**id, int len);
28
int get_hopcount(struct nodeID *id);
29 29
#endif
30 30

  
31 31
#endif
out-stream-avf.c
4 4
 *  This is free software; see gpl-3.0.txt
5 5
 */
6 6

  
7
#define __STDC_CONSTANT_MACROS 1
8
#include <stdint.h>
9

  
10
#ifdef __cplusplus
11
extern "C" {
12
#endif
13 7
#include <libavformat/avformat.h>
14
#ifdef __cplusplus
15
}
16
#endif
17

  
18

  
19 8
#include <stdio.h>
20 9

  
21 10
#include "out-stream.h"
......
58 47
      return CODEC_ID_DIRAC;
59 48
    default:
60 49
      fprintf(stderr, "Unknown codec %d\n", mytype);
61
      return CODEC_ID_NONE;
50
      return 0;
62 51
  }
63 52
}
64 53

  
......
149 138
    dts += (dts < prev_dts - ((1L << 31) - 1)) ? ((prev_dts >> 32) + 1) << 32 : (prev_dts >> 32) << 32;
150 139
    prev_dts = dts;
151 140
    pkt.dts = av_rescale_q(dts, outctx->streams[0]->codec->time_base, outctx->streams[0]->time_base);
152
    pkt.data = (uint8_t *)p;
141
    pkt.data = p;
153 142
    p += frame_size;
154 143
    pkt.size = frame_size;
155 144
    av_interleaved_write_frame(outctx, &pkt);
output.c
20 20
static int buff_size;
21 21

  
22 22
struct outbuf {
23
  uint8_t *data;
23
  void *data;
24 24
  int size;
25 25
  int id;
26 26
  uint64_t timestamp;
......
33 33
    int i;
34 34

  
35 35
    buff_size = bufsize;
36
    buff = (struct outbuf *)malloc(sizeof(struct outbuf) * buff_size);
36
    buff = malloc(sizeof(struct outbuf) * buff_size);
37 37
    if (!buff) {
38 38
     fprintf(stderr, "Error: can't allocate output buffer\n");
39 39
     exit(1);
......
148 148
      exit(-1);
149 149
    }
150 150
    /* We previously flushed, so we know that c->id is free */
151
    buff[c->id % buff_size].data = (uint8_t *)malloc(c->size);
151
    buff[c->id % buff_size].data = malloc(c->size);
152 152
    memcpy(buff[c->id % buff_size].data, c->data, c->size);
153 153
    buff[c->id % buff_size].size = c->size;
154 154
    buff[c->id % buff_size].id = c->id;
streamer.c
132 132
    return -1;
133 133
  }
134 134
  if (srv_port != 0) {
135
    const struct nodeID *srv;
135
    struct nodeID *srv;
136 136

  
137 137
    srv = create_node(srv_ip, srv_port);
138 138
    if (srv == NULL) {
streaming.c
32 32
#include "measures.h"
33 33

  
34 34
#include "scheduler_la.h"
35
#include "chunkbuffer.h"
36
#include "chunkbuffer_helper.h"
37 35

  
38 36
struct chunk_attributes {
39 37
  uint64_t deadline;
......
109 107
  assert(!c->attributes && c->attributes_size == 0);
110 108

  
111 109
  c->attributes_size = sizeof(struct chunk_attributes);
112
  c->attributes = ca = (struct chunk_attributes *)malloc(c->attributes_size);
110
  c->attributes = ca = malloc(c->attributes_size);
113 111

  
114 112
  ca->deadline = c->timestamp;
115 113
  ca->deadline_increment = 2;
......
128 126
  return ca->hopcount;
129 127
}
130 128

  
131
void chunk_attributes_update_received(const struct chunk* c)
129
void chunk_attributes_update_received(struct chunk* c)
132 130
{
133 131
  struct chunk_attributes * ca;
134 132

  
......
142 140
  dprintf("Received chunk %d with hopcount %hu\n", c->id, ca->hopcount);
143 141
}
144 142

  
145
void chunk_attributes_update_sending(const struct chunk* c)
143
void chunk_attributes_update_sending(struct chunk* c)
146 144
{
147 145
  struct chunk_attributes * ca;
148 146

  
......
159 157
struct chunkID_set *cb_to_bmap(struct chunk_buffer *chbuf)
160 158
{
161 159
  struct chunk *chunks;
162
  size_t num_chunks;
163
  int i;
160
  int num_chunks, i;
164 161
  struct chunkID_set *my_bmap = chunkID_set_init("type=1");
165 162
  chunks = cb_get_chunks(chbuf, &num_chunks);
166 163

  
......
221 218
  n = peerset_size(pset);
222 219
  neighbours = peerset_get_peers(pset);
223 220
  {
224
    const struct nodeID *nodeids[n];
221
    struct nodeID *nodeids[n];
225 222
    for (i = 0; i<n; i++) nodeids[i] = neighbours[i].id;
226 223
#ifdef MONL
227 224
    return get_average_lossrate(nodeids, n);
......
242 239
  send_bmap(p);	//send explicit ack
243 240
}
244 241

  
245
void received_chunk(const struct nodeID *from, const uint8_t *buff, int len)
242
void received_chunk(struct nodeID *from, const uint8_t *buff, int len)
246 243
{
247 244
  int res;
248 245
  static struct chunk c;
......
301 298
 *
302 299
 * Looks at buffermap information received about the given peer.
303 300
 */
304
int needs(const struct nodeID *n, int cid){
301
int needs(struct nodeID *n, int cid){
305 302
  struct peer * p = nodeid_to_peer(n, 0);
306 303
  if (!p) return 1; // if we don't know this peer, but we assume it needs the chunk (aggressive behaviour!)
307 304

  
......
333 330
  return 0;
334 331
}
335 332

  
336
double peerWeightReceivedfrom(const struct nodeID * const *n){
333
double peerWeightReceivedfrom(struct nodeID **n){
337 334
  struct peer * p = nodeid_to_peer(*n, 0);
338 335
  if (!p) return 0;
339 336
  return timerisset(&p->bmap_timestamp) ? 1 : 0.1;
340 337
}
341 338

  
342
double peerWeightUniform(const struct nodeID * const *n){
339
double peerWeightUniform(struct nodeID **n){
343 340
  return 1;
344 341
}
345 342

  
346
double peerWeightRtt(const struct nodeID * const *n){
343
double peerWeightRtt(struct nodeID **n){
347 344
#ifdef MONL
348 345
  double rtt = get_rtt(*n);
349 346
  //dprintf("RTT to %s: %f\n", node_addr(p->id), rtt);
......
369 366
  return -latest;
370 367
}
371 368

  
372
double getChunkTimestamp(const int *cid){
373
  const struct chunk *c = cb_get_chunk(cb, *cid);
369
double getChunkTimestamp(int *cid){
370
  struct chunk *c = cb_get_chunk(cb, *cid);
374 371
  if (!c) return 0;
375 372

  
376 373
  return (double) c->timestamp;
......
382 379
  cset_acc_size = chunkID_set_size(cset_acc);
383 380
  reg_offer_accept(cset_acc_size > 0 ? 1 : 0);	//this only works if accepts are sent back even if 0 is accepted
384 381
  for (i = 0, d=0; i < cset_acc_size && d < max_deliver; i++) {
385
    const struct chunk *c;
382
    struct chunk *c;
386 383
    int chunkid = chunkID_set_get_chunk(cset_acc, i);
387 384
    c = cb_get_chunk(cb, chunkid);
388 385
    if (c && needs(to->id, chunkid) ) {	// we should have the chunk, and he should not have it. Although the "accept" should have been an answer to our "offer", we do some verification
......
404 401
  return offer_per_tick;
405 402
}
406 403

  
407
int offer_max_deliver(const struct nodeID *n)
404
int offer_max_deliver(struct nodeID *n)
408 405
{
409 406
#ifdef MONL
410 407
  switch (get_hopcount(n)) {
......
420 417
void send_offer()
421 418
{
422 419
  struct chunk *buff;
423
  size_t size, i, n;
424
  int res;
420
  int size, res, i, n;
425 421
  struct peer *neighbours;
426 422
  struct peerset *pset;
427 423

  
......
436 432
  {
437 433
    size_t selectedpeers_len = offer_peer_count();
438 434
    int chunkids[size];
439
    const struct nodeID *nodeids[n];
440
    const struct nodeID *selectedpeers[selectedpeers_len];
435
    struct nodeID *nodeids[n];
436
    struct nodeID *selectedpeers[selectedpeers_len];
441 437

  
442 438
    //reduce load a little bit if there are losses on the path from this guy
443 439
    double average_lossrate = get_average_lossrate_pset(pset);
......
448 444

  
449 445
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
450 446
    for (i = 0; i<n; i++) nodeids[i] = (neighbours+i)->id;
451
    schedSelectPeersForChunks(SCHED_WEIGHTED, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, needs, (transid % 2) ? peerWeightReceivedfrom : peerWeightRtt);	//select a peer that needs at least one of our chunks
447
    selectPeersForChunks(SCHED_WEIGHTED, nodeids, n, chunkids, size, selectedpeers, &selectedpeers_len, needs, (transid % 2) ? peerWeightReceivedfrom : peerWeightRtt);	//select a peer that needs at least one of our chunks
452 448

  
453 449
    for (i=0; i<selectedpeers_len ; i++){
454 450
      int max_deliver = offer_max_deliver(selectedpeers[i]);
......
464 460
void send_chunk()
465 461
{
466 462
  struct chunk *buff;
467
  size_t size, i, n;
468
  int res;
463
  int size, res, i, n;
469 464
  struct peer *neighbours;
470 465
  struct peerset *pset;
471 466

  
......
487 482
  {
488 483
    size_t selectedpairs_len = 1;
489 484
    int chunkids[size];
490
    const struct nodeID *nodeids[n];
485
    struct nodeID *nodeids[n];
491 486
    struct PeerChunk selectedpairs[1];
492 487
  
493 488
    for (i = 0;i < size; i++) chunkids[i] = (buff+i)->id;
......
497 492

  
498 493
    for (i=0; i<selectedpairs_len ; i++){
499 494
      struct peer *p = nodeid_to_peer(selectedpairs[i].peer, 0);
500
      const struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
495
      struct chunk *c = cb_get_chunk(cb, selectedpairs[i].chunk);
501 496
      dprintf("\t sending chunk[%d] to ", c->id);
502 497
      dprintf("%s\n", node_addr(p->id));
503 498

  
streaming.h
11 11

  
12 12
void stream_init(int size, struct nodeID *myID);
13 13
int source_init(const char *fname, struct nodeID *myID, bool loop);
14
void received_chunk(const struct nodeID *from, const uint8_t *buff, int len);
14
void received_chunk(struct nodeID *from, const uint8_t *buff, int len);
15 15
void send_chunk();
16 16
int generated_chunk(suseconds_t *delta);
17 17
struct chunkID_set *get_chunks_to_accept(struct peer *from, const struct chunkID_set *cset_off, int max_deliver, int trans_id);
topology.c
9 9
#include <sys/time.h>
10 10
#include <time.h>
11 11

  
12

  
12
#include <net_helper.h>
13 13
#include <peerset.h>
14 14
#include <peer.h>
15 15
#include <topmanager.h>
16 16

  
17
#include <net_helper.h>
18 17
#include "topology.h"
19 18
#include "streaming.h"
20 19
#include "dbg.h"
......
44 43
}
45 44

  
46 45
// currently it just makes the peerset grow
47
void update_peers(const struct nodeID *from, const uint8_t *buff, int len)
46
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
48 47
{
49 48
  int n_ids, i;
50
  const struct nodeID * const *ids;
49
  const struct nodeID **ids;
51 50
  struct peer *peers;
52 51
  struct timeval tnow, told;
53 52

  
......
57 56
    if (peerset_check(pset, from) < 0) {
58 57
      topAddNeighbour(from, NULL, 0);	//@TODO: this is agressive
59 58
      if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
60
        add_peer((struct nodeID *)from);
59
        add_peer(from);
61 60
      }
62 61
    }
63 62
  }
......
68 67
  for(i = 0; i < n_ids; i++) {
69 68
    if(peerset_check(pset, ids[i]) < 0) {
70 69
      if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
71
        add_peer((struct nodeID *)ids[i]);
70
        add_peer(ids[i]);
72 71
      }
73 72
    }
74 73
  }
......
82 81
         ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
83 82
      //if (peerset_size(pset) > 1) {	// avoid dropping our last link to the world
84 83
        topRemoveNeighbour(peers[i].id);
85
        remove_peer((struct nodeID *)(peers[i--].id));
84
        remove_peer(peers[i--].id);
86 85
      //}
87 86
    }
88 87
  }
......
99 98
    fprintf(stderr,"warning: received message from unknown peer: %s!\n",node_addr(id));
100 99
    if (reg) {
101 100
      topAddNeighbour(id, NULL, 0);	//@TODO: this is agressive
102
      add_peer((struct nodeID *)id);
101
      add_peer(id);
103 102
      p = peerset_get_peer(pset,id);
104 103
    }
105 104
  }
topology.h
8 8

  
9 9
int peers_init();
10 10
struct peerset *get_peers();
11
void update_peers(const struct nodeID *from, const uint8_t *buff, int len);
11
void update_peers(struct nodeID *from, const uint8_t *buff, int len);
12 12
struct peer *nodeid_to_peer(const struct nodeID* id, int reg);

Also available in: Unified diff