Statistics
| Branch: | Revision:

streamers / topology.c @ c7ebbd34

History | View | Annotate | Download (11.5 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12
#include <string.h>
13

    
14
#include <math.h>
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <tman.h>
21

    
22
#include "compatibility/timer.h"
23

    
24
#include "topology.h"
25
#include "streaming.h"
26
#include "dbg.h"
27
#include "measures.h"
28
#include "streamer.h"
29

    
30
#define MIN(A,B) ((A) < (B)) ? (A) : (B)
31

    
32
double desired_rtt = 0.2;
33
double alpha_target = 0.5;
34

    
35
int NEIGHBORHOOD_TARGET_SIZE = 20;
36
double NEIGHBORHOOD_ROTATE_RATIO = 1.0;
37
#define TMAN_MAX_IDLE 10
38
#define TMAN_LOG_EVERY 1000
39

    
40
static struct peerset *pset;
41
static struct timeval tout_bmap = {10, 0};
42
static int counter = 0;
43
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
44
static tmanRankingFunction rankFunct = simpleRanker;
45
struct metadata {
46
  uint16_t cb_size;
47
  uint16_t cps;
48
  float recv_delay;
49
};
50
static struct metadata my_metadata;
51
static int cnt = 0;
52
static struct nodeID *me = NULL;
53
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN};
54
static struct nodeID ** neighbors;
55

    
56
static void update_metadata(void) {
57

    
58
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
59
        my_metadata.recv_delay = get_receive_delay();
60
        my_metadata.cps = get_chunks_per_sec();
61
}
62

    
63
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
64

    
65
        double t,p1,p2;
66
        t = *((const double *)tin);
67
        p1 = *((const double *)p1in);
68
        p2 = *((const double *)p2in);
69

    
70
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
71
        else if (isnan(p1)) return 2;
72
        else if (isnan(p2)) return 1;
73
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
74

    
75
}
76

    
77
int topologyInit(struct nodeID *myID, const char *config)
78
{
79
        int i;
80
        for (i=0;i<2;i++)
81
                bind_msg_type(mTypes[i]);
82
        update_metadata();
83
        me = myID;
84
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
85
}
86

    
87
void topologyShutdown(void)
88
{
89
}
90

    
91
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
92
{
93
        // TODO: check this!! Just to use this function to bootstrap ncast...
94
        struct metadata m = {0};        //TODO: check what metadata option should mean
95

    
96
        if (counter < TMAN_MAX_IDLE)
97
                return topAddNeighbour(neighbour,&m,sizeof(m));
98
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
99
}
100

    
101
static int topoParseData(const uint8_t *buff, int len)
102
{
103
        int res = -1,ncs = 0,msize;
104
        const struct nodeID **n; const void *m;
105
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
106
                res = topParseData(buff,len);
107
//                if (counter <= TMAN_MAX_IDLE)
108
//                        counter++;
109
        }
110
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
111
        {
112
                n = topGetNeighbourhood(&ncs);
113
                if (ncs) {
114
                m = topGetMetadata(&msize);
115
                res = tmanParseData(buff,len,n,ncs,m,msize);
116
                }
117
        }
118
  return res;
119
}
120

    
121
static const struct nodeID **topoGetNeighbourhood(int *n)
122
{
123
        int i; double d;
124
        if (counter > TMAN_MAX_IDLE) {
125
                uint8_t *mdata; int msize;
126
                *n = tmanGetNeighbourhoodSize();
127
                if (neighbors) free(neighbors);
128
                neighbors = calloc(*n,sizeof(struct nodeID *));
129
                tmanGetMetadata(&msize);
130
                mdata = calloc(*n,msize);
131
                tmanGivePeers(*n,neighbors,(void *)mdata);
132

    
133
                if (cnt % TMAN_LOG_EVERY == 0) {
134
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
135
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
136
                                d = *((double *)(mdata+i*msize));
137
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
138
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
139
                        }
140
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
141
                }
142

    
143
                free(mdata);
144
                return (const struct nodeID **)neighbors;
145
        }
146
        else
147
                return topGetNeighbourhood(n);
148
}
149

    
150
static void topoAddToBL (struct nodeID *id)
151
{
152
        if (counter >= TMAN_MAX_IDLE)
153
                tmanAddToBlackList(id);
154
//        else
155
                topAddToBlackList(id);
156
}
157

    
158
void add_peer(const struct nodeID *id, const struct metadata *m)
159
{
160
      dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
161
      peerset_add_peer(pset, id);
162
      if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
163
      /* add measures here */
164
      add_measures(id);
165
      send_bmap(id);
166
}
167

    
168
void remove_peer(const struct nodeID *id)
169
{
170
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
171
      /* add measures here */
172
      delete_measures(id);
173
      peerset_remove_peer(pset, id);
174
}
175

    
176
//get the rtt. Currenly only MONL version is supported
177
static double get_rtt_of(const struct nodeID* n){
178
#ifdef MONL
179
  return get_rtt(n);
180
#else
181
  return NAN;
182
#endif
183
}
184

    
185
//returns: 1:yes 0:no -1:unknown
186
int desiredness(const struct nodeID* n) {
187
  double rtt = get_rtt_of(n);
188

    
189
  return isnan(rtt) ? -1 : ((rtt <= desired_rtt) ? 1 : 0);
190
}
191

    
192
bool is_desired(const struct nodeID* n) {
193
  return (desiredness(n) == 1);
194
}
195

    
196
// The usual shuffle
197
static void shuffle(void *base, size_t nmemb, size_t size) {
198
  int i;
199
  unsigned char t[size];
200
  unsigned char* b = base;
201

    
202
  for (i = nmemb - 1; i > 0; i--) {
203
    int newpos = (rand()/(RAND_MAX + 1.0)) * (i + 1);
204
    memcpy(t, b + size * newpos, size);
205
    memmove(b + size * newpos, b + size * i, size);
206
    memcpy(b + size * i, t, size);
207
  }
208
}
209

    
210
static void nidset_shuffle(const struct nodeID **base, size_t nmemb) {
211
  shuffle(base, nmemb, sizeof(struct nodeID *));
212
}
213

    
214
static int nidset_filter(const struct nodeID **dst, size_t *dst_size, const struct nodeID **src, size_t src_size, bool(*f)(const struct nodeID *)) {
215
  size_t i;
216
  size_t max_size = *dst_size;
217
  *dst_size = 0;
218

    
219
  for (i = 0; i < src_size; i++) {
220
    if (f(src[i])) {
221
      if (*dst_size < max_size) {
222
        dst[(*dst_size)++] = src[i];
223
      } else {
224
        return -1;
225
      }
226
    }
227
  }
228

    
229
  return 0;
230
}
231

    
232
// B \ A
233
static int nidset_complement(const struct nodeID **dst, size_t *dst_size, const struct nodeID **bs, size_t bs_size, const struct nodeID **as, size_t as_size) {
234
  size_t i, j;
235
  size_t max_size = *dst_size;
236
  *dst_size = 0;
237

    
238
  for (i = 0; i < bs_size; i++) {
239
    for (j = 0; j < as_size; j++) {
240
      if (bs[i] == as[j]) {
241
        break;
242
      }
243
    }
244
    if (j >= as_size) {
245
      if (*dst_size < max_size) {
246
        dst[(*dst_size)++] = bs[i];
247
      } else {
248
        return -1;
249
      }
250
    }
251
  }
252

    
253
  return 0;
254
}
255

    
256
static bool nidset_find(size_t *i, const struct nodeID **ids, size_t ids_size, const struct nodeID *id) {
257
  for (*i = 0; *i < ids_size; (*i)++) {
258
    if (ids[*i] == id) {
259
      return true;
260
    }
261
  }
262
  return false;
263
}
264

    
265
static int nidset_add(const struct nodeID **dst, size_t *dst_size, const struct nodeID **as, size_t as_size, const struct nodeID **bs, size_t bs_size) {
266
  size_t i;
267
  size_t max_size = *dst_size;
268

    
269
  i = MIN(as_size, max_size);
270
  memcpy(dst, as, i * sizeof(struct nodeID*));
271
  *dst_size = i;
272
  if (i < as_size) return -1;
273

    
274
  i = MIN(bs_size, max_size - *dst_size);
275
  memcpy(dst + *dst_size , bs, i * sizeof(struct nodeID*));
276
  *dst_size += i;
277
  if (i < bs_size) return -1;
278

    
279
  return 0;
280
}
281

    
282
static int nidset_add_i(const struct nodeID **dst, size_t *dst_size, size_t max_size, const struct nodeID **as, size_t as_size) {
283
  size_t i;
284

    
285
  i = MIN(as_size, max_size - *dst_size);
286
  memcpy(dst + *dst_size , as, i * sizeof(struct nodeID*));
287
  *dst_size += i;
288
  if (i < as_size) return -1;
289

    
290
  return 0;
291
}
292

    
293
// currently it just makes the peerset grow
294
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
295
{
296
  int n_ids, metasize, i;
297
  static const struct nodeID **ids;
298
  static const struct metadata *metas;
299
  struct peer *peers;
300
  struct timeval tnow, told;
301

    
302
//  dprintf("Update peers: topo_msg:%d, ",len);
303
//  if (from) {
304
//    dprintf("from:%s, ",node_addr(from));
305
//    if (peerset_check(pset, from) < 0) {
306
//      topAddNeighbour(from, NULL, 0);        //@TODO: this is agressive
307
//      if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
308
//        add_peer(from);
309
//      }
310
//    }
311
//  }
312

    
313
  if (cnt++ % 100 == 0) {
314
        update_metadata();
315
    if (counter > TMAN_MAX_IDLE) {
316
        tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
317
    }
318
  }
319

    
320
  topoParseData(buff, len);
321

    
322
  if (!buff) {
323
    reg_neigh_size(peerset_size(pset));
324
    return;
325
  }
326

    
327
  fprintf(stderr,"Topo modify start\n");
328
  peers = peerset_get_peers(pset);
329
  for (i = 0; i < peerset_size(pset); i++) {
330
    fprintf(stderr," %s - RTT: %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id));
331
  }
332

    
333
  ids = topoGetNeighbourhood(&n_ids);        //TODO handle both tman and topo
334
  metas = topGetMetadata(&metasize);        //TODO: check metasize
335
  for(i = 0; i < n_ids; i++) {
336
    if(peerset_check(pset, ids[i]) < 0) {
337
      if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
338
        add_peer(ids[i],&metas[i]);
339
      } else {  //rotate neighbourhood
340
        if (rand()/((double)RAND_MAX + 1) < NEIGHBORHOOD_ROTATE_RATIO) {
341
          add_peer(ids[i],&metas[i]);
342
        }
343
      }
344
    }
345
  }
346

    
347
  if timerisset(&tout_bmap) {
348
    gettimeofday(&tnow, NULL);
349
    timersub(&tnow, &tout_bmap, &told);
350
    peers = peerset_get_peers(pset);
351
    for (i = 0; i < peerset_size(pset); i++) {
352
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
353
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
354
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
355
        topoAddToBL(peers[i].id);
356
        remove_peer(peers[i--].id);
357
        //}
358
      }
359
    }
360
  }
361

    
362

    
363
  n_ids = peerset_size(pset);
364
  {
365
    int desired_part;
366
    const struct nodeID *nodeids[n_ids], *desireds[n_ids], *selecteds[n_ids], *others[n_ids], *toremoves[n_ids];
367
    size_t nodeids_size, desireds_size, selecteds_size, others_size, toremoves_size;
368
    nodeids_size = desireds_size = selecteds_size = others_size = toremoves_size = n_ids;
369

    
370
    //compose list of nodeids
371
    peers = peerset_get_peers(pset);
372
    for (i = 0; i < n_ids; i++) {
373
      nodeids[i] = peers[i].id;
374
    }
375

    
376
    // select the alpha_target portion of desired peers
377
    desired_part = alpha_target * NEIGHBORHOOD_TARGET_SIZE;
378
    nidset_filter(desireds, &desireds_size, nodeids, nodeids_size, is_desired);
379
    nidset_shuffle(desireds, desireds_size);
380
    selecteds_size = MIN(desireds_size,desired_part);
381
    memcpy(selecteds, desireds, selecteds_size * sizeof(selecteds[0]));
382

    
383
    // random from the rest
384
    nidset_complement(others, &others_size, nodeids, nodeids_size, selecteds, selecteds_size);
385
    nidset_shuffle(others, others_size);
386
    nidset_add_i(selecteds, &selecteds_size, n_ids, others, NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, NEIGHBORHOOD_TARGET_SIZE - selecteds_size) : others_size);
387

    
388
    // finally, remove those not needed
389
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
390
    nidset_complement(toremoves, &toremoves_size, nodeids, nodeids_size, selecteds, selecteds_size);
391
    for (i = 0; i < toremoves_size; i++) {
392
      fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
393
      remove_peer(toremoves[i]);
394
    }
395
    fprintf(stderr,"Topo remove end\n");
396
  }
397

    
398
  reg_neigh_size(peerset_size(pset));
399
}
400

    
401
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
402
{
403
  struct peer *p = peerset_get_peer(pset, id);
404
  if (!p) {
405
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
406
    if (reg) {
407
      add_peer(id,NULL);
408
      p = peerset_get_peer(pset,id);
409
    }
410
  }
411

    
412
  return p;
413
}
414

    
415
int peers_init(void)
416
{
417
  fprintf(stderr,"peers_init\n");
418
  pset = peerset_init(0);
419
  return pset ? 1 : 0;
420
}
421

    
422
struct peerset *get_peers(void)
423
{
424
  return pset;
425
}