Statistics
| Branch: | Revision:

streamers / topology.c @ 375b4494

History | View | Annotate | Download (21.4 KB)

1 7f591208 Csaba Kiraly
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7 2f846ddd Csaba Kiraly
#include <stdint.h>
8 fcb5c29b Csaba Kiraly
#include <stdio.h>
9 2f846ddd Csaba Kiraly
#include <sys/time.h>
10
#include <time.h>
11 7e39164f MarcoBiazzini
#include <stdlib.h>
12 c2d8fe2d Csaba Kiraly
#include <string.h>
13 2f846ddd Csaba Kiraly
14 7e39164f MarcoBiazzini
#include <math.h>
15 74a5d4ae CsabaKiraly
#include <net_helper.h>
16 2f846ddd Csaba Kiraly
#include <peerset.h>
17
#include <peer.h>
18 33d23b91 MarcoBiazzini
#include <grapes_msg_types.h>
19 2f846ddd Csaba Kiraly
#include <topmanager.h>
20 7e39164f MarcoBiazzini
#include <tman.h>
21 2f846ddd Csaba Kiraly
22 63ebb93d CsabaKiraly
#include "compatibility/timer.h"
23
24 2f846ddd Csaba Kiraly
#include "topology.h"
25 16ae4927 Csaba Kiraly
#include "nodeid_set.h"
26 132f8b40 Csaba Kiraly
#include "streaming.h"
27 4729fe2d Csaba Kiraly
#include "dbg.h"
28 4bf91643 Csaba Kiraly
#include "measures.h"
29 0877a9f6 Csaba Kiraly
#include "streamer.h"
30 3e8cd067 Stefano Traverso
#include "hrc.h"
31 a3b74b71 Stefano Traverso
#include "blacklist.h"
32 4bf91643 Csaba Kiraly
33 a4933dfb Csaba Kiraly
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
34 b4419dd1 Csaba Kiraly
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
35 31ba5789 Csaba Kiraly
36 e30c3e36 Stefano Traverso
#define UPDATE_PEERS_TIMEOUT 5
37
38 9818ca86 Csaba Kiraly
double desired_bw = 0;
39 970bd24a napawine
double desired_rtt = 0.2;
40 acbc9155 Csaba Kiraly
double alpha_target = 0.5;
41 6f490e6a Csaba Kiraly
double topo_mem = 0;
42 acbc9155 Csaba Kiraly
43 483dec26 Csaba Kiraly
bool topo_out = true; //peer selects out-neighbours
44 e5cd99c1 Csaba Kiraly
bool topo_in = false; //peer selects in-neighbours (combined means bidirectional)
45 483dec26 Csaba Kiraly
46 f6e9f400 Csaba Kiraly
bool topo_keep_best = false;
47
bool topo_add_best = false;
48 78449db6 Stefano Traverso
bool topo_black_list = false;
49
50 fe769b0b Stefano Traverso
const char* topo_keep_best_policy = "";
51
const char* topo_add_best_policy = "";
52
const char* topo_black_list_policy = "";
53
54 5f483e66 Csaba Kiraly
int NEIGHBORHOOD_TARGET_SIZE = 30;
55 01a25012 MarcoBiazzini
#define TMAN_MAX_IDLE 10
56 821cc939 CsabaKiraly
#define TMAN_LOG_EVERY 1000
57 d78e9c6a Csaba Kiraly
58 9176d3d1 Csaba Kiraly
#define STREAMER_TOPOLOGY_MSG_ADD 0
59
#define STREAMER_TOPOLOGY_MSG_REMOVE 1
60
61 fcb5c29b Csaba Kiraly
static struct peerset *pset;
62 9b36d077 Csaba Kiraly
static struct timeval tout_bmap = {20, 0};
63 7e39164f MarcoBiazzini
static int counter = 0;
64 a3b74b71 Stefano Traverso
static int counter2 = 0;
65 7e39164f MarcoBiazzini
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
66
static tmanRankingFunction rankFunct = simpleRanker;
67 e4247dda Csaba Kiraly
68 cc0cd459 Csaba Kiraly
struct metadata {
69 d2514625 Csaba Kiraly
  uint16_t cb_size;
70 9d55b3b7 Csaba Kiraly
  uint16_t cps;
71 b1d41fdb Csaba Kiraly
  float capacity;
72 5be2e90d Csaba Kiraly
  float recv_delay;
73 e4247dda Csaba Kiraly
} __attribute__((packed));
74
75 cc0cd459 Csaba Kiraly
static struct metadata my_metadata;
76 7e39164f MarcoBiazzini
static int cnt = 0;
77
static struct nodeID *me = NULL;
78 9176d3d1 Csaba Kiraly
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN,MSG_TYPE_STREAMER_TOPOLOGY};
79 9ffd84bb MarcoBiazzini
static struct nodeID ** neighbors;
80 e30c3e36 Stefano Traverso
static struct timeval last_time_updated_peers;
81 7e39164f MarcoBiazzini
82
static void update_metadata(void) {
83 b88c9f4a Csaba Kiraly
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
84 5be2e90d Csaba Kiraly
        my_metadata.recv_delay = get_receive_delay();
85 3e8cd067 Stefano Traverso
        my_metadata.cps =  get_running_offer_threads();
86 b1d41fdb Csaba Kiraly
        my_metadata.capacity = get_capacity();
87 7e39164f MarcoBiazzini
}
88
89
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
90
91
        double t,p1,p2;
92
        t = *((const double *)tin);
93
        p1 = *((const double *)p1in);
94
        p2 = *((const double *)p2in);
95
96
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
97
        else if (isnan(p1)) return 2;
98
        else if (isnan(p2)) return 1;
99
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
100
101
}
102
103
int topologyInit(struct nodeID *myID, const char *config)
104
{
105 33d23b91 MarcoBiazzini
        int i;
106 9f1da074 Csaba Kiraly
        for (i=0;i<sizeof(mTypes)/sizeof(mTypes[0]);i++)
107 33d23b91 MarcoBiazzini
                bind_msg_type(mTypes[i]);
108 7e39164f MarcoBiazzini
        update_metadata();
109
        me = myID;
110 e30c3e36 Stefano Traverso
        gettimeofday(&last_time_updated_peers, NULL);
111 7e39164f MarcoBiazzini
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
112
}
113
114
void topologyShutdown(void)
115
{
116
}
117
118
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
119
{
120
        // TODO: check this!! Just to use this function to bootstrap ncast...
121 cc0cd459 Csaba Kiraly
        struct metadata m = {0};        //TODO: check what metadata option should mean
122
123 7e39164f MarcoBiazzini
        if (counter < TMAN_MAX_IDLE)
124 cc0cd459 Csaba Kiraly
                return topAddNeighbour(neighbour,&m,sizeof(m));
125
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
126 7e39164f MarcoBiazzini
}
127
128
static int topoParseData(const uint8_t *buff, int len)
129
{
130
        int res = -1,ncs = 0,msize;
131
        const struct nodeID **n; const void *m;
132
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
133
                res = topParseData(buff,len);
134 c65ac153 Csaba Kiraly
//                if (counter <= TMAN_MAX_IDLE)
135
//                        counter++;
136 7e39164f MarcoBiazzini
        }
137
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
138
        {
139
                n = topGetNeighbourhood(&ncs);
140
                if (ncs) {
141
                m = topGetMetadata(&msize);
142
                res = tmanParseData(buff,len,n,ncs,m,msize);
143
                }
144
        }
145
  return res;
146
}
147
148
static const struct nodeID **topoGetNeighbourhood(int *n)
149
{
150
        int i; double d;
151
        if (counter > TMAN_MAX_IDLE) {
152 9ffd84bb MarcoBiazzini
                uint8_t *mdata; int msize;
153 7e39164f MarcoBiazzini
                *n = tmanGetNeighbourhoodSize();
154 9ffd84bb MarcoBiazzini
                if (neighbors) free(neighbors);
155 7e39164f MarcoBiazzini
                neighbors = calloc(*n,sizeof(struct nodeID *));
156
                tmanGetMetadata(&msize);
157
                mdata = calloc(*n,msize);
158
                tmanGivePeers(*n,neighbors,(void *)mdata);
159
160 0711317b CsabaKiraly
                if (cnt % TMAN_LOG_EVERY == 0) {
161 5be2e90d Csaba Kiraly
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
162 7e39164f MarcoBiazzini
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
163
                                d = *((double *)(mdata+i*msize));
164
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
165
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
166
                        }
167
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
168
                }
169
170
                free(mdata);
171
                return (const struct nodeID **)neighbors;
172
        }
173
        else
174
                return topGetNeighbourhood(n);
175
}
176 2f846ddd Csaba Kiraly
177 685c4dd0 MarcoBiazzini
static void topoAddToBL (struct nodeID *id)
178
{
179
        if (counter >= TMAN_MAX_IDLE)
180
                tmanAddToBlackList(id);
181 a3b74b71 Stefano Traverso
        topAddToBlackList(id);
182 685c4dd0 MarcoBiazzini
}
183 fcb5c29b Csaba Kiraly
184 9176d3d1 Csaba Kiraly
//TODO: send metadata as well
185
static int send_topo_msg(struct nodeID *dst, uint8_t type)
186
{
187
  char msg[2];
188
  msg[0] = MSG_TYPE_STREAMER_TOPOLOGY;
189
  msg[1] = type;
190
  return send_to_peer(me, dst, msg, 2);
191
}
192
193 2dab7813 Csaba Kiraly
static void add_peer(const struct nodeID *id, const struct metadata *m, bool local, bool remote)
194 fcb5c29b Csaba Kiraly
{
195 2dab7813 Csaba Kiraly
  if (local) {
196 b88c9f4a Csaba Kiraly
      dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
197 fcb5c29b Csaba Kiraly
      peerset_add_peer(pset, id);
198 b88c9f4a Csaba Kiraly
      if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
199 b1d41fdb Csaba Kiraly
      if (m) peerset_get_peer(pset, id)->capacity = m->capacity;
200 4bf91643 Csaba Kiraly
      /* add measures here */
201
      add_measures(id);
202 b0225995 Csaba Kiraly
      send_bmap(id);
203 2dab7813 Csaba Kiraly
  }
204
  if (remote) {
205 9176d3d1 Csaba Kiraly
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
206
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD);
207 2dab7813 Csaba Kiraly
  }
208 9176d3d1 Csaba Kiraly
}
209
210 2dab7813 Csaba Kiraly
static void remove_peer(const struct nodeID *id, bool local, bool remote)
211 fcb5c29b Csaba Kiraly
{
212 2dab7813 Csaba Kiraly
  if (local) {
213 fcb5c29b Csaba Kiraly
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
214 a3b74b71 Stefano Traverso
      /* delete measures here */
215 4bf91643 Csaba Kiraly
      delete_measures(id);
216 fcb5c29b Csaba Kiraly
      peerset_remove_peer(pset, id);
217 2dab7813 Csaba Kiraly
  }
218
  if (remote) {
219 9176d3d1 Csaba Kiraly
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
220
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE);
221 2dab7813 Csaba Kiraly
  }
222 9176d3d1 Csaba Kiraly
}
223
224 a26a0456 Csaba Kiraly
//get the rtt. Currenly only MONL version is supported
225 89ccef52 Csaba Kiraly
static double get_rtt_of(const struct nodeID* n){
226 acbc9155 Csaba Kiraly
#ifdef MONL
227 896704e9 Csaba Kiraly
  return get_rtt(n);
228 acbc9155 Csaba Kiraly
#else
229 a26a0456 Csaba Kiraly
  return NAN;
230 acbc9155 Csaba Kiraly
#endif
231
}
232
233 9818ca86 Csaba Kiraly
//get the declared capacity of a node
234
static double get_capacity_of(const struct nodeID* n){
235
  struct peer *p = peerset_get_peer(pset, n);
236
  if (p) {
237
    return p->capacity;
238
  }
239
240
  return NAN;
241
}
242
243 fe769b0b Stefano Traverso
244 375b4494 Stefano Traverso
// FF//get the declared capacity of a node
245
// static double get_offers_threads_of(const struct nodeID* n){
246
//   struct peer *p = peerset_get_peer(pset, n);
247
//   if (p) {
248
//     return p->offers_threads;
249
//   }
250
// 
251
//   return NAN;
252
// }
253
254
255 fe769b0b Stefano Traverso
static double get_rx_bytes_chunks_of(const struct nodeID* n){
256
#ifdef MONL
257
  return get_rx_bytes_chunks(n);
258
#else
259
  return NAN;
260
#endif
261
}
262
263
static double get_transmitter_lossrate_of(const struct nodeID* n){
264
#ifdef MONL
265
  return get_transmitter_lossrate_of(n);
266
#else
267
  return NAN;
268
#endif
269
}
270
271 a26a0456 Csaba Kiraly
//returns: 1:yes 0:no -1:unknown
272 89ccef52 Csaba Kiraly
int desiredness(const struct nodeID* n) {
273 a26a0456 Csaba Kiraly
  double rtt = get_rtt_of(n);
274 9818ca86 Csaba Kiraly
  double bw =  get_capacity_of(n);
275
276
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
277
    return -1;
278
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw)) {
279
    return 1;
280
  }
281 a26a0456 Csaba Kiraly
282 9818ca86 Csaba Kiraly
  return 0;
283 a26a0456 Csaba Kiraly
}
284
285 a3b74b71 Stefano Traverso
// equal to desiredness, but considers blacklist
286
//returns: 1:yes 0:no -1:unknown
287
int desiredness_unbled(const struct nodeID* n) {
288
  double rtt = get_rtt_of(n);
289
  double bw =  get_capacity_of(n);
290
  bool bled = black_listed(node_addr(n));
291
292
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
293
    return -1;
294
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw) && (!bled)) {
295
    return 1;
296
  }
297
298
  return 0;
299
}
300
301
//returns: 1:yes 0:no -1:unknown
302 fe769b0b Stefano Traverso
int blacklistness_lossrate(const struct nodeID* n) {
303 e30c3e36 Stefano Traverso
  double loss_rate = get_transmitter_lossrate(n);
304
  
305
  fprintf(stderr, "BLACKLIST: father loss %s rate is %f\n", node_addr(n), loss_rate);
306 a3b74b71 Stefano Traverso
  
307
  if (isnan(loss_rate)) {
308
    return -1;
309
  } else if (loss_rate > 0.10) {
310
    return 1;
311
  }
312
313
  return 0;
314
}
315
316 89ccef52 Csaba Kiraly
bool is_desired(const struct nodeID* n) {
317 a9e55854 Csaba Kiraly
  return (desiredness(n) == 1);
318
}
319
320 a3b74b71 Stefano Traverso
bool is_desired_unbled(const struct nodeID* n) {
321
  return (desiredness_unbled(n) == 1);
322
}
323
324
bool is_to_blacklist(const struct nodeID* n) {
325 fe769b0b Stefano Traverso
  return (blacklistness_lossrate(n) == 1);
326 a3b74b71 Stefano Traverso
}
327
328 fe769b0b Stefano Traverso
int cmp_rtt(const struct nodeID** a, const struct nodeID** b) {
329 f6e9f400 Csaba Kiraly
  double ra, rb;
330 fe769b0b Stefano Traverso
  ra = get_rtt_of(*a);
331
  rb = get_rtt_of(*b);
332
  fprintf(stderr, "RTTS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
333 f6e9f400 Csaba Kiraly
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
334
  else if (isnan(rb) || ra < rb) return -1;
335
  else return 1;
336
}
337
338
int vcmp_rtt(const void* a, const void* b) {
339 fe769b0b Stefano Traverso
  return cmp_rtt((const struct nodeID**) a, (const struct nodeID**) b);
340
}
341
342
int cmp_bw(const struct nodeID** a, const struct nodeID** b) {
343
  double ra, rb;
344
  ra = -( get_capacity_of(*a) );
345
  rb = -( get_capacity_of(*b) );
346
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
347
  else if (isnan(rb) || ra < rb) return -1;
348
  else return 1;
349
}
350
351
int vcmp_bw(const void* a, const void* b) {
352
  return cmp_bw((const struct nodeID**)a, (const struct nodeID**)b);
353
}
354
355
int cmp_rtt_bw(const struct nodeID** a, const struct nodeID** b) {
356
  double ra, rb;
357
  ra = -( get_capacity_of(*a)*get_rtt_of(*a) );
358
  rb = -( get_capacity_of(*b)*get_rtt_of(*b) );
359
  if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
360
  else if (isnan(rb) || ra < rb) return -1;
361
  else return 1;
362
}
363
364
int vcmp_rtt_bw(const void* a, const void* b) {
365
  return cmp_rtt_bw((const struct nodeID**)a, (const struct nodeID**)b);
366
}
367
368
// int cmp_offerratio_hopcount_queuedelay(const struct nodeID* a, const struct nodeID* b) {
369
//   double ra, rb;
370
//   ra = get_capacity_of(a)*get_rtt_of(a);
371
//   rb = get_capacity_of(b)*get_rtt_of(b);
372
//   if ((isnan(ra) && isnan(rb)) || ra == rb) return 0;
373
//   else if (isnan(rb) || ra < rb) return -1;
374
//   else return 1;
375
// }
376
377
// int vcmp_offerratio_hopcount_queuedelay(const void* a, const void* b) {
378
//   return cmp_rtt((const struct nodeID*)a, (const struct nodeID*)b);
379
// }
380
381
int cmp_rx_bytes_chunks(const struct nodeID** a, const struct nodeID** b) {
382
  double ra, rb;
383
  ra = get_rx_bytes_chunks_of(*a);
384
  rb = get_rx_bytes_chunks_of(*b);
385
//   fprintf(stderr, "RX_BYTES_CHUNKS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
386
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
387
        return 0;
388
  else if (isnan(rb) || ra < rb) 
389
        return -1;
390
  else 
391
        return 1;
392
}
393
394
int vcmp_rx_bytes_chunks(const void* a, const void* b) {
395
  return cmp_rx_bytes_chunks((const struct nodeID**) a, (const struct nodeID**) b);
396
}
397
398
int cmp_packet_loss(const struct nodeID** a, const struct nodeID** b) {
399
  double ra, rb;
400
  ra = -( get_transmitter_lossrate(*a) );
401
  rb = -( get_transmitter_lossrate(*b) );
402
//   fprintf(stderr, "PACKET_LOSS: %s %f %s %f\n", node_addr(*a), ra, node_addr(*b), rb );
403
  if ((isnan(ra) && isnan(rb)) || ra == rb) 
404
        return 0;
405
  else if (isnan(rb) || ra < rb) 
406
        return -1;
407
  else 
408
        return 1;
409
}
410
411
int vcmp_packet_loss(const void* a, const void* b) {
412
  return cmp_packet_loss((const struct nodeID**) a, (const struct nodeID**) b);
413
}
414
415
// Use add best policy specified at command line
416
int vcmp_add_best(const void* a, const void* b) {
417
  if (strcmp(topo_add_best_policy, "BW") == 0) {
418
          return vcmp_bw(a, b);
419
  } else if (strcmp(topo_add_best_policy, "RTT") == 0) {
420
          return vcmp_rtt( a,  b);
421
  } else if (strcmp(topo_add_best_policy, "BW_RTT") == 0) {
422
          return vcmp_rtt_bw( a,  b);
423
  } else if (strcmp(topo_add_best_policy, "") == 0){ // defualt behaviuor
424
          return vcmp_rtt_bw( a, b);
425
  } else {
426
    fprintf(stderr, "Error: unknown add best policy %s\n", topo_add_best_policy);
427
    return 1;
428
  }  
429
}
430
431
// Use keep best policy specified at command line
432
int vcmp_keep_best(const void* a, const void* b) {
433
  if (strcmp(topo_keep_best_policy, "RX_CHUNKS") == 0) {
434
          return vcmp_rx_bytes_chunks(a, b);
435
  } else if (strcmp(topo_keep_best_policy, "PACKET_LOSS") == 0) {
436
          return vcmp_packet_loss( a, b);
437
  } else if (strcmp(topo_keep_best_policy, "") == 0) { // Default behaviuor
438
          return vcmp_rx_bytes_chunks( a, b);
439
  } else {
440
    fprintf(stderr, "Error: unknown keep best policy %s\n", topo_add_best_policy);
441
    return 1;
442
  }  
443 f6e9f400 Csaba Kiraly
}
444
445 2f846ddd Csaba Kiraly
// currently it just makes the peerset grow
446 74a5d4ae CsabaKiraly
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
447 2f846ddd Csaba Kiraly
{
448 6b2924d4 Csaba Kiraly
  int n_ids, metasize, i, newids_size, max_ids;
449
  static const struct nodeID **newids;
450 cc0cd459 Csaba Kiraly
  static const struct metadata *metas;
451 2f846ddd Csaba Kiraly
  struct peer *peers;
452
  struct timeval tnow, told;
453 304607e2 Csaba Kiraly
  static const struct nodeID **savedids;
454
  static int savedids_size;
455 2f846ddd Csaba Kiraly
456 eac0d4ce Csaba Kiraly
  if timerisset(&tout_bmap) {
457
    gettimeofday(&tnow, NULL);
458
    timersub(&tnow, &tout_bmap, &told);
459
    peers = peerset_get_peers(pset);
460
    for (i = 0; i < peerset_size(pset); i++) {
461
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
462
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
463 d84d066d Csaba Kiraly
        ftprintf(stderr,"Topo: dropping inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset));
464 eac0d4ce Csaba Kiraly
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
465
        topoAddToBL(peers[i].id);
466 2dab7813 Csaba Kiraly
        remove_peer(peers[i--].id, true, true);
467 eac0d4ce Csaba Kiraly
        //}
468
      }
469
    }
470
  }
471
472 8a49328f CsabaKiraly
  if (cnt++ % 100 == 0) {
473 7e39164f MarcoBiazzini
        update_metadata();
474 59ad7d83 Csaba Kiraly
    if (counter > TMAN_MAX_IDLE) {
475 7e39164f MarcoBiazzini
        tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
476 59ad7d83 Csaba Kiraly
    }
477 7e39164f MarcoBiazzini
  }
478
479 9176d3d1 Csaba Kiraly
  //handle explicit add/remove messages
480 483dec26 Csaba Kiraly
  if (topo_in && buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
481 9176d3d1 Csaba Kiraly
    dtprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset));
482
    if (len != 2) {
483
      fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
484
      return;
485
    }
486
    switch (buff[1]) {
487
      case STREAMER_TOPOLOGY_MSG_ADD:
488 65a62f30 Csaba Kiraly
        ftprintf(stderr,"Topo: adding on request %s (peers:%d)\n", node_addr(from), peerset_size(pset));
489 a3b74b71 Stefano Traverso
        if (!peerset_get_peer(pset, from)) {
490
                counter2++;
491 4b40812a Csaba Kiraly
                add_peer(from, NULL, true, false);
492 a3b74b71 Stefano Traverso
                }
493
        if (black_listed(node_addr(from)) && (peerset_get_peer(pset, from)))
494
                remove_peer(from, topo_in, topo_out);
495
        else if ((black_listed(node_addr(from))) && (peerset_get_peer(pset, from) == NULL))
496
                send_topo_msg(from, STREAMER_TOPOLOGY_MSG_REMOVE);
497 9176d3d1 Csaba Kiraly
        break;
498
      case STREAMER_TOPOLOGY_MSG_REMOVE:
499 a3b74b71 Stefano Traverso
        if (peerset_get_peer(pset, from) != NULL) {
500
                ftprintf(stderr,"Topo: removing on request %s (peers:%d)\n", node_addr(from), peerset_size(pset));
501
                remove_peer(from, true, false);
502
                }
503 9176d3d1 Csaba Kiraly
        break;
504
      default:
505
        fprintf(stderr, "Bad streamer topo message received!\n");
506
    }
507
    reg_neigh_size(peerset_size(pset));
508
    return;
509
  }
510
511 e30c3e36 Stefano Traverso
  int res_topo_parse_data = topoParseData(buff, len);
512 de351684 Csaba Kiraly
513 e30c3e36 Stefano Traverso
  // Exit if a gossiping message has been received TODO: increase newscast cache to 100
514
  if (res_topo_parse_data > 0) {
515 6828703c Csaba Kiraly
    reg_neigh_size(peerset_size(pset));
516
    return;
517
  }
518 de351684 Csaba Kiraly
519 e30c3e36 Stefano Traverso
  // Look for peers which can be removed from blacklist
520 a3b74b71 Stefano Traverso
  check_black_list_timeouts();
521 e30c3e36 Stefano Traverso
522 fe769b0b Stefano Traverso
  double time_difference = (tnow.tv_sec + tnow.tv_usec*1e-6) - (last_time_updated_peers.tv_sec + last_time_updated_peers.tv_usec*1e-6) + rand() - 2 * rand();
523 e30c3e36 Stefano Traverso
524
  // Check if it's time to update the neighbourhood
525
  if (time_difference > UPDATE_PEERS_TIMEOUT) {
526
    peers = peerset_get_peers(pset);
527
    n_ids = peerset_size(pset);
528
    
529
    newids = topoGetNeighbourhood(&newids_size);    //TODO handle both tman and topo
530
    metas = topGetMetadata(&metasize);    //TODO: check metasize
531
    max_ids = n_ids + savedids_size + newids_size;
532
    ftprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
533
534 d8a205cd Csaba Kiraly
    int desired_part;
535 6f490e6a Csaba Kiraly
    const struct nodeID *oldids[max_ids], *nodeids[max_ids], *candidates[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids];
536 dd2e7bcf Csaba Kiraly
    int oldids_size, nodeids_size, candidates_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size, keep_size, random_size;
537 6f490e6a Csaba Kiraly
    nodeids_size = candidates_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
538 acbc9155 Csaba Kiraly
539 79c926d0 Csaba Kiraly
    if (topo_out) {
540
      for (i = 0, oldids_size = 0; i < peerset_size(pset); i++) {
541
        oldids[oldids_size++] = peers[i].id;
542 e30c3e36 Stefano Traverso
        fprintf(stderr," %s - RTT: %f, loss_rate %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id), get_transmitter_lossrate(peers[i].id));
543 79c926d0 Csaba Kiraly
      }
544
    } else {
545
      for (i = 0, oldids_size = 0; i < savedids_size; i++) {
546
        oldids[oldids_size++] = savedids[i];
547 fe769b0b Stefano Traverso
        fprintf(stderr," %s - RTT: %f, RX_CHUNKS: %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]));
548 79c926d0 Csaba Kiraly
      }
549
      savedids_size = 0;
550
      free(savedids);
551 acbc9155 Csaba Kiraly
    }
552 d8a205cd Csaba Kiraly
553 6f490e6a Csaba Kiraly
    // select the topo_mem portion of peers to be kept (uniform random)
554 f6e9f400 Csaba Kiraly
    if (topo_keep_best) {
555 fe769b0b Stefano Traverso
      qsort(oldids, oldids_size, sizeof(oldids[0]), vcmp_keep_best);
556 f6e9f400 Csaba Kiraly
    } else {
557
      nidset_shuffle(oldids, oldids_size);
558
    }
559 fe769b0b Stefano Traverso
    for (i = 0; i < oldids_size; i++) {
560
        fprintf(stderr,"QSORT KEEP BEST %s - RTT: %f, RX_CHUNKS %f, PACKET_LOSS %f\n", node_addr(oldids[i]) , get_rtt_of(oldids[i]), get_rx_bytes_chunks(oldids[i]), get_transmitter_lossrate(oldids[i]));
561
      }
562
    fprintf(stderr, "QSORT KEEP BEST END\n");
563
    
564 6f490e6a Csaba Kiraly
    keep_size = selecteds_size = (int) (topo_mem * oldids_size);
565
    memcpy(selecteds, oldids, selecteds_size * sizeof(selecteds[0]));
566 6b2924d4 Csaba Kiraly
567 6f490e6a Csaba Kiraly
    // compose list of known nodeids
568 6b2924d4 Csaba Kiraly
    nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size);
569 a3b74b71 Stefano Traverso
  
570 6f490e6a Csaba Kiraly
    // compose list of candidate nodeids
571
    nidset_complement(candidates, &candidates_size, nodeids, nodeids_size, selecteds, selecteds_size);
572
573 d8a205cd Csaba Kiraly
    // select the alpha_target portion of desired peers
574 6f490e6a Csaba Kiraly
    desired_part = (1 - alpha_target) * (NEIGHBORHOOD_TARGET_SIZE ? (MAX (NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : 0);
575 a3b74b71 Stefano Traverso
    
576
    // Filter out blacklisted ones    
577
    nidset_filter(desireds, &desireds_size, candidates, candidates_size, is_desired_unbled);
578
    
579 f6e9f400 Csaba Kiraly
    if (topo_add_best) {
580 fe769b0b Stefano Traverso
      qsort(desireds, desireds_size, sizeof(desireds[0]), vcmp_add_best);
581 f6e9f400 Csaba Kiraly
    } else {
582
      nidset_shuffle(desireds, desireds_size);
583
    }
584 fe769b0b Stefano Traverso
    for (i = 0; i < desireds_size; i++) {
585 375b4494 Stefano Traverso
        fprintf(stderr,"QSORT ADD BEST %s - RTT: %f, BW %f, BW*RTT %f\n", node_addr(desireds[i]) , get_rtt_of(desireds[i]), get_capacity_of(desireds[i]), (double) (get_capacity_of(desireds[i])*get_rtt_of(desireds[i])) );
586 fe769b0b Stefano Traverso
      }
587
    fprintf(stderr, "QSORT ADD BEST END\n");
588 6f490e6a Csaba Kiraly
    nidset_add_i(selecteds, &selecteds_size, max_ids, desireds, MIN(desireds_size,desired_part));
589 d8a205cd Csaba Kiraly
590
    // random from the rest
591 6f490e6a Csaba Kiraly
    nidset_complement(others, &others_size, candidates, candidates_size, selecteds, selecteds_size);
592 d8a205cd Csaba Kiraly
    nidset_shuffle(others, others_size);
593 69dfa8b1 Csaba Kiraly
    random_size = NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, MAX(NEIGHBORHOOD_TARGET_SIZE - selecteds_size, 0)) : others_size;
594 3131d7d1 Csaba Kiraly
    nidset_add_i(selecteds, &selecteds_size, max_ids, others, random_size);
595
596 dd2e7bcf Csaba Kiraly
    fprintf(stderr,"Topo modify sel:%d (from:%d) = keep: %d (of old:%d) + desired: %d (from %d of %d; target:%d) + random: %d (from %d)\n",
597 e30c3e36 Stefano Traverso
        selecteds_size, nodeids_size,
598
        keep_size, oldids_size,
599
        MIN(desireds_size,desired_part), desireds_size, candidates_size, desired_part,
600
        random_size, others_size);
601 6b2924d4 Csaba Kiraly
    // add new ones
602
    nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size);
603
    for (i = 0; i < toadds_size; i++) {
604 dd2e7bcf Csaba Kiraly
      int j;
605 6b2924d4 Csaba Kiraly
      //searching for the metadata
606
      if (nidset_find(&j, newids, newids_size, toadds[i])) {
607
        fprintf(stderr," adding %s\n", node_addr(toadds[i]));
608 e30c3e36 Stefano Traverso
        counter2++;
609 483dec26 Csaba Kiraly
        add_peer(newids[j], &metas[j], topo_out, topo_in);
610 6b2924d4 Csaba Kiraly
      } else {
611 48a8f955 Csaba Kiraly
        fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
612 6b2924d4 Csaba Kiraly
      }
613
    }
614
615 d8a205cd Csaba Kiraly
    // finally, remove those not needed
616
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
617 a3b74b71 Stefano Traverso
        
618 f6d28931 Csaba Kiraly
    nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size);
619 d8a205cd Csaba Kiraly
    for (i = 0; i < toremoves_size; i++) {
620 e30c3e36 Stefano Traverso
       // Check that it has not been removed already
621
       if (peerset_get_peer(pset, toremoves[i]) != NULL ) {
622
       // Counter2 is used to inhibit blacklisting at startup. TEST: maybe useless
623
       if (is_to_blacklist(toremoves[i]) && !black_listed(node_addr(toremoves[i])) && topo_black_list && counter2 > (NEIGHBORHOOD_TARGET_SIZE*2)) {
624
          fprintf(stderr," blacklisting and removing %s\n", node_addr(toremoves[i]));
625
          add_to_blacklist(node_addr(toremoves[i]));
626
          }
627
       fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
628
       if (peerset_get_peer(pset, toremoves[i]) != NULL) {
629
          remove_peer(toremoves[i], topo_out, topo_in);
630
          }
631
       }
632 acbc9155 Csaba Kiraly
    }
633 d8a205cd Csaba Kiraly
    fprintf(stderr,"Topo remove end\n");
634 79c926d0 Csaba Kiraly
635
    if (!topo_out) {
636 e30c3e36 Stefano Traverso
      savedids = malloc(selecteds_size * sizeof(savedids[0])); //TODO: handle errors
637 79c926d0 Csaba Kiraly
      for (i = 0, savedids_size = 0; i < selecteds_size; i++) {
638
        savedids[savedids_size++] = nodeid_dup(selecteds[i]);
639
      }
640
      for (i = 0; i < oldids_size; i++) {
641
        nodeid_free(oldids[i]);
642
      }
643
    }
644 e30c3e36 Stefano Traverso
    gettimeofday(&last_time_updated_peers, NULL); 
645
  } 
646 e4ef30e3 Csaba Kiraly
  reg_neigh_size(peerset_size(pset));
647 2f846ddd Csaba Kiraly
}
648 fcb5c29b Csaba Kiraly
649
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
650
{
651
  struct peer *p = peerset_get_peer(pset, id);
652
  if (!p) {
653 cb6be841 Csaba Kiraly
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
654 fcb5c29b Csaba Kiraly
    if (reg) {
655 483dec26 Csaba Kiraly
      add_peer(id,NULL, topo_out, false);
656 825df88c Csaba Kiraly
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset));
657 fcb5c29b Csaba Kiraly
      p = peerset_get_peer(pset,id);
658
    }
659
  }
660
661
  return p;
662
}
663
664 7e39164f MarcoBiazzini
int peers_init(void)
665 fcb5c29b Csaba Kiraly
{
666
  fprintf(stderr,"peers_init\n");
667
  pset = peerset_init(0);
668 0bb2425e Csaba Kiraly
  return pset ? 1 : 0;
669 fcb5c29b Csaba Kiraly
}
670
671 7e39164f MarcoBiazzini
struct peerset *get_peers(void)
672 fcb5c29b Csaba Kiraly
{
673
  return pset;
674
}