Statistics
| Branch: | Revision:

streamers / topology.c @ 9a1f5816

History | View | Annotate | Download (12 KB)

1
/*
2
 * Copyright (c) 2014-2015 Luca Baldesi
3
 *
4
 * This file is part of PeerStreamer.
5
 *
6
 * PeerStreamer is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the
9
 * License, or (at your option) any later version.
10
 *
11
 * PeerStreamer is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
14
 * General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public License
17
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
18
 *
19
 */
20
#include <stdint.h>
21
#include <stdio.h>
22
#include <sys/time.h>
23
#include <time.h>
24
#include <stdlib.h>
25
#include <string.h>
26
//
27
#include <math.h>
28
#include <net_helper.h>
29
#include <peerset.h>
30
#include <peersampler.h>
31
#include <peer.h>
32
#include <grapes_msg_types.h>
33
//
34
#include "compatibility/timer.h"
35
//
36
#include "topology.h"
37
#include "nodeid_set.h"
38
#include "streaming.h"
39
#include "dbg.h"
40
#include "measures.h"
41
#include "streamer.h"
42
#include "node_addr.h"
43

    
44
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
45
#define NEIGHBOURHOOD_ADD 0
46
#define NEIGHBOURHOOD_REMOVE 1
47
#define DEFAULT_PEER_CBSIZE 50
48

    
49
#ifndef NAN        //NAN is missing in some old math.h versions
50
#define NAN            (0.0/0.0)
51
#endif
52

    
53
double desired_bw = 0;        //TODO: turn on capacity measurement and set meaningful default value
54
double desired_rtt = 0.2;
55
double alpha_target = 0.4;
56
double topo_mem = 0.7;
57

    
58
bool topo_out = true; //peer selects out-neighbours
59
bool topo_in = true; //peer selects in-neighbours (combined means bidirectional)
60

    
61
bool topo_keep_best = false;
62
bool topo_add_best = false;
63

    
64
int NEIGHBOURHOOD_TARGET_SIZE = 30;
65
enum peer_choice {PEER_CHOICE_RANDOM, PEER_CHOICE_BEST, PEER_CHOICE_WORST};
66

    
67
struct metadata {
68
  uint16_t cb_size;
69
  uint16_t cps;
70
  float capacity;
71
  float recv_delay;
72
} __attribute__((packed));
73

    
74
struct topology_context{
75
        struct metadata my_metadata;        
76
        struct psample_context * tc;
77
        struct peerset * neighbourhood;
78
        struct peerset * swarm_bucket;
79
        struct timeval tout_bmap;
80
} context;
81

    
82
struct peerset * topology_get_neighbours()
83
{
84
        return context.neighbourhood;
85
}
86

    
87
void peerset_print(const struct peerset * pset,const char * name)
88
{
89
        const struct peer * p;
90
        int i;
91
        if(name) fprintf(stderr,"%s\n",name);
92
        if(pset)
93
                peerset_for_each(pset,p,i)
94
                        fprintf(stderr,"\t%s\n",node_addr_tr(p->id));
95
}
96

    
97
void update_metadata()
98
{
99
        context.my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
100
        context.my_metadata.recv_delay = get_receive_delay();
101
        context.my_metadata.cps = get_chunks_per_sec();
102
        context.my_metadata.capacity = get_capacity();
103
}
104

    
105
struct peer * topology_get_peer(const struct nodeID * id)
106
{
107
        struct peer * p = NULL;
108
        p = peerset_get_peer(context.swarm_bucket,id);
109
        if(p == NULL)
110
                p = peerset_get_peer(context.neighbourhood,id);
111
        return p;
112
}
113

    
114
int topology_init(struct nodeID *myID,const char *config)
115
{
116
        bind_msg_type(MSG_TYPE_NEIGHBOURHOOD);
117
        bind_msg_type(MSG_TYPE_TOPOLOGY);
118
        update_metadata();
119
        context.tout_bmap.tv_sec = 20;
120
        context.tout_bmap.tv_usec = 0;
121
        context.tc = psample_init(myID,&(context.my_metadata),sizeof(struct metadata),config);
122
        context.neighbourhood = peerset_init(0);
123
        context.swarm_bucket = peerset_init(0);
124
        return context.tc && context.neighbourhood && context.swarm_bucket ? 1 : 0;
125
}
126

    
127
/*useful during bootstrap*/
128
int topology_node_insert(struct nodeID *id)
129
{
130
        struct metadata m = {0};
131
        if (topology_get_peer(id) == NULL)
132
                peerset_add_peer(context.swarm_bucket,id);
133
        return psample_add_peer(context.tc,id,&m,sizeof(m));
134
}
135

    
136
void topology_peer_set_metadata(struct  peer *p, const struct metadata *m)
137
{
138
        if (p)
139
        {
140
                if (m)
141
                {
142
                        p->cb_size = m->cb_size;
143
                        p->capacity = m->capacity;
144
                }
145
                else
146
                {
147
                        p->cb_size = DEFAULT_PEER_CBSIZE;
148
                        p->capacity = 0;
149
                }
150

    
151
        }
152
}
153

    
154
struct peer * neighbourhood_add_peer(const struct nodeID *id)
155
{
156
        struct peer * p = NULL;
157
        if (id)
158
        {
159
                p = peerset_pop_peer(context.swarm_bucket,id);
160
                if(p)
161
                        peerset_push_peer(context.neighbourhood,p);
162
                else
163
                {
164
                        peerset_add_peer(context.neighbourhood,id);
165
                        p = peerset_get_peer(context.neighbourhood,id);
166
                }
167
                add_measures(p->id);
168
                send_bmap(id);
169
        }
170
        return p;
171
}
172

    
173
void neighbourhood_remove_peer(const struct nodeID *id)
174
{
175
        struct peer *p=NULL;
176
        if(id)
177
        {
178
                p = peerset_pop_peer(context.neighbourhood,id);
179
                if(p)
180
                        peerset_push_peer(context.swarm_bucket,p);
181
        }
182
}
183

    
184
void neighbourhood_message_parse(struct nodeID *from,const uint8_t *buff,int len)
185
{
186
        struct metadata m = {0};
187
        struct peer *p = NULL;
188

    
189
        switch(buff[0]) {
190
                case NEIGHBOURHOOD_ADD:
191
                        p = neighbourhood_add_peer(from);
192
                        if (len >= (sizeof(struct metadata) + 1))
193
                        {
194
                                memmove(&m,buff+1,sizeof(struct metadata));
195
                                topology_peer_set_metadata(p,&m);
196
                        }
197
                        break;
198

    
199
                case NEIGHBOURHOOD_REMOVE:
200
                        neighbourhood_remove_peer(from);
201
                        break;
202
                default:
203
                        dprintf("Unknown neighbourhood message type");
204
        }
205
}
206

    
207
void topology_message_parse(struct nodeID *from, const uint8_t *buff, int len)
208
{
209
        switch(buff[0]) {
210
                case MSG_TYPE_NEIGHBOURHOOD:
211
                        if (topo_in)
212
                        {
213
                                neighbourhood_message_parse(from,buff+1,len);
214
                                reg_neigh_size(peerset_size(context.neighbourhood));
215
                        }
216
                        break;
217
                case MSG_TYPE_TOPOLOGY:
218
                        psample_parse_data(context.tc,buff,len);
219
                        break;
220
                default:
221
                        fprintf(stderr,"Unknown topology message type");
222
        }
223
}
224

    
225
void topology_sample_peers()
226
{
227
        int sample_nodes_num,sample_metas_num,i;
228
        const struct nodeID * const * sample_nodes;
229
        struct metadata const * sample_metas;
230
        struct peer * p;
231

    
232
        sample_nodes = psample_get_cache(context.tc,&sample_nodes_num);
233
        sample_metas = psample_get_metadata(context.tc,&sample_metas_num);
234
        for (i=0;i<sample_nodes_num;i++)
235
        {
236
                p = topology_get_peer(sample_nodes[i]);
237
                if(p==NULL)
238
                {
239
                        peerset_add_peer(context.swarm_bucket,sample_nodes[i]);
240
                        p = topology_get_peer(sample_nodes[i]);
241
                }
242
                topology_peer_set_metadata(p,&(sample_metas[i]));        
243
        }
244
}
245

    
246
void topology_blacklist_add(struct nodeID * id)
247
{
248
}
249

    
250
void neighbourhood_drop_unactives(struct timeval * bmap_timeout)
251
{
252
  struct timeval tnow, told;
253
        struct peer *const *peers;
254
        int i;
255
  gettimeofday(&tnow, NULL);
256
  timersub(&tnow, bmap_timeout, &told);
257
  peers = peerset_get_peers(context.neighbourhood);
258
  for (i = 0; i < peerset_size(context.neighbourhood); i++) {
259
    if ( (!timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->creation_timestamp, &told, <) ) ||
260
         ( timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->bmap_timestamp, &told, <)     )   ) {
261
      dprintf("Topo: dropping inactive %s (peersset_size: %d)\n", node_addr_tr(peers[i]->id), peerset_size(context.neighbourhood));
262
      //if (peerset_size(context.neighbourhood) > 1) {        // avoid dropping our last link to the world
263
      topology_blacklist_add(peers[i]->id);
264
      neighbourhood_remove_peer(peers[i]->id);
265
      //}
266
    }
267
  }
268
        
269
}
270

    
271
void array_shuffle(void *base, int nmemb, int size) {
272
  int i,newpos;
273
  unsigned char t[size];
274
  unsigned char* b = base;
275

    
276
  for (i = nmemb - 1; i > 0; i--) {
277
    newpos = (rand()/(RAND_MAX + 1.0)) * (i + 1);
278
    memcpy(t, b + size * newpos, size);
279
    memmove(b + size * newpos, b + size * i, size);
280
    memcpy(b + size * i, t, size);
281
  }
282
}
283

    
284
//get the rtt. Currenly only MONL version is supported
285
static double get_rtt_of(const struct nodeID* n){
286
#ifdef MONL
287
  return get_rtt(n);
288
#else
289
  return NAN;
290
#endif
291
}
292

    
293
//get the declared capacity of a node
294
static double get_capacity_of(const struct nodeID* n){
295
  struct peer *p = topology_get_peer(n);
296
  if (p) {
297
    return p->capacity;
298
  }
299

    
300
  return NAN;
301
}
302

    
303
bool desiredness(const struct peer* p) {
304
        const struct nodeID* n = p->id;
305
  double rtt = get_rtt_of(n);
306
  double bw =  get_capacity_of(n);
307

    
308
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
309
    return false;
310
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw)) {
311
    return true;
312
  }
313

    
314
  return false;
315
}
316

    
317
int cmp_rtt(const void* p0, const void* p1) {
318
  double ra, rb;
319
        const struct nodeID * a = ((const struct peer *) p0) -> id;
320
        const struct nodeID * b = ((const struct peer *) p1) -> id;
321
  ra = get_rtt_of(a);
322
  rb = get_rtt_of(b);
323
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
324
                return 0;
325
  else 
326
                if (isnan(rb) || ra < rb) 
327
                        return -1;
328
          else 
329
                        return 1;
330
}
331

    
332
int neighbourhood_send_msg(const struct peer * p,uint8_t type)
333
{
334
        char * msg;
335
        int res;
336
        msg = malloc(sizeof(struct metadata)+2);
337
        msg[0] = MSG_TYPE_NEIGHBOURHOOD;
338
        msg[1] = type;
339
        memmove(msg+2,&(context.my_metadata),sizeof(struct metadata));
340
        res = send_to_peer(get_my_addr(),p->id,msg,sizeof(struct metadata)+2);
341
        return res;        
342
}
343

    
344
void peerset_destroy_reference_copy(struct peerset ** pset)
345
{
346
        const struct peer * p;
347
        int i;
348
        while (peerset_size(*pset))
349
                peerset_pop_peer(*pset,(peerset_get_peers(*pset)[0])->id);
350

    
351
        peerset_destroy(pset);
352
}
353

    
354
struct peerset * peerset_create_reference_copy(const struct peerset * pset)
355
{
356
        struct peerset * ns;
357
        const struct peer * p;
358
        int i;
359

    
360
        ns = peerset_init(0);
361
        peerset_for_each(pset,p,i)
362
                peerset_push_peer(ns,p);
363
        return ns;
364
}
365

    
366
struct peer *nodeid_to_peer(const struct nodeID *id,int reg)
367
{
368
        struct peer * p;
369
        p = topology_get_peer(id);
370
        if(p==NULL && reg)
371
        {
372
                topology_node_insert(id);
373
                neighbourhood_add_peer(id);
374
                p = topology_get_peer(id);
375
                if(topo_out)
376
                        neighbourhood_send_msg(p,NEIGHBOURHOOD_ADD);
377
        }
378
        return p;
379
}
380

    
381
/* move num peers from pset1 to pset2 after applying the filtering_mask function and following the given criterion */
382
void topology_move_peers(struct peerset * pset1, struct peerset * pset2,int num,enum peer_choice criterion,bool (*filter_mask)(const struct peer *))
383
{
384
        struct peer * const * const_peers;
385
        struct peer ** peers;
386
        struct peer *p;
387
        int peers_num,i,j;
388

    
389
        peers_num = peerset_size(pset1);
390
        const_peers = peerset_get_peers(pset1);
391
        peers = (struct peer **)malloc(sizeof(struct peer *)*peers_num);
392
        if (filter_mask)
393
        {
394
                for(i = 0,j = 0; i<peers_num; i++)
395
                        if (filter_mask(const_peers[i]))
396
                                peers[j++] = const_peers[i];
397
                peers_num = j;
398
        } else
399
                memmove(peers,const_peers,peers_num*sizeof(struct peer*));
400

    
401
        if (criterion != PEER_CHOICE_RANDOM) {
402
                qsort(peers, peers_num, sizeof(struct peer*), cmp_rtt);
403
        } else {
404
    array_shuffle(peers, peers_num, sizeof(struct peer *));
405
        }
406
        for (i=0; i<peers_num && i<num; i++)
407
        {
408
                if (criterion == PEER_CHOICE_WORST)
409
                        p = peerset_pop_peer(pset1,(peers[peers_num -i -1])->id);
410
                else
411
                        p = peerset_pop_peer(pset1,(peers[i])->id);
412
                peerset_push_peer(pset2,p);
413
        }
414
}
415

    
416
void topology_update()
417
{
418
        int discard_num;
419
        int bests_num;
420
        int others_num;
421
        struct peerset * old_neighs;
422
        const struct peer * p;
423
        int i;
424

    
425
        update_metadata();
426
        topology_sample_peers();
427

    
428
  if timerisset(&(context.tout_bmap) )
429
                neighbourhood_drop_unactives(&(context.tout_bmap));
430

    
431
        old_neighs = peerset_create_reference_copy(context.neighbourhood);
432

    
433
        // we keep topo_mem% of the current neighbourhood
434
        // so we discard the (1-topo_mem)% of the neighbourhood
435
        discard_num = (int)(1-topo_mem) * peerset_size(context.neighbourhood);
436
         topology_move_peers(context.neighbourhood,context.swarm_bucket,discard_num,PEER_CHOICE_WORST,NULL);
437

    
438
        // for the remaining 1-topo_mem fraction, we select (1-alpha_target)% of good nodes
439
        bests_num = (int)(1-alpha_target)*(MAX(NEIGHBOURHOOD_TARGET_SIZE-peerset_size(context.neighbourhood),0));
440
         topology_move_peers(context.swarm_bucket,context.neighbourhood,bests_num,PEER_CHOICE_BEST,desiredness);
441

    
442
        // now we fill the neighbourhood with some other nodes (if any)
443
        others_num = MAX(NEIGHBOURHOOD_TARGET_SIZE-peerset_size(context.neighbourhood),0);
444
         topology_move_peers(context.swarm_bucket,context.neighbourhood,others_num,PEER_CHOICE_RANDOM,NULL);
445

    
446
        // DONE
447
  reg_neigh_size(peerset_size(context.neighbourhood));
448

    
449
        // advertise changes
450
        if(topo_out)
451
        {
452
                peerset_for_each(context.neighbourhood,p,i)
453
                        if(peerset_check(old_neighs,p->id) < 0)
454
                                neighbourhood_send_msg(p,NEIGHBOURHOOD_ADD);
455
                peerset_for_each(old_neighs,p,i)
456
                        if(peerset_check(context.neighbourhood,p->id) < 0)
457
                                neighbourhood_send_msg(p,NEIGHBOURHOOD_REMOVE);
458
        }
459

    
460
        peerset_destroy_reference_copy(&old_neighs);
461

    
462
        // we don't remember past peers
463
        peerset_clear(context.swarm_bucket,0);
464
}