Statistics
| Branch: | Revision:

pstreamer / src / topology.c @ 5820d286

History | View | Annotate | Download (13.7 KB)

1
/*
2
 * Copyright (c) 2014-2017 Luca Baldesi
3
 *
4
 * This file is part of PeerStreamer.
5
 *
6
 * PeerStreamer is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the
9
 * License, or (at your option) any later version.
10
 *
11
 * PeerStreamer is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
14
 * General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public License
17
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
18
 *
19
 */
20
#include <stdint.h>
21
#include <stdio.h>
22
#include <sys/time.h>
23
#include <time.h>
24
#include <stdlib.h>
25
#include <string.h>
26
//
27
#include <math.h>
28
#include <net_helper.h>
29
#include <peerset.h>
30
#include <peersampler.h>
31
#include <peer.h>
32
#include <grapes_msg_types.h>
33
#include<grapes_config.h>
34
//
35
#include "compatibility/timer.h"
36
//
37
#include "topology.h"
38
#include "net_helpers.h"
39
#include "dbg.h"
40
#include "measures.h"
41
#include "streaming.h"
42

    
43
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
44
enum neighbourhood_msg_t {NEIGHBOURHOOD_ADD, NEIGHBOURHOOD_REMOVE, NEIGHBOURHOOD_QUIT};
45

    
46
#define DEFAULT_PEER_CBSIZE 50
47

    
48
#ifndef NAN        //NAN is missing in some old math.h versions
49
#define NAN            (0.0/0.0)
50
#endif
51

    
52
struct metadata {
53
  uint16_t cb_size;
54
} __attribute__((packed));
55

    
56
enum peer_choice {PEER_CHOICE_RANDOM, PEER_CHOICE_BEST, PEER_CHOICE_WORST};
57

    
58
struct topology {
59
        double desired_bw;
60
        double desired_rtt;
61
        double alpha_target;
62
        double topo_mem;
63
        bool topo_out;
64
        bool topo_in;
65
        bool topo_keep_best;
66
        bool topo_add_best;
67
        int neighbourhood_target_size;
68
        struct timeval tout_bmap;
69
        struct metadata my_metadata;        
70

    
71
        const struct psinstance * ps;
72
        struct psample_context * tc;
73
        struct peerset * neighbourhood;
74
        struct peerset * swarm_bucket;
75
        struct peerset * locked_neighs;
76
};
77

    
78
struct peerset * topology_get_neighbours(struct topology * t)
79
{
80
        return t->neighbourhood;
81
}
82

    
83
void peerset_print(const struct peerset * pset, const char * name)
84
{
85
        const struct peer * p;
86
        int i;
87
        if(name) fprintf(stderr,"%s\n",name);
88
        if(pset)
89
                peerset_for_each(pset,p,i)
90
                        fprintf(stderr, "\t%s\n", nodeid_static_str(p->id));
91
}
92

    
93
void update_metadata(struct topology * t)
94
{
95
        t->my_metadata.cb_size = psinstance_is_source(t->ps) ? 0 : psinstance_chunkbuffer_size(t->ps);
96
}
97

    
98
struct peer * topology_get_peer(struct topology * t, const struct nodeID * id)
99
{
100
        struct peer * p = NULL;
101
        p = peerset_get_peer(t->swarm_bucket,id);
102
        if(p == NULL)
103
                p = peerset_get_peer(t->neighbourhood,id);
104
        return p;
105
}
106

    
107
int topology_init(struct topology * t, const struct psinstance * ps, const char *config)
108
{
109
        struct tag * tags;
110
        int peer_timeout;
111

    
112
        tags = grapes_config_parse(config);
113

    
114
        bind_msg_type(MSG_TYPE_NEIGHBOURHOOD);
115
        bind_msg_type(MSG_TYPE_TOPOLOGY);
116

    
117
        grapes_config_value_int_default(tags, "peer_timeout", &peer_timeout, 10);
118
        t->tout_bmap.tv_sec = peer_timeout;
119
        t->tout_bmap.tv_usec = 0;
120
        t->ps = ps;
121

    
122
        t->neighbourhood = peerset_init(0);
123
        t->swarm_bucket = peerset_init(0);
124
        t->locked_neighs = peerset_init(0);
125

    
126
        t->desired_bw = 0;        //TODO: turn on capacity measurement and set meaningful default value
127
        t->desired_rtt = 0.2;
128
        t->alpha_target = 0.4;
129
        t->topo_mem = 0.7;
130
        t->topo_out = true; //peer selects out-neighbours
131
        t->topo_in = true; //peer selects in-neighbours (combined means bidirectional)
132
        t->topo_keep_best = false;
133
        t->topo_add_best = false;
134
        t->neighbourhood_target_size = 30;
135

    
136
        update_metadata(t);
137
        t->tc = psample_init(psinstance_nodeid(ps), &(t->my_metadata), sizeof(struct metadata), config);
138
        
139
        free(tags);
140
        return t->tc && t->neighbourhood && t->swarm_bucket ? 1 : 0;
141
}
142

    
143
struct topology * topology_create(const struct psinstance *ps, const char *config)
144
{
145
        struct topology * t = NULL;
146
        t = malloc(sizeof(struct topology));
147
        topology_init(t, ps, config);
148
        return t;
149
}
150

    
151
/*useful during bootstrap*/
152
int topology_node_insert(struct topology * t, struct nodeID *id)
153
{
154
        struct metadata m = {0};
155
        if (topology_get_peer(t, id) == NULL)
156
                peerset_add_peer(t->swarm_bucket,id);
157
        return psample_add_peer(t->tc,id,&m,sizeof(m));
158
}
159

    
160
void topology_peer_set_metadata(struct  peer *p, const struct metadata *m)
161
{
162
        if (p)
163
        {
164
                if (m)
165
                {
166
                        p->cb_size = m->cb_size;
167
                }
168
                else
169
                {
170
                        p->cb_size = DEFAULT_PEER_CBSIZE;
171
                }
172

    
173
        }
174
}
175

    
176
struct peer * neighbourhood_add_peer(struct topology * t, const struct nodeID *id)
177
{
178
        struct peer * p = NULL;
179
        if (id)
180
        {
181
                p = peerset_pop_peer(t->swarm_bucket,id);
182
                if(p)
183
                        peerset_push_peer(t->neighbourhood,p);
184
                else
185
                {
186
                        peerset_add_peer(t->neighbourhood,id);
187
                        p = peerset_get_peer(t->neighbourhood,id);
188
                        peerset_push_peer(t->locked_neighs,p);
189
                }
190
                measures_add_node(psinstance_measures(t->ps), p->id);
191
                // fprintf(stderr,"[DEBUG] sending bmap to peer %s \n",nodeid_static_str(id));
192
                send_bmap(psinstance_streaming(t->ps), id);
193
        }
194
        return p;
195
}
196

    
197
void neighbourhood_remove_peer(struct topology * t, const struct nodeID *id)
198
{
199
        struct peer *p=NULL;
200
        if(id)
201
        {
202
                p = peerset_pop_peer(t->neighbourhood,id);
203
                if(p)
204
                        peerset_push_peer(t->swarm_bucket,p);
205

    
206
                peerset_pop_peer(t->locked_neighs,id);
207
        }
208
}
209

    
210
void topology_remove_peer(struct topology * t, const struct nodeID *id)
211
{
212
        if(t && id)
213
        {
214
                peerset_remove_peer(t->neighbourhood, id);
215
                peerset_remove_peer(t->swarm_bucket, id);
216
                peerset_remove_peer(t->locked_neighs, id);
217
                psample_remove_peer(t->tc, id);
218
        }
219
}
220

    
221
void neighbourhood_message_parse(struct topology * t, struct nodeID *from, const uint8_t *buff, size_t len)
222
{
223
        struct metadata m = {0};
224
        struct peer *p = NULL;
225

    
226
        switch(buff[0]) {
227
                case NEIGHBOURHOOD_ADD:
228
                        // fprintf(stderr,"[DEBUG] adding peer %s from message\n",nodeid_static_str(from));
229
                        p = neighbourhood_add_peer(t, from);
230
                        if (len >= (sizeof(struct metadata) + 1))
231
                        {
232
                                memmove(&m,buff+1,sizeof(struct metadata));
233
                                topology_peer_set_metadata(p,&m);
234
                        }
235
                        break;
236

    
237
                case NEIGHBOURHOOD_REMOVE:
238
                        neighbourhood_remove_peer(t, from);
239
                        break;
240
                case NEIGHBOURHOOD_QUIT:
241
                        topology_remove_peer(t, from);
242
                        break;
243
                default:
244
                        dprintf("Unknown neighbourhood message type");
245
        }
246
}
247

    
248
void topology_message_parse(struct topology * t, struct nodeID *from, const uint8_t *buff, size_t len)
249
{
250
        switch(buff[0]) {
251
                case MSG_TYPE_NEIGHBOURHOOD:
252
                        if (t->topo_in)
253
                        {
254
                                neighbourhood_message_parse(t, from, buff+1,len);
255
                                reg_neigh_size(psinstance_measures(t->ps), peerset_size(t->neighbourhood));
256
                        }
257
                        break;
258
                case MSG_TYPE_TOPOLOGY:
259
                        psample_parse_data(t->tc,buff,len);
260
                        //fprintf(stderr,"[DEBUG] received TOPO message\n");
261
                        break;
262
                default:
263
                        fprintf(stderr,"Unknown topology message type");
264
        }
265
}
266

    
267
void topology_sample_peers(struct topology * t)
268
{
269
        int sample_nodes_num,sample_metas_num,i;
270
        const struct nodeID * const * sample_nodes;
271
        struct metadata const * sample_metas;
272
        struct peer * p;
273
                
274
        //fprintf(stderr,"[DEBUG] starting peer sampling\n");
275
        sample_nodes = psample_get_cache(t->tc,&sample_nodes_num);
276
        sample_metas = psample_get_metadata(t->tc,&sample_metas_num);
277
        for (i=0;i<sample_nodes_num;i++)
278
        {
279
                //fprintf(stderr,"[DEBUG] sampled node: %s\n",nodeid_static_str(sample_nodes[i]));
280
                p = topology_get_peer(t, sample_nodes[i]);
281
                if(p==NULL)
282
                {
283
                        peerset_add_peer(t->swarm_bucket,sample_nodes[i]);
284
                        p = topology_get_peer(t, sample_nodes[i]);
285
                }
286
                topology_peer_set_metadata(p,&(sample_metas[i]));        
287
        }
288
}
289

    
290
void topology_blacklist_add(struct topology * t, struct nodeID * id)
291
{
292
}
293

    
294
void neighbourhood_drop_unactives(struct topology * t, struct timeval * bmap_timeout)
295
{
296
  struct timeval tnow, told;
297
        struct peer *const *peers;
298
        int i;
299
  gettimeofday(&tnow, NULL);
300
  timersub(&tnow, bmap_timeout, &told);
301
  peers = peerset_get_peers(t->neighbourhood);
302
  for (i = 0; i < peerset_size(t->neighbourhood); i++) {
303
    if ( (!timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->creation_timestamp, &told, <) ) ||
304
         ( timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->bmap_timestamp, &told, <)     )   ) {
305
      dprintf("Topo: dropping inactive %s (peersset_size: %d)\n", nodeid_static_str(peers[i]->id), peerset_size(t->neighbourhood));
306
//      if (peerset_size(t->neighbourhood) > 1) {        // avoid dropping our last link to the world
307
              topology_blacklist_add(t, peers[i]->id);
308
              topology_remove_peer(t, peers[i]->id);
309
//      }
310
    }
311
  }
312
        
313
}
314

    
315
void array_shuffle(void *base, int nmemb, int size) {
316
  int i,newpos;
317
  unsigned char t[size];
318
  unsigned char* b = base;
319

    
320
  for (i = nmemb - 1; i > 0; i--) {
321
    newpos = (rand()/(RAND_MAX + 1.0)) * (i + 1);
322
    memcpy(t, b + size * newpos, size);
323
    memmove(b + size * newpos, b + size * i, size);
324
    memcpy(b + size * i, t, size);
325
  }
326
}
327

    
328
double get_rtt_of(struct topology *t, const struct nodeID* n){
329
  return NAN;
330
}
331

    
332
double get_capacity_of(struct topology *t, const struct nodeID* n){
333
  struct peer *p = topology_get_peer(t, n);
334
  if (p) {
335
    return p->capacity;
336
  }
337

    
338
  return NAN;
339
}
340

    
341
int neighbourhood_send_msg(struct topology *t, const struct peer * p,uint8_t type)
342
{
343
        uint8_t * msg;
344
        int res;
345
        msg = malloc(sizeof(struct metadata)+2);
346
        msg[0] = MSG_TYPE_NEIGHBOURHOOD;
347
        msg[1] = type;
348
        memmove(msg+2,&(t->my_metadata),sizeof(struct metadata));
349
        res = send_to_peer(psinstance_nodeid(t->ps), p->id, msg, sizeof(struct metadata)+2);
350
        free(msg);
351
        return res;        
352
}
353

    
354
void topology_quit_overlay(struct topology *t)
355
{
356
        const struct peer * p;
357
        int i;
358

    
359
        dprintf("Notifying known peers of quitting...\n");
360
        peerset_for_each(t->neighbourhood, p, i)
361
                neighbourhood_send_msg(t, p, NEIGHBOURHOOD_QUIT);
362
        peerset_for_each(t->swarm_bucket, p, i)
363
                neighbourhood_send_msg(t, p, NEIGHBOURHOOD_QUIT);
364
}
365

    
366
void peerset_destroy_reference_copy(struct peerset ** pset)
367
{
368
        while (peerset_size(*pset))
369
                peerset_pop_peer(*pset,(peerset_get_peers(*pset)[0])->id);
370

    
371
        peerset_destroy(pset);
372
}
373

    
374
struct peerset * peerset_create_reference_copy(struct peerset * pset)
375
{
376
        struct peerset * ns;
377
        const struct peer * p;
378
        int i;
379

    
380
        ns = peerset_init(0);
381
        peerset_for_each(pset,p,i)
382
                peerset_push_peer(ns, (struct peer *)p);
383
        return ns;
384
}
385

    
386
struct peer *nodeid_to_peer(struct topology * t, struct nodeID *id,int reg)
387
{
388
        struct peer * p;
389
        p = topology_get_peer(t, id);
390
        if(p==NULL && reg)
391
        {
392
                topology_node_insert(t, id);
393
                neighbourhood_add_peer(t, id);
394
                p = topology_get_peer(t, id);
395
                if(t->topo_out)
396
                        neighbourhood_send_msg(t, p, NEIGHBOURHOOD_ADD);
397
        }
398
        return p;
399
}
400

    
401
/* move num peers from pset1 to pset2 after applying the filtering_mask function and following the given criterion */
402
void topology_move_peers(struct peerset * pset1, struct peerset * pset2,int num,enum peer_choice criterion,bool (*filter_mask)(const struct peer *),int (*cmp_peer)(const void* p0, const void* p1)) 
403
{
404
        struct peer * const * const_peers;
405
        struct peer ** peers;
406
        struct peer *p;
407
        int peers_num,i,j;
408

    
409
        peers_num = peerset_size(pset1);
410
        const_peers = peerset_get_peers(pset1);
411
        peers = (struct peer **)malloc(sizeof(struct peer *)*peers_num);
412
        if (filter_mask)
413
        {
414
                for(i = 0,j = 0; i<peers_num; i++)
415
                        if (filter_mask(const_peers[i]))
416
                                peers[j++] = const_peers[i];
417
                peers_num = j;
418
        } else
419
                memmove(peers,const_peers,peers_num*sizeof(struct peer*));
420

    
421
        if (criterion != PEER_CHOICE_RANDOM && cmp_peer != NULL) {
422
    //fprintf(stderr,"[DEBUG] choosen the qsort\n");
423
                qsort(peers, peers_num, sizeof(struct peer*), cmp_peer);
424
        } else {
425
    array_shuffle(peers, peers_num, sizeof(struct peer *));
426
        }
427
        for (i=0; i<peers_num && i<num; i++)
428
        {
429
                if (criterion == PEER_CHOICE_WORST)
430
                        p = peerset_pop_peer(pset1,(peers[peers_num -i -1])->id);
431
                else
432
                        p = peerset_pop_peer(pset1,(peers[i])->id);
433
                peerset_push_peer(pset2,p);
434
        }
435
        free(peers);
436
}
437

    
438
void peerset_reference_copy_add(struct peerset * dst, struct peerset * src)
439
{
440
  const struct peer *p;
441
  int i;
442

    
443
        peerset_for_each(src,p,i)
444
                peerset_push_peer(dst, (struct peer *)p);
445
}
446

    
447
void topology_signal_change(struct topology *t, const struct peerset const * old_neighs)
448
{
449
        const struct peer * p;
450
        int i;
451
  reg_neigh_size(psinstance_measures(t->ps), peerset_size(t->neighbourhood));
452

    
453
        // advertise changes
454
        if(t->topo_out)
455
        {
456
                peerset_for_each(t->neighbourhood,p,i)
457
    {
458
                        if(peerset_check(old_neighs,p->id) < 0)
459
                                neighbourhood_send_msg(t, p, NEIGHBOURHOOD_ADD);
460
    }
461
                peerset_for_each(old_neighs,p,i)
462
    {
463
                        if(peerset_check(t->neighbourhood,p->id) < 0)
464
                                neighbourhood_send_msg(t,p,NEIGHBOURHOOD_REMOVE);
465
    }
466
        }
467
}
468

    
469

    
470
void topology_update_random(struct topology * t)
471
{
472
        int discard_num;
473
        int others_num;
474

    
475
        discard_num = (int)((1-t->topo_mem) * peerset_size(t->neighbourhood));
476
        topology_move_peers(t->neighbourhood,t->swarm_bucket,discard_num,PEER_CHOICE_RANDOM,NULL,NULL);
477

    
478
        others_num = MAX(t->neighbourhood_target_size-peerset_size(t->neighbourhood),0);
479
        topology_move_peers(t->swarm_bucket,t->neighbourhood,others_num,PEER_CHOICE_RANDOM,NULL,NULL);
480
}
481

    
482

    
483
void topology_update(struct topology * t)
484
{
485
        struct peerset * old_neighs;
486
  const struct peer * p;
487
  int i;
488

    
489
  psample_parse_data(t->tc,NULL,0); // needed in order to trigger timed sending of TOPO messages
490

    
491
        update_metadata(t);
492
        topology_sample_peers(t);
493

    
494
  if timerisset(&(t->tout_bmap) )
495
                neighbourhood_drop_unactives(t, &(t->tout_bmap));
496

    
497
        old_neighs = peerset_create_reference_copy(t->neighbourhood);
498

    
499
    topology_update_random(t);
500

    
501
  topology_signal_change(t, old_neighs);
502
        peerset_destroy_reference_copy(&old_neighs);
503

    
504
    peerset_for_each(t->swarm_bucket,p,i)
505
      peerset_pop_peer(t->locked_neighs,p->id);
506
    peerset_clear(t->swarm_bucket,0);  // we don't remember past peers
507
}
508

    
509
void topology_destroy(struct topology **t)
510
{
511
        topology_quit_overlay(*t);
512
        if (t && *t)
513
        {
514
                if(((*t)->locked_neighs))
515
                        peerset_destroy_reference_copy(&((*t)->locked_neighs));
516
                if(((*t)->neighbourhood))
517
                        peerset_destroy(&((*t)->neighbourhood));
518
                if(((*t)->swarm_bucket))
519
                        peerset_destroy(&((*t)->swarm_bucket));
520
                if(((*t)->tc))
521
                        psample_destroy(&((*t)->tc));
522
                free(*t);
523
        }
524
}