Statistics
| Branch: | Revision:

pstreamer / src / topology.c @ 56d5986f

History | View | Annotate | Download (12.5 KB)

1
/*
2
 * Copyright (c) 2014-2017 Luca Baldesi
3
 *
4
 * This file is part of PeerStreamer.
5
 *
6
 * PeerStreamer is free software: you can redistribute it and/or
7
 * modify it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the
9
 * License, or (at your option) any later version.
10
 *
11
 * PeerStreamer is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
14
 * General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public License
17
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
18
 *
19
 */
20
#include <stdint.h>
21
#include <stdio.h>
22
#include <sys/time.h>
23
#include <time.h>
24
#include <stdlib.h>
25
#include <string.h>
26
//
27
#include <math.h>
28
#include <net_helper.h>
29
#include <peerset.h>
30
#include <peersampler.h>
31
#include <peer.h>
32
#include <grapes_msg_types.h>
33
#include<grapes_config.h>
34
//
35
#include "compatibility/timer.h"
36
//
37
#include "topology.h"
38
#include "net_helpers.h"
39
#include "dbg.h"
40
#include "measures.h"
41
#include "streaming.h"
42
#include "peer_metadata.h"
43

    
44
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
45
#define NEIGHBOURHOOD_ADD 0
46
#define NEIGHBOURHOOD_REMOVE 1
47

    
48
#ifndef NAN        //NAN is missing in some old math.h versions
49
#define NAN            (0.0/0.0)
50
#endif
51

    
52
enum peer_choice {PEER_CHOICE_RANDOM, PEER_CHOICE_BEST, PEER_CHOICE_WORST};
53

    
54
struct topology {
55
        double topo_mem;
56
        bool topo_out;
57
        bool topo_in;
58
        int neighbourhood_target_size;
59
        struct timeval tout_bmap;
60
        struct metadata my_metadata;        
61

    
62
        const struct psinstance * ps;
63
        struct psample_context * tc;
64
        struct peerset * neighbourhood;
65
        struct peerset * swarm_bucket;
66
        struct peerset * locked_neighs;
67
};
68

    
69
struct peerset * topology_get_neighbours(struct topology * t)
70
{
71
        return t->neighbourhood;
72
}
73

    
74
void peerset_print(const struct peerset * pset, const char * name)
75
{
76
        const struct peer * p;
77
        int i;
78
        if(name) fprintf(stderr,"%s\n",name);
79
        if(pset)
80
                peerset_for_each(pset,p,i)
81
                        fprintf(stderr, "\t%s\n", nodeid_static_str(p->id));
82
}
83

    
84
void update_metadata(struct topology * t)
85
{
86
        metadata_update(&(t->my_metadata), 
87
                        psinstance_is_source(t->ps) ? 0 : psinstance_chunkbuffer_size(t->ps),
88
                        peerset_size(t->neighbourhood));
89
}
90

    
91
struct peer * topology_get_peer(struct topology * t, const struct nodeID * id)
92
{
93
        struct peer * p = NULL;
94
        p = peerset_get_peer(t->swarm_bucket,id);
95
        if(p == NULL)
96
                p = peerset_get_peer(t->neighbourhood,id);
97
        return p;
98
}
99

    
100
int topology_init(struct topology * t, const struct psinstance * ps, const char *config)
101
{
102
        struct tag * tags;
103

    
104
        tags = grapes_config_parse(config);
105

    
106
        bind_msg_type(MSG_TYPE_NEIGHBOURHOOD);
107
        bind_msg_type(MSG_TYPE_TOPOLOGY);
108
        t->tout_bmap.tv_sec = 20;
109
        t->tout_bmap.tv_usec = 0;
110
        t->ps = ps;
111

    
112
        t->neighbourhood = peerset_init(0);
113
        t->swarm_bucket = peerset_init(0);
114
        t->locked_neighs = peerset_init(0);
115

    
116
        t->topo_mem = 0.7;
117
        t->topo_out = true; //peer selects out-neighbours
118
        t->topo_in = true; //peer selects in-neighbours (combined means bidirectional)
119
        grapes_config_value_int_default(tags, "neighbourhood_size", &(t->neighbourhood_target_size), 30);
120

    
121
        update_metadata(t);
122
        t->tc = psample_init(psinstance_nodeid(ps), &(t->my_metadata), sizeof(struct metadata), config);
123
        
124
        free(tags);
125
        return t->tc && t->neighbourhood && t->swarm_bucket ? 1 : 0;
126
}
127

    
128
struct topology * topology_create(const struct psinstance *ps, const char *config)
129
{
130
        struct topology * t = NULL;
131
        t = malloc(sizeof(struct topology));
132
        topology_init(t, ps, config);
133
        return t;
134
}
135

    
136
/*useful during bootstrap*/
137
int topology_node_insert(struct topology * t, struct nodeID *id)
138
{
139
        struct metadata m = {0};
140
        if (topology_get_peer(t, id) == NULL)
141
                peerset_add_peer(t->swarm_bucket,id);
142
        return psample_add_peer(t->tc,id,&m,sizeof(m));
143
}
144

    
145

    
146
struct peer * neighbourhood_add_peer(struct topology * t, const struct nodeID *id)
147
{
148
        struct peer * p = NULL;
149
        if (id)
150
        {
151
                p = peerset_pop_peer(t->swarm_bucket,id);
152
                if(p)
153
                        peerset_push_peer(t->neighbourhood,p);
154
                else
155
                {
156
                        peerset_add_peer(t->neighbourhood,id);
157
                        p = peerset_get_peer(t->neighbourhood,id);
158
                        peerset_push_peer(t->locked_neighs,p);
159
                }
160
                measures_add_node(psinstance_measures(t->ps), p->id);
161
                // fprintf(stderr,"[DEBUG] sending bmap to peer %s \n",nodeid_static_str(id));
162
                send_bmap(psinstance_streaming(t->ps), id);
163
        }
164
        return p;
165
}
166

    
167
void neighbourhood_remove_peer(struct topology * t, const struct nodeID *id)
168
{
169
        struct peer *p=NULL;
170
        if(id)
171
        {
172
                p = peerset_pop_peer(t->neighbourhood,id);
173
                if(p)
174
                        peerset_push_peer(t->swarm_bucket,p);
175

    
176
                peerset_pop_peer(t->locked_neighs,id);
177
        }
178
}
179

    
180
void neighbourhood_message_parse(struct topology * t, struct nodeID *from, const uint8_t *buff, size_t len)
181
{
182
        struct metadata m = {0};
183
        struct peer *p = NULL;
184

    
185
        switch(buff[0]) {
186
                case NEIGHBOURHOOD_ADD:
187
                        // fprintf(stderr,"[DEBUG] adding peer %s from message\n",nodeid_static_str(from));
188
                        p = neighbourhood_add_peer(t, from);
189
                        if (len >= (sizeof(struct metadata) + 1))
190
                        {
191
                                memmove(&m,buff+1,sizeof(struct metadata));
192
                                peer_set_metadata(p,&m);
193
                        }
194
                        break;
195

    
196
                case NEIGHBOURHOOD_REMOVE:
197
                        neighbourhood_remove_peer(t, from);
198
                        break;
199
                default:
200
                        dprintf("Unknown neighbourhood message type");
201
        }
202
}
203

    
204
void topology_message_parse(struct topology * t, struct nodeID *from, const uint8_t *buff, size_t len)
205
{
206
        switch(buff[0]) {
207
                case MSG_TYPE_NEIGHBOURHOOD:
208
                        if (t->topo_in)
209
                        {
210
                                neighbourhood_message_parse(t, from, buff+1,len);
211
                                reg_neigh_size(psinstance_measures(t->ps), peerset_size(t->neighbourhood));
212
                        }
213
                        break;
214
                case MSG_TYPE_TOPOLOGY:
215
                        psample_parse_data(t->tc,buff,len);
216
                        //fprintf(stderr,"[DEBUG] received TOPO message\n");
217
                        break;
218
                default:
219
                        fprintf(stderr,"Unknown topology message type");
220
        }
221
}
222

    
223
void topology_sample_peers(struct topology * t)
224
{
225
        int sample_nodes_num,sample_metas_num,i;
226
        const struct nodeID * const * sample_nodes;
227
        struct metadata const * sample_metas;
228
        struct peer * p;
229
                
230
        //fprintf(stderr,"[DEBUG] starting peer sampling\n");
231
        sample_nodes = psample_get_cache(t->tc,&sample_nodes_num);
232
        sample_metas = psample_get_metadata(t->tc,&sample_metas_num);
233
        for (i=0;i<sample_nodes_num;i++)
234
        {
235
                //fprintf(stderr,"[DEBUG] sampled node: %s\n",nodeid_static_str(sample_nodes[i]));
236
                p = topology_get_peer(t, sample_nodes[i]);
237
                if(p==NULL)
238
                {
239
                        //fprintf(stderr,"[DEBUG] NEW PEER!\n");
240
                        peerset_add_peer(t->swarm_bucket,sample_nodes[i]);
241
                        p = topology_get_peer(t, sample_nodes[i]);
242
                }
243
                else
244
                        //fprintf(stderr,"[DEBUG] OLD PEER!\n");
245
                peer_set_metadata(p,&(sample_metas[i]));        
246
        }
247
}
248

    
249
void topology_blacklist_add(struct topology * t, struct nodeID * id)
250
{
251
}
252

    
253
void neighbourhood_drop_unactives(struct topology * t, struct timeval * bmap_timeout)
254
{
255
  struct timeval tnow, told;
256
        struct peer *const *peers;
257
        int i;
258
  gettimeofday(&tnow, NULL);
259
  timersub(&tnow, bmap_timeout, &told);
260
  peers = peerset_get_peers(t->neighbourhood);
261
  for (i = 0; i < peerset_size(t->neighbourhood); i++) {
262
    if ( (!timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->creation_timestamp, &told, <) ) ||
263
         ( timerisset(&peers[i]->bmap_timestamp) && timercmp(&peers[i]->bmap_timestamp, &told, <)     )   ) {
264
      dprintf("Topo: dropping inactive %s (peersset_size: %d)\n", nodeid_static_str(peers[i]->id), peerset_size(t->neighbourhood));
265
      //if (peerset_size(t->neighbourhood) > 1) {        // avoid dropping our last link to the world
266
      topology_blacklist_add(t, peers[i]->id);
267
      neighbourhood_remove_peer(t, peers[i]->id);
268
      //}
269
    }
270
  }
271
        
272
}
273

    
274
void array_shuffle(void *base, int nmemb, int size) {
275
  int i,newpos;
276
  unsigned char t[size];
277
  unsigned char* b = base;
278

    
279
  for (i = nmemb - 1; i > 0; i--) {
280
    newpos = (rand()/(RAND_MAX + 1.0)) * (i + 1);
281
    memcpy(t, b + size * newpos, size);
282
    memmove(b + size * newpos, b + size * i, size);
283
    memcpy(b + size * i, t, size);
284
  }
285
}
286

    
287
double get_rtt_of(struct topology *t, const struct nodeID* n){
288
  return NAN;
289
}
290

    
291
double get_capacity_of(struct topology *t, const struct nodeID* n){
292
  return NAN;
293
}
294

    
295
int neighbourhood_send_msg(struct topology *t, const struct peer * p,uint8_t type)
296
{
297
        uint8_t * msg;
298
        int res;
299
        msg = malloc(sizeof(struct metadata)+2);
300
        msg[0] = MSG_TYPE_NEIGHBOURHOOD;
301
        msg[1] = type;
302
        memmove(msg+2,&(t->my_metadata),sizeof(struct metadata));
303
        res = send_to_peer(psinstance_nodeid(t->ps), p->id, msg, sizeof(struct metadata)+2);
304
        free(msg);
305
        return res;        
306
}
307

    
308
void peerset_destroy_reference_copy(struct peerset ** pset)
309
{
310
        while (peerset_size(*pset))
311
                peerset_pop_peer(*pset,(peerset_get_peers(*pset)[0])->id);
312

    
313
        peerset_destroy(pset);
314
}
315

    
316
struct peerset * peerset_create_reference_copy(struct peerset * pset)
317
{
318
        struct peerset * ns;
319
        const struct peer * p;
320
        int i;
321

    
322
        ns = peerset_init(0);
323
        peerset_for_each(pset,p,i)
324
                peerset_push_peer(ns, (struct peer *)p);
325
        return ns;
326
}
327

    
328
struct peer *nodeid_to_peer(struct topology * t, struct nodeID *id,int reg)
329
{
330
        struct peer * p;
331
        p = topology_get_peer(t, id);
332
        if(p==NULL && reg)
333
        {
334
                topology_node_insert(t, id);
335
                neighbourhood_add_peer(t, id);
336
                p = topology_get_peer(t, id);
337
                if(t->topo_out)
338
                        neighbourhood_send_msg(t, p, NEIGHBOURHOOD_ADD);
339
        }
340
        return p;
341
}
342

    
343
/* move num peers from pset1 to pset2 after applying the filtering_mask function and following the given criterion */
344
void topology_move_peers(struct peerset * pset1, struct peerset * pset2,int num,enum peer_choice criterion,bool (*filter_mask)(const struct peer *),int (*cmp_peer)(const void* p0, const void* p1)) 
345
{
346
        struct peer * const * const_peers;
347
        struct peer ** peers;
348
        struct peer *p;
349
        int peers_num,i,j;
350

    
351
        peers_num = peerset_size(pset1);
352
        const_peers = peerset_get_peers(pset1);
353
        peers = (struct peer **)malloc(sizeof(struct peer *)*peers_num);
354
        if (filter_mask)
355
        {
356
                for(i = 0,j = 0; i<peers_num; i++)
357
                        if (filter_mask(const_peers[i]))
358
                                peers[j++] = const_peers[i];
359
                peers_num = j;
360
        } else
361
                memmove(peers,const_peers,peers_num*sizeof(struct peer*));
362

    
363
        if (criterion != PEER_CHOICE_RANDOM && cmp_peer != NULL) {
364
                qsort(peers, peers_num, sizeof(struct peer*), cmp_peer);
365
        } else {
366
    array_shuffle(peers, peers_num, sizeof(struct peer *));
367
        }
368
        for (i=0; i<peers_num && i<num; i++)
369
        {
370
                if (criterion == PEER_CHOICE_WORST)
371
                        p = peerset_pop_peer(pset1,(peers[peers_num -i -1])->id);
372
                else
373
                        p = peerset_pop_peer(pset1,(peers[i])->id);
374
                peerset_push_peer(pset2,p);
375
        }
376
        free(peers);
377
}
378

    
379
void peerset_reference_copy_add(struct peerset * dst, struct peerset * src)
380
{
381
  const struct peer *p;
382
  int i;
383

    
384
        peerset_for_each(src,p,i)
385
                peerset_push_peer(dst, (struct peer *)p);
386
}
387

    
388
void topology_signal_change(struct topology *t, const struct peerset const * old_neighs)
389
{
390
        const struct peer * p;
391
        int i;
392
  reg_neigh_size(psinstance_measures(t->ps), peerset_size(t->neighbourhood));
393

    
394
        // advertise changes
395
        if(t->topo_out)
396
        {
397
                peerset_for_each(t->neighbourhood,p,i)
398
    {
399
                        if(peerset_check(old_neighs,p->id) < 0)
400
                                neighbourhood_send_msg(t, p, NEIGHBOURHOOD_ADD);
401
    }
402
                peerset_for_each(old_neighs,p,i)
403
    {
404
                        if(peerset_check(t->neighbourhood,p->id) < 0)
405
                                neighbourhood_send_msg(t,p,NEIGHBOURHOOD_REMOVE);
406
    }
407
        }
408
}
409

    
410

    
411
void topology_update_random(struct topology * t)
412
{
413
        int discard_num;
414
        int others_num;
415

    
416
        discard_num = (int)((1-t->topo_mem) * peerset_size(t->neighbourhood));
417
        topology_move_peers(t->neighbourhood,t->swarm_bucket,discard_num,PEER_CHOICE_RANDOM,NULL,NULL);
418

    
419
        others_num = MAX(t->neighbourhood_target_size-peerset_size(t->neighbourhood),0);
420
        topology_move_peers(t->swarm_bucket,t->neighbourhood,others_num,PEER_CHOICE_RANDOM,NULL,NULL);
421
}
422

    
423

    
424
void topology_update(struct topology * t)
425
{
426
        struct peerset * old_neighs;
427
  const struct peer * p;
428
  int i;
429

    
430
  psample_parse_data(t->tc,NULL,0); // needed in order to trigger timed sending of TOPO messages
431

    
432
        update_metadata(t);
433
        topology_sample_peers(t);
434

    
435
  if timerisset(&(t->tout_bmap) )
436
                neighbourhood_drop_unactives(t, &(t->tout_bmap));
437

    
438
        old_neighs = peerset_create_reference_copy(t->neighbourhood);
439

    
440
    topology_update_random(t);
441

    
442
  topology_signal_change(t, old_neighs);
443
        peerset_destroy_reference_copy(&old_neighs);
444

    
445
    peerset_for_each(t->swarm_bucket,p,i)
446
      peerset_pop_peer(t->locked_neighs,p->id);
447
    peerset_clear(t->swarm_bucket,0);  // we don't remember past peers
448
}
449

    
450
void topology_destroy(struct topology **t)
451
{
452
        if (t && *t)
453
        {
454
                if(((*t)->locked_neighs))
455
                        peerset_destroy_reference_copy(&((*t)->locked_neighs));
456
                if(((*t)->neighbourhood))
457
                        peerset_destroy(&((*t)->neighbourhood));
458
                if(((*t)->swarm_bucket))
459
                        peerset_destroy(&((*t)->swarm_bucket));
460
                if(((*t)->tc))
461
                        psample_destroy(&((*t)->tc));
462
                free(*t);
463
        }
464
}
465

    
466
uint8_t topology_peer_cbsize(const struct topology *t, const struct peer * p)
467
{
468
}