Statistics
| Branch: | Revision:

pstreamer / src / topology.c @ fe40d609

History | View | Annotate | Download (13.5 KB)

1
/*
2
 * Copyright (c) 2014-2017 Luca Baldesi
3
 *
4
 * This file is part of PeerStreamer.
5
 *
6
 * PeerStreamer is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the
9
 * License, or (at your option) any later version.
10
 *
11
 * PeerStreamer is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
14
 * General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public License
17
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
18
 *
19
 */
20
#include <stdint.h>
21
#include <stdio.h>
22
#include <sys/time.h>
23
#include <time.h>
24
#include <stdlib.h>
25
#include <string.h>
26
//
27
#include <math.h>
28
#include <net_helper.h>
29
#include <peerset.h>
30
#include <peersampler.h>
31
#include <peer.h>
32
#include <grapes_msg_types.h>
33
//
34
#include "compatibility/timer.h"
35
//
36
#include "topology.h"
37
#include "net_helpers.h"
38
#include "dbg.h"
39
#include "measures.h"
40
#include "streaming.h"
41

    
42
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
43
enum neighbourhood_msg_t {NEIGHBOURHOOD_ADD, NEIGHBOURHOOD_REMOVE, NEIGHBOURHOOD_QUIT};
44

    
45
#define DEFAULT_PEER_CBSIZE 50
46

    
47
#ifndef NAN        //NAN is missing in some old math.h versions
48
#define NAN            (0.0/0.0)
49
#endif
50

    
51
struct metadata {
52
  uint16_t cb_size;
53
} __attribute__((packed));
54

    
55
enum peer_choice {PEER_CHOICE_RANDOM, PEER_CHOICE_BEST, PEER_CHOICE_WORST};
56

    
57
struct topology {
58
        double desired_bw;
59
        double desired_rtt;
60
        double alpha_target;
61
        double topo_mem;
62
        bool topo_out;
63
        bool topo_in;
64
        bool topo_keep_best;
65
        bool topo_add_best;
66
        int neighbourhood_target_size;
67
        struct timeval tout_bmap;
68
        struct metadata my_metadata;        
69

    
70
        const struct psinstance * ps;
71
        struct psample_context * tc;
72
        struct peerset * neighbourhood;
73
        struct peerset * swarm_bucket;
74
        struct peerset * locked_neighs;
75
};
76

    
77
struct peerset * topology_get_neighbours(struct topology * t)
78
{
79
        return t->neighbourhood;
80
}
81

    
82
void peerset_print(const struct peerset * pset, const char * name)
83
{
84
        const struct peer * p;
85
        int i;
86
        if(name) fprintf(stderr,"%s\n",name);
87
        if(pset)
88
                peerset_for_each(pset,p,i)
89
                        fprintf(stderr, "\t%s\n", nodeid_static_str(p->id));
90
}
91

    
92
void update_metadata(struct topology * t)
93
{
94
        t->my_metadata.cb_size = psinstance_is_source(t->ps) ? 0 : psinstance_chunkbuffer_size(t->ps);
95
}
96

    
97
struct peer * topology_get_peer(struct topology * t, const struct nodeID * id)
98
{
99
        struct peer * p = NULL;
100
        p = peerset_get_peer(t->swarm_bucket,id);
101
        if(p == NULL)
102
                p = peerset_get_peer(t->neighbourhood,id);
103
        return p;
104
}
105

    
106
int topology_init(struct topology * t, const struct psinstance * ps, const char *config)
107
{
108
        bind_msg_type(MSG_TYPE_NEIGHBOURHOOD);
109
        bind_msg_type(MSG_TYPE_TOPOLOGY);
110
        t->tout_bmap.tv_sec = 20;
111
        t->tout_bmap.tv_usec = 0;
112
        t->ps = ps;
113

    
114
        t->neighbourhood = peerset_init(0);
115
        t->swarm_bucket = peerset_init(0);
116
        t->locked_neighs = peerset_init(0);
117

    
118
        t->desired_bw = 0;        //TODO: turn on capacity measurement and set meaningful default value
119
        t->desired_rtt = 0.2;
120
        t->alpha_target = 0.4;
121
        t->topo_mem = 0.7;
122
        t->topo_out = true; //peer selects out-neighbours
123
        t->topo_in = true; //peer selects in-neighbours (combined means bidirectional)
124
        t->topo_keep_best = false;
125
        t->topo_add_best = false;
126
        t->neighbourhood_target_size = 30;
127

    
128
        update_metadata(t);
129
        t->tc = psample_init(psinstance_nodeid(ps), &(t->my_metadata), sizeof(struct metadata), config);
130
        
131
  //fprintf(stderr,"[DEBUG] done with topology init\n");
132
        return t->tc && t->neighbourhood && t->swarm_bucket ? 1 : 0;
133
}
134

    
135
struct topology * topology_create(const struct psinstance *ps, const char *config)
136
{
137
        struct topology * t = NULL;
138
        t = malloc(sizeof(struct topology));
139
        topology_init(t, ps, config);
140
        return t;
141
}
142

    
143
/*useful during bootstrap*/
144
int topology_node_insert(struct topology * t, struct nodeID *id)
145
{
146
        struct metadata m = {0};
147
        if (topology_get_peer(t, id) == NULL)
148
                peerset_add_peer(t->swarm_bucket,id);
149
        return psample_add_peer(t->tc,id,&m,sizeof(m));
150
}
151

    
152
void topology_peer_set_metadata(struct  peer *p, const struct metadata *m)
153
{
154
        if (p)
155
        {
156
                if (m)
157
                {
158
                        p->cb_size = m->cb_size;
159
                }
160
                else
161
                {
162
                        p->cb_size = DEFAULT_PEER_CBSIZE;
163
                }
164

    
165
        }
166
}
167

    
168
struct peer * neighbourhood_add_peer(struct topology * t, const struct nodeID *id)
169
{
170
        struct peer * p = NULL;
171
        if (id)
172
        {
173
                p = peerset_pop_peer(t->swarm_bucket,id);
174
                if(p)
175
                        peerset_push_peer(t->neighbourhood,p);
176
                else
177
                {
178
                        peerset_add_peer(t->neighbourhood,id);
179
                        p = peerset_get_peer(t->neighbourhood,id);
180
                        peerset_push_peer(t->locked_neighs,p);
181
                }
182
                measures_add_node(psinstance_measures(t->ps), p->id);
183
                // fprintf(stderr,"[DEBUG] sending bmap to peer %s \n",nodeid_static_str(id));
184
                send_bmap(psinstance_streaming(t->ps), id);
185
        }
186
        return p;
187
}
188

    
189
void neighbourhood_remove_peer(struct topology * t, const struct nodeID *id)
190
{
191
        struct peer *p=NULL;
192
        if(id)
193
        {
194
                p = peerset_pop_peer(t->neighbourhood,id);
195
                if(p)
196
                        peerset_push_peer(t->swarm_bucket,p);
197

    
198
                peerset_pop_peer(t->locked_neighs,id);
199
        }
200
}
201

    
202
void topology_remove_peer(struct topology * t, const struct nodeID *id)
203
{
204
        if(t && id)
205
        {
206
                peerset_remove_peer(t->neighbourhood, id);
207
                peerset_remove_peer(t->swarm_bucket, id);
208
                peerset_remove_peer(t->locked_neighs, id);
209
                psample_remove_peer(t->tc, id);
210
        }
211
}
212

    
213
void neighbourhood_message_parse(struct topology * t, struct nodeID *from, const uint8_t *buff, size_t len)
214
{
215
        struct metadata m = {0};
216
        struct peer *p = NULL;
217

    
218
        switch(buff[0]) {
219
                case NEIGHBOURHOOD_ADD:
220
                        // fprintf(stderr,"[DEBUG] adding peer %s from message\n",nodeid_static_str(from));
221
                        p = neighbourhood_add_peer(t, from);
222
                        if (len >= (sizeof(struct metadata) + 1))
223
                        {
224
                                memmove(&m,buff+1,sizeof(struct metadata));
225
                                topology_peer_set_metadata(p,&m);
226
                        }
227
                        break;
228

    
229
                case NEIGHBOURHOOD_REMOVE:
230
                        neighbourhood_remove_peer(t, from);
231
                        break;
232
                case NEIGHBOURHOOD_QUIT:
233
                        topology_remove_peer(t, from);
234
                        break;
235
                default:
236
                        dprintf("Unknown neighbourhood message type");
237
        }
238
}
239

    
240
void topology_message_parse(struct topology * t, struct nodeID *from, const uint8_t *buff, size_t len)
241
{
242
        switch(buff[0]) {
243
                case MSG_TYPE_NEIGHBOURHOOD:
244
                        if (t->topo_in)
245
                        {
246
                                neighbourhood_message_parse(t, from, buff+1,len);
247
                                reg_neigh_size(psinstance_measures(t->ps), peerset_size(t->neighbourhood));
248
                        }
249
                        break;
250
                case MSG_TYPE_TOPOLOGY:
251
                        psample_parse_data(t->tc,buff,len);
252
                        //fprintf(stderr,"[DEBUG] received TOPO message\n");
253
                        break;
254
                default:
255
                        fprintf(stderr,"Unknown topology message type");
256
        }
257
}
258

    
259
void topology_sample_peers(struct topology * t)
260
{
261
        int sample_nodes_num,sample_metas_num,i;
262
        const struct nodeID * const * sample_nodes;
263
        struct metadata const * sample_metas;
264
        struct peer * p;
265
                
266
        //fprintf(stderr,"[DEBUG] starting peer sampling\n");
267
        sample_nodes = psample_get_cache(t->tc,&sample_nodes_num);
268
        sample_metas = psample_get_metadata(t->tc,&sample_metas_num);
269
        for (i=0;i<sample_nodes_num;i++)
270
        {
271
                //fprintf(stderr,"[DEBUG] sampled node: %s\n",nodeid_static_str(sample_nodes[i]));
272
                p = topology_get_peer(t, sample_nodes[i]);
273
                if(p==NULL)
274
                {
275
                        peerset_add_peer(t->swarm_bucket,sample_nodes[i]);
276
                        p = topology_get_peer(t, sample_nodes[i]);
277
                }
278
                topology_peer_set_metadata(p,&(sample_metas[i]));        
279
        }
280
}
281

    
282
void topology_blacklist_add(struct topology * t, struct nodeID * id)
283
{
284
}
285

    
286
void neighbourhood_drop_unactives(struct topology * t, struct timeval * bmap_timeout)
287
{
288
  struct timeval tnow, told;
289
        struct peer *const *peers;
290
        int i;
291
  gettimeofday(&tnow, NULL);
292
  timersub(&tnow, bmap_timeout, &told);
293
  peers = peerset_get_peers(t->neighbourhood);
294
  for (i = 0; i < peerset_size(t->neighbourhood); i++) {
295
    if ( (!timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->creation_timestamp, &told, <) ) ||
296
         ( timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->bmap_timestamp, &told, <)     )   ) {
297
      dprintf("Topo: dropping inactive %s (peersset_size: %d)\n", nodeid_static_str(peers[i]->id), peerset_size(t->neighbourhood));
298
      if (peerset_size(t->neighbourhood) > 1) {        // avoid dropping our last link to the world
299
              topology_blacklist_add(t, peers[i]->id);
300
              neighbourhood_remove_peer(t, peers[i]->id);
301
      }
302
    }
303
  }
304
        
305
}
306

    
307
void array_shuffle(void *base, int nmemb, int size) {
308
  int i,newpos;
309
  unsigned char t[size];
310
  unsigned char* b = base;
311

    
312
  for (i = nmemb - 1; i > 0; i--) {
313
    newpos = (rand()/(RAND_MAX + 1.0)) * (i + 1);
314
    memcpy(t, b + size * newpos, size);
315
    memmove(b + size * newpos, b + size * i, size);
316
    memcpy(b + size * i, t, size);
317
  }
318
}
319

    
320
double get_rtt_of(struct topology *t, const struct nodeID* n){
321
  return NAN;
322
}
323

    
324
double get_capacity_of(struct topology *t, const struct nodeID* n){
325
  struct peer *p = topology_get_peer(t, n);
326
  if (p) {
327
    return p->capacity;
328
  }
329

    
330
  return NAN;
331
}
332

    
333
int neighbourhood_send_msg(struct topology *t, const struct peer * p,uint8_t type)
334
{
335
        uint8_t * msg;
336
        int res;
337
        msg = malloc(sizeof(struct metadata)+2);
338
        msg[0] = MSG_TYPE_NEIGHBOURHOOD;
339
        msg[1] = type;
340
        memmove(msg+2,&(t->my_metadata),sizeof(struct metadata));
341
        res = send_to_peer(psinstance_nodeid(t->ps), p->id, msg, sizeof(struct metadata)+2);
342
        free(msg);
343
        return res;        
344
}
345

    
346
void topology_quit_overlay(struct topology *t)
347
{
348
        const struct peer * p;
349
        int i;
350

    
351
        dprintf("Notifying known peers of quitting...\n");
352
        peerset_for_each(t->neighbourhood, p, i)
353
                neighbourhood_send_msg(t, p, NEIGHBOURHOOD_QUIT);
354
        peerset_for_each(t->swarm_bucket, p, i)
355
                neighbourhood_send_msg(t, p, NEIGHBOURHOOD_QUIT);
356
}
357

    
358
void peerset_destroy_reference_copy(struct peerset ** pset)
359
{
360
        while (peerset_size(*pset))
361
                peerset_pop_peer(*pset,(peerset_get_peers(*pset)[0])->id);
362

    
363
        peerset_destroy(pset);
364
}
365

    
366
struct peerset * peerset_create_reference_copy(struct peerset * pset)
367
{
368
        struct peerset * ns;
369
        const struct peer * p;
370
        int i;
371

    
372
        ns = peerset_init(0);
373
        peerset_for_each(pset,p,i)
374
                peerset_push_peer(ns, (struct peer *)p);
375
        return ns;
376
}
377

    
378
struct peer *nodeid_to_peer(struct topology * t, struct nodeID *id,int reg)
379
{
380
        struct peer * p;
381
        p = topology_get_peer(t, id);
382
        if(p==NULL && reg)
383
        {
384
                topology_node_insert(t, id);
385
                neighbourhood_add_peer(t, id);
386
                p = topology_get_peer(t, id);
387
                if(t->topo_out)
388
                        neighbourhood_send_msg(t, p, NEIGHBOURHOOD_ADD);
389
        }
390
        return p;
391
}
392

    
393
/* move num peers from pset1 to pset2 after applying the filtering_mask function and following the given criterion */
394
void topology_move_peers(struct peerset * pset1, struct peerset * pset2,int num,enum peer_choice criterion,bool (*filter_mask)(const struct peer *),int (*cmp_peer)(const void* p0, const void* p1)) 
395
{
396
        struct peer * const * const_peers;
397
        struct peer ** peers;
398
        struct peer *p;
399
        int peers_num,i,j;
400

    
401
        peers_num = peerset_size(pset1);
402
        const_peers = peerset_get_peers(pset1);
403
        peers = (struct peer **)malloc(sizeof(struct peer *)*peers_num);
404
        if (filter_mask)
405
        {
406
                for(i = 0,j = 0; i<peers_num; i++)
407
                        if (filter_mask(const_peers[i]))
408
                                peers[j++] = const_peers[i];
409
                peers_num = j;
410
        } else
411
                memmove(peers,const_peers,peers_num*sizeof(struct peer*));
412

    
413
        if (criterion != PEER_CHOICE_RANDOM && cmp_peer != NULL) {
414
    //fprintf(stderr,"[DEBUG] choosen the qsort\n");
415
                qsort(peers, peers_num, sizeof(struct peer*), cmp_peer);
416
        } else {
417
    array_shuffle(peers, peers_num, sizeof(struct peer *));
418
        }
419
        for (i=0; i<peers_num && i<num; i++)
420
        {
421
                if (criterion == PEER_CHOICE_WORST)
422
                        p = peerset_pop_peer(pset1,(peers[peers_num -i -1])->id);
423
                else
424
                        p = peerset_pop_peer(pset1,(peers[i])->id);
425
                peerset_push_peer(pset2,p);
426
        }
427
        free(peers);
428
}
429

    
430
void peerset_reference_copy_add(struct peerset * dst, struct peerset * src)
431
{
432
  const struct peer *p;
433
  int i;
434

    
435
        peerset_for_each(src,p,i)
436
                peerset_push_peer(dst, (struct peer *)p);
437
}
438

    
439
void topology_signal_change(struct topology *t, const struct peerset const * old_neighs)
440
{
441
        const struct peer * p;
442
        int i;
443
  reg_neigh_size(psinstance_measures(t->ps), peerset_size(t->neighbourhood));
444

    
445
        // advertise changes
446
        if(t->topo_out)
447
        {
448
                peerset_for_each(t->neighbourhood,p,i)
449
    {
450
                        if(peerset_check(old_neighs,p->id) < 0)
451
                                neighbourhood_send_msg(t, p, NEIGHBOURHOOD_ADD);
452
    }
453
                peerset_for_each(old_neighs,p,i)
454
    {
455
                        if(peerset_check(t->neighbourhood,p->id) < 0)
456
                                neighbourhood_send_msg(t,p,NEIGHBOURHOOD_REMOVE);
457
    }
458
        }
459
}
460

    
461

    
462
void topology_update_random(struct topology * t)
463
{
464
        int discard_num;
465
        int others_num;
466

    
467
        discard_num = (int)((1-t->topo_mem) * peerset_size(t->neighbourhood));
468
        topology_move_peers(t->neighbourhood,t->swarm_bucket,discard_num,PEER_CHOICE_RANDOM,NULL,NULL);
469

    
470
        others_num = MAX(t->neighbourhood_target_size-peerset_size(t->neighbourhood),0);
471
        topology_move_peers(t->swarm_bucket,t->neighbourhood,others_num,PEER_CHOICE_RANDOM,NULL,NULL);
472
}
473

    
474

    
475
void topology_update(struct topology * t)
476
{
477
        struct peerset * old_neighs;
478
  const struct peer * p;
479
  int i;
480

    
481
  psample_parse_data(t->tc,NULL,0); // needed in order to trigger timed sending of TOPO messages
482

    
483
        update_metadata(t);
484
        topology_sample_peers(t);
485

    
486
  if timerisset(&(t->tout_bmap) )
487
                neighbourhood_drop_unactives(t, &(t->tout_bmap));
488

    
489
        old_neighs = peerset_create_reference_copy(t->neighbourhood);
490

    
491
    topology_update_random(t);
492

    
493
  topology_signal_change(t, old_neighs);
494
        peerset_destroy_reference_copy(&old_neighs);
495

    
496
    peerset_for_each(t->swarm_bucket,p,i)
497
      peerset_pop_peer(t->locked_neighs,p->id);
498
    peerset_clear(t->swarm_bucket,0);  // we don't remember past peers
499
}
500

    
501
void topology_destroy(struct topology **t)
502
{
503
        topology_quit_overlay(*t);
504
        if (t && *t)
505
        {
506
                if(((*t)->locked_neighs))
507
                        peerset_destroy_reference_copy(&((*t)->locked_neighs));
508
                if(((*t)->neighbourhood))
509
                        peerset_destroy(&((*t)->neighbourhood));
510
                if(((*t)->swarm_bucket))
511
                        peerset_destroy(&((*t)->swarm_bucket));
512
                if(((*t)->tc))
513
                        psample_destroy(&((*t)->tc));
514
                free(*t);
515
        }
516
}