Statistics
| Branch: | Revision:

streamers / topology.c @ 9176d3d1

History | View | Annotate | Download (12.6 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12
#include <string.h>
13

    
14
#include <math.h>
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <tman.h>
21

    
22
#include "compatibility/timer.h"
23

    
24
#include "topology.h"
25
#include "nodeid_set.h"
26
#include "streaming.h"
27
#include "dbg.h"
28
#include "measures.h"
29
#include "streamer.h"
30

    
31
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
32
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
33

    
34
double desired_bw = 0;
35
double desired_rtt = 0.2;
36
double alpha_target = 0.5;
37
double topo_mem = 0;
38

    
39
int NEIGHBORHOOD_TARGET_SIZE = 30;
40
double NEIGHBORHOOD_ROTATE_RATIO = 1.0;
41
#define TMAN_MAX_IDLE 10
42
#define TMAN_LOG_EVERY 1000
43

    
44
#define STREAMER_TOPOLOGY_MSG_ADD 0
45
#define STREAMER_TOPOLOGY_MSG_REMOVE 1
46

    
47
static struct peerset *pset;
48
static struct timeval tout_bmap = {20, 0};
49
static int counter = 0;
50
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
51
static tmanRankingFunction rankFunct = simpleRanker;
52

    
53
struct metadata {
54
  uint16_t cb_size;
55
  uint16_t cps;
56
  float capacity;
57
  float recv_delay;
58
} __attribute__((packed));
59

    
60
static struct metadata my_metadata;
61
static int cnt = 0;
62
static struct nodeID *me = NULL;
63
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN,MSG_TYPE_STREAMER_TOPOLOGY};
64
static struct nodeID ** neighbors;
65

    
66
static void update_metadata(void) {
67
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
68
        my_metadata.recv_delay = get_receive_delay();
69
        my_metadata.cps = get_chunks_per_sec();
70
        my_metadata.capacity = get_capacity();
71
}
72

    
73
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
74

    
75
        double t,p1,p2;
76
        t = *((const double *)tin);
77
        p1 = *((const double *)p1in);
78
        p2 = *((const double *)p2in);
79

    
80
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
81
        else if (isnan(p1)) return 2;
82
        else if (isnan(p2)) return 1;
83
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
84

    
85
}
86

    
87
int topologyInit(struct nodeID *myID, const char *config)
88
{
89
        int i;
90
        for (i=0;i<sizeof(mTypes)/sizeof(mTypes[0]);i++)
91
                bind_msg_type(mTypes[i]);
92
        update_metadata();
93
        me = myID;
94
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
95
}
96

    
97
void topologyShutdown(void)
98
{
99
}
100

    
101
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
102
{
103
        // TODO: check this!! Just to use this function to bootstrap ncast...
104
        struct metadata m = {0};        //TODO: check what metadata option should mean
105

    
106
        if (counter < TMAN_MAX_IDLE)
107
                return topAddNeighbour(neighbour,&m,sizeof(m));
108
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
109
}
110

    
111
static int topoParseData(const uint8_t *buff, int len)
112
{
113
        int res = -1,ncs = 0,msize;
114
        const struct nodeID **n; const void *m;
115
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
116
                res = topParseData(buff,len);
117
//                if (counter <= TMAN_MAX_IDLE)
118
//                        counter++;
119
        }
120
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
121
        {
122
                n = topGetNeighbourhood(&ncs);
123
                if (ncs) {
124
                m = topGetMetadata(&msize);
125
                res = tmanParseData(buff,len,n,ncs,m,msize);
126
                }
127
        }
128
  return res;
129
}
130

    
131
static const struct nodeID **topoGetNeighbourhood(int *n)
132
{
133
        int i; double d;
134
        if (counter > TMAN_MAX_IDLE) {
135
                uint8_t *mdata; int msize;
136
                *n = tmanGetNeighbourhoodSize();
137
                if (neighbors) free(neighbors);
138
                neighbors = calloc(*n,sizeof(struct nodeID *));
139
                tmanGetMetadata(&msize);
140
                mdata = calloc(*n,msize);
141
                tmanGivePeers(*n,neighbors,(void *)mdata);
142

    
143
                if (cnt % TMAN_LOG_EVERY == 0) {
144
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
145
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
146
                                d = *((double *)(mdata+i*msize));
147
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
148
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
149
                        }
150
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
151
                }
152

    
153
                free(mdata);
154
                return (const struct nodeID **)neighbors;
155
        }
156
        else
157
                return topGetNeighbourhood(n);
158
}
159

    
160
static void topoAddToBL (struct nodeID *id)
161
{
162
        if (counter >= TMAN_MAX_IDLE)
163
                tmanAddToBlackList(id);
164
//        else
165
                topAddToBlackList(id);
166
}
167

    
168
//TODO: send metadata as well
169
static int send_topo_msg(struct nodeID *dst, uint8_t type)
170
{
171
  char msg[2];
172
  msg[0] = MSG_TYPE_STREAMER_TOPOLOGY;
173
  msg[1] = type;
174
  return send_to_peer(me, dst, msg, 2);
175
}
176

    
177
static void add_peer_silent(const struct nodeID *id, const struct metadata *m)
178
{
179
      dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
180
      peerset_add_peer(pset, id);
181
      if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
182
      if (m) peerset_get_peer(pset, id)->capacity = m->capacity;
183
      /* add measures here */
184
      add_measures(id);
185
      send_bmap(id);
186
}
187

    
188
static void add_peer(const struct nodeID *id, const struct metadata *m)
189
{
190
      add_peer_silent(id, m);
191
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
192
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD);
193
}
194

    
195
static void remove_peer_silent(const struct nodeID *id)
196
{
197
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
198
      /* add measures here */
199
      delete_measures(id);
200
      peerset_remove_peer(pset, id);
201
}
202

    
203
static void remove_peer(const struct nodeID *id)
204
{
205
      remove_peer_silent(id);
206
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
207
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE);
208
}
209

    
210
//get the rtt. Currenly only MONL version is supported
211
static double get_rtt_of(const struct nodeID* n){
212
#ifdef MONL
213
  return get_rtt(n);
214
#else
215
  return NAN;
216
#endif
217
}
218

    
219
//get the declared capacity of a node
220
static double get_capacity_of(const struct nodeID* n){
221
  struct peer *p = peerset_get_peer(pset, n);
222
  if (p) {
223
    return p->capacity;
224
  }
225

    
226
  return NAN;
227
}
228

    
229
//returns: 1:yes 0:no -1:unknown
230
int desiredness(const struct nodeID* n) {
231
  double rtt = get_rtt_of(n);
232
  double bw =  get_capacity_of(n);
233

    
234
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
235
    return -1;
236
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw)) {
237
    return 1;
238
  }
239

    
240
  return 0;
241
}
242

    
243
bool is_desired(const struct nodeID* n) {
244
  return (desiredness(n) == 1);
245
}
246

    
247
// currently it just makes the peerset grow
248
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
249
{
250
  int n_ids, metasize, i, newids_size, max_ids;
251
  static const struct nodeID **newids;
252
  static const struct metadata *metas;
253
  struct peer *peers;
254
  struct timeval tnow, told;
255

    
256
  if timerisset(&tout_bmap) {
257
    gettimeofday(&tnow, NULL);
258
    timersub(&tnow, &tout_bmap, &told);
259
    peers = peerset_get_peers(pset);
260
    for (i = 0; i < peerset_size(pset); i++) {
261
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
262
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
263
        ftprintf(stderr,"Topo: dropping inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset));
264
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
265
        topoAddToBL(peers[i].id);
266
        remove_peer(peers[i--].id);
267
        //}
268
      }
269
    }
270
  }
271

    
272
  if (cnt++ % 100 == 0) {
273
        update_metadata();
274
    if (counter > TMAN_MAX_IDLE) {
275
        tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
276
    }
277
  }
278

    
279
  //handle explicit add/remove messages
280
  if (buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
281
    dtprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset));
282
    if (len != 2) {
283
      fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
284
      return;
285
    }
286
    switch (buff[1]) {
287
      case STREAMER_TOPOLOGY_MSG_ADD:
288
        ftprintf(stderr,"Topo: adding for symmetry %s (peers:%d)\n", node_addr(from), peerset_size(pset));
289
        add_peer_silent(from, NULL);
290
        break;
291
      case STREAMER_TOPOLOGY_MSG_REMOVE:
292
        ftprintf(stderr,"Topo: removing for symmetry %s (peers:%d)\n", node_addr(from), peerset_size(pset));
293
        remove_peer_silent(from);
294
        break;
295
      default:
296
        fprintf(stderr, "Bad streamer topo message received!\n");
297
    }
298
    reg_neigh_size(peerset_size(pset));
299
    return;
300
  }
301

    
302
  topoParseData(buff, len);
303

    
304
  if (!buff) {
305
    reg_neigh_size(peerset_size(pset));
306
    return;
307
  }
308

    
309
  peers = peerset_get_peers(pset);
310
  n_ids = peerset_size(pset);
311
  newids = topoGetNeighbourhood(&newids_size);        //TODO handle both tman and topo
312
  metas = topGetMetadata(&metasize);        //TODO: check metasize
313
  max_ids = n_ids + newids_size;
314
  ftprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
315
  {
316
    int desired_part;
317
    const struct nodeID *oldids[max_ids], *nodeids[max_ids], *candidates[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids];
318
    size_t oldids_size, nodeids_size, candidates_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size, keep_size, random_size;
319
    nodeids_size = candidates_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
320

    
321
    for (i = 0, oldids_size = 0; i < peerset_size(pset); i++) {
322
      oldids[oldids_size++] = peers[i].id;
323
      fprintf(stderr," %s - RTT: %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id));
324
    }
325

    
326
    // select the topo_mem portion of peers to be kept (uniform random)
327
    nidset_shuffle(oldids, oldids_size);
328
    keep_size = selecteds_size = (int) (topo_mem * oldids_size);
329
    memcpy(selecteds, oldids, selecteds_size * sizeof(selecteds[0]));
330

    
331
    // compose list of known nodeids
332
    nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size);
333

    
334
    // compose list of candidate nodeids
335
    nidset_complement(candidates, &candidates_size, nodeids, nodeids_size, selecteds, selecteds_size);
336

    
337
    // select the alpha_target portion of desired peers
338
    desired_part = (1 - alpha_target) * (NEIGHBORHOOD_TARGET_SIZE ? (MAX (NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : 0);
339
    nidset_filter(desireds, &desireds_size, candidates, candidates_size, is_desired);
340
    nidset_shuffle(desireds, desireds_size);
341
    nidset_add_i(selecteds, &selecteds_size, max_ids, desireds, MIN(desireds_size,desired_part));
342

    
343
    // random from the rest
344
    nidset_complement(others, &others_size, candidates, candidates_size, selecteds, selecteds_size);
345
    nidset_shuffle(others, others_size);
346
    random_size = NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, NEIGHBORHOOD_TARGET_SIZE - selecteds_size) : others_size;
347
    nidset_add_i(selecteds, &selecteds_size, max_ids, others, random_size);
348

    
349
    fprintf(stderr,"Topo modify sel:%ld (from:%ld) = keep: %ld (of old:%ld) + desired: %ld (from %ld of %ld; target:%d) + random: %ld (from %ld)\n",
350
            (long)selecteds_size, (long)nodeids_size,
351
            (long)keep_size, (long)oldids_size,
352
            (long)MIN(desireds_size,desired_part), (long)desireds_size, (long)candidates_size, desired_part,
353
            (long)random_size, (long)others_size);
354
    // add new ones
355
    nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size);
356
    for (i = 0; i < toadds_size; i++) {
357
      size_t j;
358
      //searching for the metadata
359
      if (nidset_find(&j, newids, newids_size, toadds[i])) {
360
        fprintf(stderr," adding %s\n", node_addr(toadds[i]));
361
        add_peer(newids[j], &metas[j]);
362
      } else {
363
        fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
364
      }
365
    }
366

    
367
    // finally, remove those not needed
368
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
369
    nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size);
370
    for (i = 0; i < toremoves_size; i++) {
371
      fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
372
      remove_peer(toremoves[i]);
373
    }
374
    fprintf(stderr,"Topo remove end\n");
375
  }
376

    
377
  reg_neigh_size(peerset_size(pset));
378
}
379

    
380
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
381
{
382
  struct peer *p = peerset_get_peer(pset, id);
383
  if (!p) {
384
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
385
    if (reg) {
386
      add_peer(id,NULL);
387
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset));
388
      p = peerset_get_peer(pset,id);
389
    }
390
  }
391

    
392
  return p;
393
}
394

    
395
int peers_init(void)
396
{
397
  fprintf(stderr,"peers_init\n");
398
  pset = peerset_init(0);
399
  return pset ? 1 : 0;
400
}
401

    
402
struct peerset *get_peers(void)
403
{
404
  return pset;
405
}