Statistics
| Branch: | Revision:

streamers / topology.c @ e5cd99c1

History | View | Annotate | Download (12.7 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12
#include <string.h>
13

    
14
#include <math.h>
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <tman.h>
21

    
22
#include "compatibility/timer.h"
23

    
24
#include "topology.h"
25
#include "nodeid_set.h"
26
#include "streaming.h"
27
#include "dbg.h"
28
#include "measures.h"
29
#include "streamer.h"
30

    
31
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
32
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
33

    
34
double desired_bw = 0;
35
double desired_rtt = 0.2;
36
double alpha_target = 0.5;
37
double topo_mem = 0;
38

    
39
bool topo_out = true; //peer selects out-neighbours
40
bool topo_in = false; //peer selects in-neighbours (combined means bidirectional)
41

    
42
int NEIGHBORHOOD_TARGET_SIZE = 30;
43
double NEIGHBORHOOD_ROTATE_RATIO = 1.0;
44
#define TMAN_MAX_IDLE 10
45
#define TMAN_LOG_EVERY 1000
46

    
47
#define STREAMER_TOPOLOGY_MSG_ADD 0
48
#define STREAMER_TOPOLOGY_MSG_REMOVE 1
49

    
50
static struct peerset *pset;
51
static struct timeval tout_bmap = {20, 0};
52
static int counter = 0;
53
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
54
static tmanRankingFunction rankFunct = simpleRanker;
55

    
56
struct metadata {
57
  uint16_t cb_size;
58
  uint16_t cps;
59
  float capacity;
60
  float recv_delay;
61
} __attribute__((packed));
62

    
63
static struct metadata my_metadata;
64
static int cnt = 0;
65
static struct nodeID *me = NULL;
66
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN,MSG_TYPE_STREAMER_TOPOLOGY};
67
static struct nodeID ** neighbors;
68

    
69
static void update_metadata(void) {
70
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
71
        my_metadata.recv_delay = get_receive_delay();
72
        my_metadata.cps = get_chunks_per_sec();
73
        my_metadata.capacity = get_capacity();
74
}
75

    
76
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
77

    
78
        double t,p1,p2;
79
        t = *((const double *)tin);
80
        p1 = *((const double *)p1in);
81
        p2 = *((const double *)p2in);
82

    
83
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
84
        else if (isnan(p1)) return 2;
85
        else if (isnan(p2)) return 1;
86
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
87

    
88
}
89

    
90
int topologyInit(struct nodeID *myID, const char *config)
91
{
92
        int i;
93
        for (i=0;i<sizeof(mTypes)/sizeof(mTypes[0]);i++)
94
                bind_msg_type(mTypes[i]);
95
        update_metadata();
96
        me = myID;
97
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
98
}
99

    
100
void topologyShutdown(void)
101
{
102
}
103

    
104
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
105
{
106
        // TODO: check this!! Just to use this function to bootstrap ncast...
107
        struct metadata m = {0};        //TODO: check what metadata option should mean
108

    
109
        if (counter < TMAN_MAX_IDLE)
110
                return topAddNeighbour(neighbour,&m,sizeof(m));
111
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
112
}
113

    
114
static int topoParseData(const uint8_t *buff, int len)
115
{
116
        int res = -1,ncs = 0,msize;
117
        const struct nodeID **n; const void *m;
118
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
119
                res = topParseData(buff,len);
120
//                if (counter <= TMAN_MAX_IDLE)
121
//                        counter++;
122
        }
123
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
124
        {
125
                n = topGetNeighbourhood(&ncs);
126
                if (ncs) {
127
                m = topGetMetadata(&msize);
128
                res = tmanParseData(buff,len,n,ncs,m,msize);
129
                }
130
        }
131
  return res;
132
}
133

    
134
static const struct nodeID **topoGetNeighbourhood(int *n)
135
{
136
        int i; double d;
137
        if (counter > TMAN_MAX_IDLE) {
138
                uint8_t *mdata; int msize;
139
                *n = tmanGetNeighbourhoodSize();
140
                if (neighbors) free(neighbors);
141
                neighbors = calloc(*n,sizeof(struct nodeID *));
142
                tmanGetMetadata(&msize);
143
                mdata = calloc(*n,msize);
144
                tmanGivePeers(*n,neighbors,(void *)mdata);
145

    
146
                if (cnt % TMAN_LOG_EVERY == 0) {
147
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
148
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
149
                                d = *((double *)(mdata+i*msize));
150
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
151
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
152
                        }
153
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
154
                }
155

    
156
                free(mdata);
157
                return (const struct nodeID **)neighbors;
158
        }
159
        else
160
                return topGetNeighbourhood(n);
161
}
162

    
163
static void topoAddToBL (struct nodeID *id)
164
{
165
        if (counter >= TMAN_MAX_IDLE)
166
                tmanAddToBlackList(id);
167
//        else
168
                topAddToBlackList(id);
169
}
170

    
171
//TODO: send metadata as well
172
static int send_topo_msg(struct nodeID *dst, uint8_t type)
173
{
174
  char msg[2];
175
  msg[0] = MSG_TYPE_STREAMER_TOPOLOGY;
176
  msg[1] = type;
177
  return send_to_peer(me, dst, msg, 2);
178
}
179

    
180
static void add_peer(const struct nodeID *id, const struct metadata *m, bool local, bool remote)
181
{
182
  if (local) {
183
      dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
184
      peerset_add_peer(pset, id);
185
      if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
186
      if (m) peerset_get_peer(pset, id)->capacity = m->capacity;
187
      /* add measures here */
188
      add_measures(id);
189
      send_bmap(id);
190
  }
191
  if (remote) {
192
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
193
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD);
194
  }
195
}
196

    
197
static void remove_peer(const struct nodeID *id, bool local, bool remote)
198
{
199
  if (local) {
200
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
201
      /* add measures here */
202
      delete_measures(id);
203
      peerset_remove_peer(pset, id);
204
  }
205
  if (remote) {
206
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
207
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE);
208
  }
209
}
210

    
211
//get the rtt. Currenly only MONL version is supported
212
static double get_rtt_of(const struct nodeID* n){
213
#ifdef MONL
214
  return get_rtt(n);
215
#else
216
  return NAN;
217
#endif
218
}
219

    
220
//get the declared capacity of a node
221
static double get_capacity_of(const struct nodeID* n){
222
  struct peer *p = peerset_get_peer(pset, n);
223
  if (p) {
224
    return p->capacity;
225
  }
226

    
227
  return NAN;
228
}
229

    
230
//returns: 1:yes 0:no -1:unknown
231
int desiredness(const struct nodeID* n) {
232
  double rtt = get_rtt_of(n);
233
  double bw =  get_capacity_of(n);
234

    
235
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
236
    return -1;
237
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw)) {
238
    return 1;
239
  }
240

    
241
  return 0;
242
}
243

    
244
bool is_desired(const struct nodeID* n) {
245
  return (desiredness(n) == 1);
246
}
247

    
248
// currently it just makes the peerset grow
249
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
250
{
251
  int n_ids, metasize, i, newids_size, max_ids;
252
  static const struct nodeID **newids;
253
  static const struct metadata *metas;
254
  struct peer *peers;
255
  struct timeval tnow, told;
256

    
257
  if timerisset(&tout_bmap) {
258
    gettimeofday(&tnow, NULL);
259
    timersub(&tnow, &tout_bmap, &told);
260
    peers = peerset_get_peers(pset);
261
    for (i = 0; i < peerset_size(pset); i++) {
262
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
263
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
264
        ftprintf(stderr,"Topo: dropping inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset));
265
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
266
        topoAddToBL(peers[i].id);
267
        remove_peer(peers[i--].id, true, true);
268
        //}
269
      }
270
    }
271
  }
272

    
273
  if (cnt++ % 100 == 0) {
274
        update_metadata();
275
    if (counter > TMAN_MAX_IDLE) {
276
        tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
277
    }
278
  }
279

    
280
  //handle explicit add/remove messages
281
  if (topo_in && buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
282
    dtprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset));
283
    if (len != 2) {
284
      fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
285
      return;
286
    }
287
    switch (buff[1]) {
288
      case STREAMER_TOPOLOGY_MSG_ADD:
289
        ftprintf(stderr,"Topo: adding for symmetry %s (peers:%d)\n", node_addr(from), peerset_size(pset));
290
        add_peer(from, NULL, true, false);
291
        break;
292
      case STREAMER_TOPOLOGY_MSG_REMOVE:
293
        ftprintf(stderr,"Topo: removing for symmetry %s (peers:%d)\n", node_addr(from), peerset_size(pset));
294
        remove_peer(from, true, false);
295
        break;
296
      default:
297
        fprintf(stderr, "Bad streamer topo message received!\n");
298
    }
299
    reg_neigh_size(peerset_size(pset));
300
    return;
301
  }
302

    
303
  topoParseData(buff, len);
304

    
305
  if (!buff) {
306
    reg_neigh_size(peerset_size(pset));
307
    return;
308
  }
309

    
310
  peers = peerset_get_peers(pset);
311
  n_ids = peerset_size(pset);
312
  newids = topoGetNeighbourhood(&newids_size);        //TODO handle both tman and topo
313
  metas = topGetMetadata(&metasize);        //TODO: check metasize
314
  max_ids = n_ids + newids_size;
315
  ftprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
316
  {
317
    int desired_part;
318
    const struct nodeID *oldids[max_ids], *nodeids[max_ids], *candidates[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids];
319
    size_t oldids_size, nodeids_size, candidates_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size, keep_size, random_size;
320
    nodeids_size = candidates_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
321

    
322
    for (i = 0, oldids_size = 0; i < peerset_size(pset); i++) {
323
      oldids[oldids_size++] = peers[i].id;
324
      fprintf(stderr," %s - RTT: %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id));
325
    }
326

    
327
    // select the topo_mem portion of peers to be kept (uniform random)
328
    nidset_shuffle(oldids, oldids_size);
329
    keep_size = selecteds_size = (int) (topo_mem * oldids_size);
330
    memcpy(selecteds, oldids, selecteds_size * sizeof(selecteds[0]));
331

    
332
    // compose list of known nodeids
333
    nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size);
334

    
335
    // compose list of candidate nodeids
336
    nidset_complement(candidates, &candidates_size, nodeids, nodeids_size, selecteds, selecteds_size);
337

    
338
    // select the alpha_target portion of desired peers
339
    desired_part = (1 - alpha_target) * (NEIGHBORHOOD_TARGET_SIZE ? (MAX (NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : 0);
340
    nidset_filter(desireds, &desireds_size, candidates, candidates_size, is_desired);
341
    nidset_shuffle(desireds, desireds_size);
342
    nidset_add_i(selecteds, &selecteds_size, max_ids, desireds, MIN(desireds_size,desired_part));
343

    
344
    // random from the rest
345
    nidset_complement(others, &others_size, candidates, candidates_size, selecteds, selecteds_size);
346
    nidset_shuffle(others, others_size);
347
    random_size = NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, NEIGHBORHOOD_TARGET_SIZE - selecteds_size) : others_size;
348
    nidset_add_i(selecteds, &selecteds_size, max_ids, others, random_size);
349

    
350
    fprintf(stderr,"Topo modify sel:%ld (from:%ld) = keep: %ld (of old:%ld) + desired: %ld (from %ld of %ld; target:%d) + random: %ld (from %ld)\n",
351
            (long)selecteds_size, (long)nodeids_size,
352
            (long)keep_size, (long)oldids_size,
353
            (long)MIN(desireds_size,desired_part), (long)desireds_size, (long)candidates_size, desired_part,
354
            (long)random_size, (long)others_size);
355
    // add new ones
356
    nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size);
357
    for (i = 0; i < toadds_size; i++) {
358
      size_t j;
359
      //searching for the metadata
360
      if (nidset_find(&j, newids, newids_size, toadds[i])) {
361
        fprintf(stderr," adding %s\n", node_addr(toadds[i]));
362
        add_peer(newids[j], &metas[j], topo_out, topo_in);
363
      } else {
364
        fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
365
      }
366
    }
367

    
368
    // finally, remove those not needed
369
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
370
    nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size);
371
    for (i = 0; i < toremoves_size; i++) {
372
      fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
373
      remove_peer(toremoves[i], topo_out, topo_in);
374
    }
375
    fprintf(stderr,"Topo remove end\n");
376
  }
377

    
378
  reg_neigh_size(peerset_size(pset));
379
}
380

    
381
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
382
{
383
  struct peer *p = peerset_get_peer(pset, id);
384
  if (!p) {
385
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
386
    if (reg) {
387
      add_peer(id,NULL, topo_out, false);
388
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset));
389
      p = peerset_get_peer(pset,id);
390
    }
391
  }
392

    
393
  return p;
394
}
395

    
396
int peers_init(void)
397
{
398
  fprintf(stderr,"peers_init\n");
399
  pset = peerset_init(0);
400
  return pset ? 1 : 0;
401
}
402

    
403
struct peerset *get_peers(void)
404
{
405
  return pset;
406
}