Statistics
| Branch: | Revision:

streamers / topology.c @ 3e8cd067

History | View | Annotate | Download (14 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12
#include <string.h>
13

    
14
#include <math.h>
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <tman.h>
21

    
22
#include "compatibility/timer.h"
23

    
24
#include "topology.h"
25
#include "nodeid_set.h"
26
#include "streaming.h"
27
#include "dbg.h"
28
#include "measures.h"
29
#include "streamer.h"
30
#include "hrc.h"
31

    
32
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
33
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
34

    
35
double desired_bw = 0;
36
double desired_rtt = 0.2;
37
double alpha_target = 0.5;
38
double topo_mem = 0;
39

    
40
bool topo_out = true; //peer selects out-neighbours
41
bool topo_in = false; //peer selects in-neighbours (combined means bidirectional)
42

    
43
bool topo_keep_best = false;
44
bool topo_add_best = false;
45

    
46
int NEIGHBORHOOD_TARGET_SIZE = 30;
47
#define TMAN_MAX_IDLE 10
48
#define TMAN_LOG_EVERY 1000
49

    
50
#define STREAMER_TOPOLOGY_MSG_ADD 0
51
#define STREAMER_TOPOLOGY_MSG_REMOVE 1
52

    
53
static struct peerset *pset;
54
static struct timeval tout_bmap = {20, 0};
55
static int counter = 0;
56
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
57
static tmanRankingFunction rankFunct = simpleRanker;
58

    
59
struct metadata {
60
  uint16_t cb_size;
61
  uint16_t cps;
62
  float capacity;
63
  float recv_delay;
64
} __attribute__((packed));
65

    
66
static struct metadata my_metadata;
67
static int cnt = 0;
68
static struct nodeID *me = NULL;
69
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN,MSG_TYPE_STREAMER_TOPOLOGY};
70
static struct nodeID ** neighbors;
71

    
72
static void update_metadata(void) {
73
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
74
        my_metadata.recv_delay = get_receive_delay();
75
        my_metadata.cps =  get_running_offer_threads();
76
        my_metadata.capacity = get_capacity();
77
}
78

    
79
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
80

    
81
        double t,p1,p2;
82
        t = *((const double *)tin);
83
        p1 = *((const double *)p1in);
84
        p2 = *((const double *)p2in);
85

    
86
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
87
        else if (isnan(p1)) return 2;
88
        else if (isnan(p2)) return 1;
89
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
90

    
91
}
92

    
93
int topologyInit(struct nodeID *myID, const char *config)
94
{
95
        int i;
96
        for (i=0;i<sizeof(mTypes)/sizeof(mTypes[0]);i++)
97
                bind_msg_type(mTypes[i]);
98
        update_metadata();
99
        me = myID;
100
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
101
}
102

    
103
void topologyShutdown(void)
104
{
105
}
106

    
107
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
108
{
109
        // TODO: check this!! Just to use this function to bootstrap ncast...
110
        struct metadata m = {0};        //TODO: check what metadata option should mean
111

    
112
        if (counter < TMAN_MAX_IDLE)
113
                return topAddNeighbour(neighbour,&m,sizeof(m));
114
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
115
}
116

    
117
static int topoParseData(const uint8_t *buff, int len)
118
{
119
        int res = -1,ncs = 0,msize;
120
        const struct nodeID **n; const void *m;
121
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
122
                res = topParseData(buff,len);
123
//                if (counter <= TMAN_MAX_IDLE)
124
//                        counter++;
125
        }
126
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
127
        {
128
                n = topGetNeighbourhood(&ncs);
129
                if (ncs) {
130
                m = topGetMetadata(&msize);
131
                res = tmanParseData(buff,len,n,ncs,m,msize);
132
                }
133
        }
134
  return res;
135
}
136

    
137
static const struct nodeID **topoGetNeighbourhood(int *n)
138
{
139
        int i; double d;
140
        if (counter > TMAN_MAX_IDLE) {
141
                uint8_t *mdata; int msize;
142
                *n = tmanGetNeighbourhoodSize();
143
                if (neighbors) free(neighbors);
144
                neighbors = calloc(*n,sizeof(struct nodeID *));
145
                tmanGetMetadata(&msize);
146
                mdata = calloc(*n,msize);
147
                tmanGivePeers(*n,neighbors,(void *)mdata);
148

    
149
                if (cnt % TMAN_LOG_EVERY == 0) {
150
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
151
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
152
                                d = *((double *)(mdata+i*msize));
153
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
154
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
155
                        }
156
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
157
                }
158

    
159
                free(mdata);
160
                return (const struct nodeID **)neighbors;
161
        }
162
        else
163
                return topGetNeighbourhood(n);
164
}
165

    
166
static void topoAddToBL (struct nodeID *id)
167
{
168
        if (counter >= TMAN_MAX_IDLE)
169
                tmanAddToBlackList(id);
170
//        else
171
                topAddToBlackList(id);
172
}
173

    
174
//TODO: send metadata as well
175
static int send_topo_msg(struct nodeID *dst, uint8_t type)
176
{
177
  char msg[2];
178
  msg[0] = MSG_TYPE_STREAMER_TOPOLOGY;
179
  msg[1] = type;
180
  return send_to_peer(me, dst, msg, 2);
181
}
182

    
183
static void add_peer(const struct nodeID *id, const struct metadata *m, bool local, bool remote)
184
{
185
  if (local) {
186
      dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
187
      peerset_add_peer(pset, id);
188
      if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
189
      if (m) peerset_get_peer(pset, id)->capacity = m->capacity;
190
      /* add measures here */
191
      add_measures(id);
192
      send_bmap(id);
193
  }
194
  if (remote) {
195
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
196
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD);
197
  }
198
}
199

    
200
static void remove_peer(const struct nodeID *id, bool local, bool remote)
201
{
202
  if (local) {
203
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
204
      /* add measures here */
205
      delete_measures(id);
206
      peerset_remove_peer(pset, id);
207
  }
208
  if (remote) {
209
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
210
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE);
211
  }
212
}
213

    
214
//get the rtt. Currenly only MONL version is supported
215
static double get_rtt_of(const struct nodeID* n){
216
#ifdef MONL
217
  return get_rtt(n);
218
#else
219
  return NAN;
220
#endif
221
}
222

    
223
//get the declared capacity of a node
224
static double get_capacity_of(const struct nodeID* n){
225
  struct peer *p = peerset_get_peer(pset, n);
226
  if (p) {
227
    return p->capacity;
228
  }
229

    
230
  return NAN;
231
}
232

    
233
//returns: 1:yes 0:no -1:unknown
234
int desiredness(const struct nodeID* n) {
235
  double rtt = get_rtt_of(n);
236
  double bw =  get_capacity_of(n);
237

    
238
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
239
    return -1;
240
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw)) {
241
    return 1;
242
  }
243

    
244
  return 0;
245
}
246

    
247
bool is_desired(const struct nodeID* n) {
248
  return (desiredness(n) == 1);
249
}
250

    
251
int cmp_rtt(const struct nodeID* a, const struct nodeID* b) {
252
  double ra, rb;
253
  ra = get_rtt_of(a);
254
  rb = get_rtt_of(a);
255
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
256
  else if (isnan(rb) || ra < rb) return -1;
257
  else return 1;
258
}
259

    
260
int vcmp_rtt(const void* a, const void* b) {
261
  return cmp_rtt((const struct nodeID*)a, (const struct nodeID*)b);
262
}
263

    
264
// currently it just makes the peerset grow
265
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
266
{
267
  int n_ids, metasize, i, newids_size, max_ids;
268
  static const struct nodeID **newids;
269
  static const struct metadata *metas;
270
  struct peer *peers;
271
  struct timeval tnow, told;
272
  static const struct nodeID **savedids;
273
  static int savedids_size;
274

    
275
  if timerisset(&tout_bmap) {
276
    gettimeofday(&tnow, NULL);
277
    timersub(&tnow, &tout_bmap, &told);
278
    peers = peerset_get_peers(pset);
279
    for (i = 0; i < peerset_size(pset); i++) {
280
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
281
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
282
        ftprintf(stderr,"Topo: dropping inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset));
283
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
284
        topoAddToBL(peers[i].id);
285
        remove_peer(peers[i--].id, true, true);
286
        //}
287
      }
288
    }
289
  }
290

    
291
  if (cnt++ % 100 == 0) {
292
        update_metadata();
293
    if (counter > TMAN_MAX_IDLE) {
294
        tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
295
    }
296
  }
297

    
298
  //handle explicit add/remove messages
299
  if (topo_in && buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
300
    dtprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset));
301
    if (len != 2) {
302
      fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
303
      return;
304
    }
305
    switch (buff[1]) {
306
      case STREAMER_TOPOLOGY_MSG_ADD:
307
        ftprintf(stderr,"Topo: adding on request %s (peers:%d)\n", node_addr(from), peerset_size(pset));
308
        if (!peerset_get_peer(pset, from))
309
                add_peer(from, NULL, true, false);
310
        break;
311
      case STREAMER_TOPOLOGY_MSG_REMOVE:
312
        ftprintf(stderr,"Topo: removing on request %s (peers:%d)\n", node_addr(from), peerset_size(pset));
313
        remove_peer(from, true, false);
314
        break;
315
      default:
316
        fprintf(stderr, "Bad streamer topo message received!\n");
317
    }
318
    reg_neigh_size(peerset_size(pset));
319
    return;
320
  }
321

    
322
  topoParseData(buff, len);
323

    
324
  if (!buff) {
325
    reg_neigh_size(peerset_size(pset));
326
    return;
327
  }
328

    
329
  peers = peerset_get_peers(pset);
330
  n_ids = peerset_size(pset);
331
  newids = topoGetNeighbourhood(&newids_size);        //TODO handle both tman and topo
332
  metas = topGetMetadata(&metasize);        //TODO: check metasize
333
  max_ids = n_ids + savedids_size + newids_size;
334
  ftprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
335
  {
336
    int desired_part;
337
    const struct nodeID *oldids[max_ids], *nodeids[max_ids], *candidates[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids];
338
    int oldids_size, nodeids_size, candidates_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size, keep_size, random_size;
339
    nodeids_size = candidates_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
340

    
341
    if (topo_out) {
342
      for (i = 0, oldids_size = 0; i < peerset_size(pset); i++) {
343
        oldids[oldids_size++] = peers[i].id;
344
        fprintf(stderr," %s - RTT: %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id));
345
      }
346
    } else {
347
      for (i = 0, oldids_size = 0; i < savedids_size; i++) {
348
        oldids[oldids_size++] = savedids[i];
349
        fprintf(stderr," %s - RTT: %f\n", node_addr(savedids[i]) , get_rtt_of(savedids[i]));
350
      }
351
      savedids_size = 0;
352
      free(savedids);
353
    }
354

    
355
    // select the topo_mem portion of peers to be kept (uniform random)
356
    if (topo_keep_best) {
357
      qsort(oldids, oldids_size, sizeof(oldids[0]), vcmp_rtt);
358
    } else {
359
      nidset_shuffle(oldids, oldids_size);
360
    }
361
    keep_size = selecteds_size = (int) (topo_mem * oldids_size);
362
    memcpy(selecteds, oldids, selecteds_size * sizeof(selecteds[0]));
363

    
364
    // compose list of known nodeids
365
    nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size);
366

    
367
    // compose list of candidate nodeids
368
    nidset_complement(candidates, &candidates_size, nodeids, nodeids_size, selecteds, selecteds_size);
369

    
370
    // select the alpha_target portion of desired peers
371
    desired_part = (1 - alpha_target) * (NEIGHBORHOOD_TARGET_SIZE ? (MAX (NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : 0);
372
    nidset_filter(desireds, &desireds_size, candidates, candidates_size, is_desired);
373
    if (topo_add_best) {
374
      qsort(desireds, desireds_size, sizeof(desireds[0]), vcmp_rtt);
375
    } else {
376
      nidset_shuffle(desireds, desireds_size);
377
    }
378
    nidset_add_i(selecteds, &selecteds_size, max_ids, desireds, MIN(desireds_size,desired_part));
379

    
380
    // random from the rest
381
    nidset_complement(others, &others_size, candidates, candidates_size, selecteds, selecteds_size);
382
    nidset_shuffle(others, others_size);
383
    random_size = NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, MAX(NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : others_size;
384
    nidset_add_i(selecteds, &selecteds_size, max_ids, others, random_size);
385

    
386
    fprintf(stderr,"Topo modify sel:%d (from:%d) = keep: %d (of old:%d) + desired: %d (from %d of %d; target:%d) + random: %d (from %d)\n",
387
            selecteds_size, nodeids_size,
388
            keep_size, oldids_size,
389
            MIN(desireds_size,desired_part), desireds_size, candidates_size, desired_part,
390
            random_size, others_size);
391
    // add new ones
392
    nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size);
393
    for (i = 0; i < toadds_size; i++) {
394
      int j;
395
      //searching for the metadata
396
      if (nidset_find(&j, newids, newids_size, toadds[i])) {
397
        fprintf(stderr," adding %s\n", node_addr(toadds[i]));
398
        add_peer(newids[j], &metas[j], topo_out, topo_in);
399
      } else {
400
        fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
401
      }
402
    }
403

    
404
    // finally, remove those not needed
405
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
406
    nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size);
407
    for (i = 0; i < toremoves_size; i++) {
408
      fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
409
      remove_peer(toremoves[i], topo_out, topo_in);
410
    }
411
    fprintf(stderr,"Topo remove end\n");
412

    
413
    if (!topo_out) {
414
      savedids = malloc(selecteds_size * sizeof(savedids[0]));        //TODO: handle errors
415
      for (i = 0, savedids_size = 0; i < selecteds_size; i++) {
416
        savedids[savedids_size++] = nodeid_dup(selecteds[i]);
417
      }
418
      for (i = 0; i < oldids_size; i++) {
419
        nodeid_free(oldids[i]);
420
      }
421
    }
422
  }
423

    
424
  reg_neigh_size(peerset_size(pset));
425
}
426

    
427
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
428
{
429
  struct peer *p = peerset_get_peer(pset, id);
430
  if (!p) {
431
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
432
    if (reg) {
433
      add_peer(id,NULL, topo_out, false);
434
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset));
435
      p = peerset_get_peer(pset,id);
436
    }
437
  }
438

    
439
  return p;
440
}
441

    
442
int peers_init(void)
443
{
444
  fprintf(stderr,"peers_init\n");
445
  pset = peerset_init(0);
446
  return pset ? 1 : 0;
447
}
448

    
449
struct peerset *get_peers(void)
450
{
451
  return pset;
452
}