Statistics
| Branch: | Revision:

streamers / topology.c @ 48a8f955

History | View | Annotate | Download (12.6 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12
#include <string.h>
13

    
14
#include <math.h>
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <tman.h>
21

    
22
#include "compatibility/timer.h"
23

    
24
#include "topology.h"
25
#include "streaming.h"
26
#include "dbg.h"
27
#include "measures.h"
28
#include "streamer.h"
29

    
30
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
31

    
32
double desired_bw = 0;
33
double desired_rtt = 0.2;
34
double alpha_target = 0.5;
35

    
36
int NEIGHBORHOOD_TARGET_SIZE = 20;
37
double NEIGHBORHOOD_ROTATE_RATIO = 1.0;
38
#define TMAN_MAX_IDLE 10
39
#define TMAN_LOG_EVERY 1000
40

    
41
static struct peerset *pset;
42
static struct timeval tout_bmap = {10, 0};
43
static int counter = 0;
44
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
45
static tmanRankingFunction rankFunct = simpleRanker;
46

    
47
struct metadata {
48
  uint16_t cb_size;
49
  uint16_t cps;
50
  float capacity;
51
  float recv_delay;
52
} __attribute__((packed));
53

    
54
static struct metadata my_metadata;
55
static int cnt = 0;
56
static struct nodeID *me = NULL;
57
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN};
58
static struct nodeID ** neighbors;
59

    
60
static void update_metadata(void) {
61
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
62
        my_metadata.recv_delay = get_receive_delay();
63
        my_metadata.cps = get_chunks_per_sec();
64
        my_metadata.capacity = get_capacity();
65
}
66

    
67
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
68

    
69
        double t,p1,p2;
70
        t = *((const double *)tin);
71
        p1 = *((const double *)p1in);
72
        p2 = *((const double *)p2in);
73

    
74
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
75
        else if (isnan(p1)) return 2;
76
        else if (isnan(p2)) return 1;
77
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
78

    
79
}
80

    
81
int topologyInit(struct nodeID *myID, const char *config)
82
{
83
        int i;
84
        for (i=0;i<2;i++)
85
                bind_msg_type(mTypes[i]);
86
        update_metadata();
87
        me = myID;
88
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
89
}
90

    
91
void topologyShutdown(void)
92
{
93
}
94

    
95
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
96
{
97
        // TODO: check this!! Just to use this function to bootstrap ncast...
98
        struct metadata m = {0};        //TODO: check what metadata option should mean
99

    
100
        if (counter < TMAN_MAX_IDLE)
101
                return topAddNeighbour(neighbour,&m,sizeof(m));
102
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
103
}
104

    
105
static int topoParseData(const uint8_t *buff, int len)
106
{
107
        int res = -1,ncs = 0,msize;
108
        const struct nodeID **n; const void *m;
109
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
110
                res = topParseData(buff,len);
111
//                if (counter <= TMAN_MAX_IDLE)
112
//                        counter++;
113
        }
114
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
115
        {
116
                n = topGetNeighbourhood(&ncs);
117
                if (ncs) {
118
                m = topGetMetadata(&msize);
119
                res = tmanParseData(buff,len,n,ncs,m,msize);
120
                }
121
        }
122
  return res;
123
}
124

    
125
static const struct nodeID **topoGetNeighbourhood(int *n)
126
{
127
        int i; double d;
128
        if (counter > TMAN_MAX_IDLE) {
129
                uint8_t *mdata; int msize;
130
                *n = tmanGetNeighbourhoodSize();
131
                if (neighbors) free(neighbors);
132
                neighbors = calloc(*n,sizeof(struct nodeID *));
133
                tmanGetMetadata(&msize);
134
                mdata = calloc(*n,msize);
135
                tmanGivePeers(*n,neighbors,(void *)mdata);
136

    
137
                if (cnt % TMAN_LOG_EVERY == 0) {
138
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
139
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
140
                                d = *((double *)(mdata+i*msize));
141
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
142
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
143
                        }
144
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
145
                }
146

    
147
                free(mdata);
148
                return (const struct nodeID **)neighbors;
149
        }
150
        else
151
                return topGetNeighbourhood(n);
152
}
153

    
154
static void topoAddToBL (struct nodeID *id)
155
{
156
        if (counter >= TMAN_MAX_IDLE)
157
                tmanAddToBlackList(id);
158
//        else
159
                topAddToBlackList(id);
160
}
161

    
162
void add_peer(const struct nodeID *id, const struct metadata *m)
163
{
164
      dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
165
      peerset_add_peer(pset, id);
166
      if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
167
      if (m) peerset_get_peer(pset, id)->capacity = m->capacity;
168
      /* add measures here */
169
      add_measures(id);
170
      send_bmap(id);
171
}
172

    
173
void remove_peer(const struct nodeID *id)
174
{
175
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
176
      /* add measures here */
177
      delete_measures(id);
178
      peerset_remove_peer(pset, id);
179
}
180

    
181
//get the rtt. Currenly only MONL version is supported
182
static double get_rtt_of(const struct nodeID* n){
183
#ifdef MONL
184
  return get_rtt(n);
185
#else
186
  return NAN;
187
#endif
188
}
189

    
190
//get the declared capacity of a node
191
static double get_capacity_of(const struct nodeID* n){
192
  struct peer *p = peerset_get_peer(pset, n);
193
  if (p) {
194
    return p->capacity;
195
  }
196

    
197
  return NAN;
198
}
199

    
200
//returns: 1:yes 0:no -1:unknown
201
int desiredness(const struct nodeID* n) {
202
  double rtt = get_rtt_of(n);
203
  double bw =  get_capacity_of(n);
204

    
205
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
206
    return -1;
207
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw)) {
208
    return 1;
209
  }
210

    
211
  return 0;
212
}
213

    
214
bool is_desired(const struct nodeID* n) {
215
  return (desiredness(n) == 1);
216
}
217

    
218
// The usual shuffle
219
static void shuffle(void *base, size_t nmemb, size_t size) {
220
  int i;
221
  unsigned char t[size];
222
  unsigned char* b = base;
223

    
224
  for (i = nmemb - 1; i > 0; i--) {
225
    int newpos = (rand()/(RAND_MAX + 1.0)) * (i + 1);
226
    memcpy(t, b + size * newpos, size);
227
    memmove(b + size * newpos, b + size * i, size);
228
    memcpy(b + size * i, t, size);
229
  }
230
}
231

    
232
static void nidset_shuffle(const struct nodeID **base, size_t nmemb) {
233
  shuffle(base, nmemb, sizeof(struct nodeID *));
234
}
235

    
236
static int nidset_filter(const struct nodeID **dst, size_t *dst_size, const struct nodeID **src, size_t src_size, bool(*f)(const struct nodeID *)) {
237
  size_t i;
238
  size_t max_size = *dst_size;
239
  *dst_size = 0;
240

    
241
  for (i = 0; i < src_size; i++) {
242
    if (f(src[i])) {
243
      if (*dst_size < max_size) {
244
        dst[(*dst_size)++] = src[i];
245
      } else {
246
        return -1;
247
      }
248
    }
249
  }
250

    
251
  return 0;
252
}
253

    
254
// B \ A
255
static int nidset_complement(const struct nodeID **dst, size_t *dst_size, const struct nodeID **bs, size_t bs_size, const struct nodeID **as, size_t as_size) {
256
  size_t i, j;
257
  size_t max_size = *dst_size;
258
  *dst_size = 0;
259

    
260
  for (i = 0; i < bs_size; i++) {
261
    for (j = 0; j < as_size; j++) {
262
      if (nodeid_equal(bs[i], as[j])) {
263
        break;
264
      }
265
    }
266
    if (j >= as_size) {
267
      if (*dst_size < max_size) {
268
        dst[(*dst_size)++] = bs[i];
269
      } else {
270
        return -1;
271
      }
272
    }
273
  }
274

    
275
  return 0;
276
}
277

    
278
static bool nidset_find(size_t *i, const struct nodeID **ids, size_t ids_size, const struct nodeID *id) {
279
  for (*i = 0; *i < ids_size; (*i)++) {
280
    if (nodeid_equal(ids[*i],id)) {
281
      return true;
282
    }
283
  }
284
  return false;
285
}
286

    
287
static int nidset_add(const struct nodeID **dst, size_t *dst_size, const struct nodeID **as, size_t as_size, const struct nodeID **bs, size_t bs_size) {
288
  int r;
289
  size_t i;
290
  size_t max_size = *dst_size;
291

    
292
  i = MIN(as_size, max_size);
293
  memcpy(dst, as, i * sizeof(struct nodeID*));
294
  *dst_size = i;
295
  if (i < as_size) return -1;
296

    
297
  max_size -= *dst_size;
298
  r = nidset_complement(dst + *dst_size, &max_size, bs, bs_size, as, as_size);
299
  *dst_size += max_size;
300

    
301
  return r;
302
}
303

    
304
static int nidset_add_i(const struct nodeID **dst, size_t *dst_size, size_t max_size, const struct nodeID **as, size_t as_size) {
305
  int r;
306

    
307
  max_size -= *dst_size;
308
  r = nidset_complement(dst + *dst_size, &max_size, as, as_size, dst, *dst_size);
309
  *dst_size += max_size;
310

    
311
  return r;
312
}
313

    
314
// currently it just makes the peerset grow
315
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
316
{
317
  int n_ids, metasize, i, newids_size, max_ids;
318
  static const struct nodeID **newids;
319
  static const struct metadata *metas;
320
  struct peer *peers;
321
  struct timeval tnow, told;
322

    
323
  if timerisset(&tout_bmap) {
324
    gettimeofday(&tnow, NULL);
325
    timersub(&tnow, &tout_bmap, &told);
326
    peers = peerset_get_peers(pset);
327
    for (i = 0; i < peerset_size(pset); i++) {
328
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
329
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
330
        fprintf(stderr,"Topo: dropping inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset));
331
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
332
        topoAddToBL(peers[i].id);
333
        remove_peer(peers[i--].id);
334
        //}
335
      }
336
    }
337
  }
338

    
339
  if (cnt++ % 100 == 0) {
340
        update_metadata();
341
    if (counter > TMAN_MAX_IDLE) {
342
        tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
343
    }
344
  }
345

    
346
  topoParseData(buff, len);
347

    
348
  if (!buff) {
349
    reg_neigh_size(peerset_size(pset));
350
    return;
351
  }
352

    
353
  peers = peerset_get_peers(pset);
354
  n_ids = peerset_size(pset);
355
  newids = topoGetNeighbourhood(&newids_size);        //TODO handle both tman and topo
356
  metas = topGetMetadata(&metasize);        //TODO: check metasize
357
  max_ids = n_ids + newids_size;
358
  fprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
359
  {
360
    int desired_part;
361
    const struct nodeID *oldids[max_ids], *nodeids[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids];
362
    size_t oldids_size, nodeids_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size;
363
    nodeids_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
364

    
365
    for (i = 0, oldids_size = 0; i < peerset_size(pset); i++) {
366
      oldids[oldids_size++] = peers[i].id;
367
      fprintf(stderr," %s - RTT: %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id));
368
    }
369

    
370

    
371
    //compose list of nodeids
372
    nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size);
373

    
374
    // select the alpha_target portion of desired peers
375
    desired_part = (1 - alpha_target) * NEIGHBORHOOD_TARGET_SIZE;
376
    nidset_filter(desireds, &desireds_size, nodeids, nodeids_size, is_desired);
377
    nidset_shuffle(desireds, desireds_size);
378
    selecteds_size = MIN(desireds_size,desired_part);
379
    memcpy(selecteds, desireds, selecteds_size * sizeof(selecteds[0]));
380

    
381
    // random from the rest
382
    nidset_complement(others, &others_size, nodeids, nodeids_size, selecteds, selecteds_size);
383
    nidset_shuffle(others, others_size);
384
    nidset_add_i(selecteds, &selecteds_size, max_ids, others, NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, NEIGHBORHOOD_TARGET_SIZE - selecteds_size) : others_size);
385

    
386
    fprintf(stderr,"Topo modify (from:%ld sel:%ld) - desired: %ld of %ld (target:%d sel:%ld); random: from %ld (sel:%ld)\n",
387
            (long)nodeids_size, (long)selecteds_size, (long)desireds_size, (long)nodeids_size, desired_part, (long) MIN(desireds_size,desired_part), (long)others_size, (long)selecteds_size - MIN(desireds_size, desired_part));
388
    // add new ones
389
    nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size);
390
    for (i = 0; i < toadds_size; i++) {
391
      size_t j;
392
      //searching for the metadata
393
      if (nidset_find(&j, newids, newids_size, toadds[i])) {
394
        fprintf(stderr," adding %s\n", node_addr(toadds[i]));
395
        add_peer(newids[j], &metas[j]);
396
      } else {
397
        fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
398
      }
399
    }
400

    
401
    // finally, remove those not needed
402
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
403
    nidset_complement(toremoves, &toremoves_size, nodeids, nodeids_size, selecteds, selecteds_size);
404
    for (i = 0; i < toremoves_size; i++) {
405
      fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
406
      remove_peer(toremoves[i]);
407
    }
408
    fprintf(stderr,"Topo remove end\n");
409
  }
410

    
411
  reg_neigh_size(peerset_size(pset));
412
}
413

    
414
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
415
{
416
  struct peer *p = peerset_get_peer(pset, id);
417
  if (!p) {
418
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
419
    if (reg) {
420
      add_peer(id,NULL);
421
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset));
422
      p = peerset_get_peer(pset,id);
423
    }
424
  }
425

    
426
  return p;
427
}
428

    
429
int peers_init(void)
430
{
431
  fprintf(stderr,"peers_init\n");
432
  pset = peerset_init(0);
433
  return pset ? 1 : 0;
434
}
435

    
436
struct peerset *get_peers(void)
437
{
438
  return pset;
439
}