Statistics
| Branch: | Revision:

streamers / topology.c @ b4419dd1

History | View | Annotate | Download (10.3 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12
#include <string.h>
13

    
14
#include <math.h>
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <tman.h>
21

    
22
#include "compatibility/timer.h"
23

    
24
#include "topology.h"
25
#include "nodeid_set.h"
26
#include "streaming.h"
27
#include "dbg.h"
28
#include "measures.h"
29
#include "streamer.h"
30

    
31
#define MIN(A,B) (((A) < (B)) ? (A) : (B))
32
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
33

    
34
double desired_bw = 0;
35
double desired_rtt = 0.2;
36
double alpha_target = 0.5;
37

    
38
int NEIGHBORHOOD_TARGET_SIZE = 30;
39
double NEIGHBORHOOD_ROTATE_RATIO = 1.0;
40
#define TMAN_MAX_IDLE 10
41
#define TMAN_LOG_EVERY 1000
42

    
43
static struct peerset *pset;
44
static struct timeval tout_bmap = {20, 0};
45
static int counter = 0;
46
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
47
static tmanRankingFunction rankFunct = simpleRanker;
48

    
49
struct metadata {
50
  uint16_t cb_size;
51
  uint16_t cps;
52
  float capacity;
53
  float recv_delay;
54
} __attribute__((packed));
55

    
56
static struct metadata my_metadata;
57
static int cnt = 0;
58
static struct nodeID *me = NULL;
59
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN};
60
static struct nodeID ** neighbors;
61

    
62
static void update_metadata(void) {
63
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
64
        my_metadata.recv_delay = get_receive_delay();
65
        my_metadata.cps = get_chunks_per_sec();
66
        my_metadata.capacity = get_capacity();
67
}
68

    
69
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
70

    
71
        double t,p1,p2;
72
        t = *((const double *)tin);
73
        p1 = *((const double *)p1in);
74
        p2 = *((const double *)p2in);
75

    
76
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
77
        else if (isnan(p1)) return 2;
78
        else if (isnan(p2)) return 1;
79
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
80

    
81
}
82

    
83
int topologyInit(struct nodeID *myID, const char *config)
84
{
85
        int i;
86
        for (i=0;i<2;i++)
87
                bind_msg_type(mTypes[i]);
88
        update_metadata();
89
        me = myID;
90
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
91
}
92

    
93
void topologyShutdown(void)
94
{
95
}
96

    
97
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
98
{
99
        // TODO: check this!! Just to use this function to bootstrap ncast...
100
        struct metadata m = {0};        //TODO: check what metadata option should mean
101

    
102
        if (counter < TMAN_MAX_IDLE)
103
                return topAddNeighbour(neighbour,&m,sizeof(m));
104
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
105
}
106

    
107
static int topoParseData(const uint8_t *buff, int len)
108
{
109
        int res = -1,ncs = 0,msize;
110
        const struct nodeID **n; const void *m;
111
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
112
                res = topParseData(buff,len);
113
//                if (counter <= TMAN_MAX_IDLE)
114
//                        counter++;
115
        }
116
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
117
        {
118
                n = topGetNeighbourhood(&ncs);
119
                if (ncs) {
120
                m = topGetMetadata(&msize);
121
                res = tmanParseData(buff,len,n,ncs,m,msize);
122
                }
123
        }
124
  return res;
125
}
126

    
127
static const struct nodeID **topoGetNeighbourhood(int *n)
128
{
129
        int i; double d;
130
        if (counter > TMAN_MAX_IDLE) {
131
                uint8_t *mdata; int msize;
132
                *n = tmanGetNeighbourhoodSize();
133
                if (neighbors) free(neighbors);
134
                neighbors = calloc(*n,sizeof(struct nodeID *));
135
                tmanGetMetadata(&msize);
136
                mdata = calloc(*n,msize);
137
                tmanGivePeers(*n,neighbors,(void *)mdata);
138

    
139
                if (cnt % TMAN_LOG_EVERY == 0) {
140
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
141
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
142
                                d = *((double *)(mdata+i*msize));
143
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
144
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
145
                        }
146
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
147
                }
148

    
149
                free(mdata);
150
                return (const struct nodeID **)neighbors;
151
        }
152
        else
153
                return topGetNeighbourhood(n);
154
}
155

    
156
static void topoAddToBL (struct nodeID *id)
157
{
158
        if (counter >= TMAN_MAX_IDLE)
159
                tmanAddToBlackList(id);
160
//        else
161
                topAddToBlackList(id);
162
}
163

    
164
void add_peer(const struct nodeID *id, const struct metadata *m)
165
{
166
      dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
167
      peerset_add_peer(pset, id);
168
      if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
169
      if (m) peerset_get_peer(pset, id)->capacity = m->capacity;
170
      /* add measures here */
171
      add_measures(id);
172
      send_bmap(id);
173
}
174

    
175
void remove_peer(const struct nodeID *id)
176
{
177
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
178
      /* add measures here */
179
      delete_measures(id);
180
      peerset_remove_peer(pset, id);
181
}
182

    
183
//get the rtt. Currenly only MONL version is supported
184
static double get_rtt_of(const struct nodeID* n){
185
#ifdef MONL
186
  return get_rtt(n);
187
#else
188
  return NAN;
189
#endif
190
}
191

    
192
//get the declared capacity of a node
193
static double get_capacity_of(const struct nodeID* n){
194
  struct peer *p = peerset_get_peer(pset, n);
195
  if (p) {
196
    return p->capacity;
197
  }
198

    
199
  return NAN;
200
}
201

    
202
//returns: 1:yes 0:no -1:unknown
203
int desiredness(const struct nodeID* n) {
204
  double rtt = get_rtt_of(n);
205
  double bw =  get_capacity_of(n);
206

    
207
  if ((isnan(rtt) && finite(desired_rtt)) || (isnan(bw) && desired_bw > 0)) {
208
    return -1;
209
  } else if ((isnan(rtt) || rtt <= desired_rtt) && (isnan(bw) || bw >= desired_bw)) {
210
    return 1;
211
  }
212

    
213
  return 0;
214
}
215

    
216
bool is_desired(const struct nodeID* n) {
217
  return (desiredness(n) == 1);
218
}
219

    
220
// currently it just makes the peerset grow
221
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
222
{
223
  int n_ids, metasize, i, newids_size, max_ids;
224
  static const struct nodeID **newids;
225
  static const struct metadata *metas;
226
  struct peer *peers;
227
  struct timeval tnow, told;
228

    
229
  if timerisset(&tout_bmap) {
230
    gettimeofday(&tnow, NULL);
231
    timersub(&tnow, &tout_bmap, &told);
232
    peers = peerset_get_peers(pset);
233
    for (i = 0; i < peerset_size(pset); i++) {
234
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
235
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
236
        ftprintf(stderr,"Topo: dropping inactive %s (peers:%d)\n", node_addr(peers[i].id), peerset_size(pset));
237
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
238
        topoAddToBL(peers[i].id);
239
        remove_peer(peers[i--].id);
240
        //}
241
      }
242
    }
243
  }
244

    
245
  if (cnt++ % 100 == 0) {
246
        update_metadata();
247
    if (counter > TMAN_MAX_IDLE) {
248
        tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
249
    }
250
  }
251

    
252
  topoParseData(buff, len);
253

    
254
  if (!buff) {
255
    reg_neigh_size(peerset_size(pset));
256
    return;
257
  }
258

    
259
  peers = peerset_get_peers(pset);
260
  n_ids = peerset_size(pset);
261
  newids = topoGetNeighbourhood(&newids_size);        //TODO handle both tman and topo
262
  metas = topGetMetadata(&metasize);        //TODO: check metasize
263
  max_ids = n_ids + newids_size;
264
  ftprintf(stderr,"Topo modify start peers:%d candidates:%d\n", n_ids, newids_size);
265
  {
266
    int desired_part;
267
    const struct nodeID *oldids[max_ids], *nodeids[max_ids], *desireds[max_ids], *selecteds[max_ids], *others[max_ids], *toadds[max_ids], *toremoves[max_ids];
268
    size_t oldids_size, nodeids_size, desireds_size, selecteds_size, others_size, toadds_size, toremoves_size;
269
    nodeids_size = desireds_size = selecteds_size = others_size = toadds_size = toremoves_size = max_ids;
270

    
271
    for (i = 0, oldids_size = 0; i < peerset_size(pset); i++) {
272
      oldids[oldids_size++] = peers[i].id;
273
      fprintf(stderr," %s - RTT: %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id));
274
    }
275

    
276

    
277
    //compose list of nodeids
278
    nidset_add(nodeids, &nodeids_size, oldids, oldids_size, newids, newids_size);
279

    
280
    // select the alpha_target portion of desired peers
281
    desired_part = (1 - alpha_target) * NEIGHBORHOOD_TARGET_SIZE;
282
    nidset_filter(desireds, &desireds_size, nodeids, nodeids_size, is_desired);
283
    nidset_shuffle(desireds, desireds_size);
284
    selecteds_size = MIN(desireds_size,desired_part);
285
    memcpy(selecteds, desireds, selecteds_size * sizeof(selecteds[0]));
286

    
287
    // random from the rest
288
    nidset_complement(others, &others_size, nodeids, nodeids_size, selecteds, selecteds_size);
289
    nidset_shuffle(others, others_size);
290
    nidset_add_i(selecteds, &selecteds_size, max_ids, others, NEIGHBORHOOD_TARGET_SIZE ? MIN(others_size, NEIGHBORHOOD_TARGET_SIZE - selecteds_size) : others_size);
291

    
292
    fprintf(stderr,"Topo modify (from:%ld sel:%ld) - desired: %ld of %ld (target:%d sel:%ld); random: from %ld (sel:%ld)\n",
293
            (long)nodeids_size, (long)selecteds_size, (long)desireds_size, (long)nodeids_size, desired_part, (long) MIN(desireds_size,desired_part), (long)others_size, (long)selecteds_size - MIN(desireds_size, desired_part));
294
    // add new ones
295
    nidset_complement(toadds, &toadds_size, selecteds, selecteds_size, oldids, oldids_size);
296
    for (i = 0; i < toadds_size; i++) {
297
      size_t j;
298
      //searching for the metadata
299
      if (nidset_find(&j, newids, newids_size, toadds[i])) {
300
        fprintf(stderr," adding %s\n", node_addr(toadds[i]));
301
        add_peer(newids[j], &metas[j]);
302
      } else {
303
        fprintf(stderr," Error: missing metadata for %s\n", node_addr(toadds[i]));
304
      }
305
    }
306

    
307
    // finally, remove those not needed
308
    fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
309
    nidset_complement(toremoves, &toremoves_size, oldids, oldids_size, selecteds, selecteds_size);
310
    for (i = 0; i < toremoves_size; i++) {
311
      fprintf(stderr," removing %s\n", node_addr(toremoves[i]));
312
      remove_peer(toremoves[i]);
313
    }
314
    fprintf(stderr,"Topo remove end\n");
315
  }
316

    
317
  reg_neigh_size(peerset_size(pset));
318
}
319

    
320
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
321
{
322
  struct peer *p = peerset_get_peer(pset, id);
323
  if (!p) {
324
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
325
    if (reg) {
326
      add_peer(id,NULL);
327
      fprintf(stderr,"Topo: ext adding %s (peers:%d)\n", node_addr(id), peerset_size(pset));
328
      p = peerset_get_peer(pset,id);
329
    }
330
  }
331

    
332
  return p;
333
}
334

    
335
int peers_init(void)
336
{
337
  fprintf(stderr,"peers_init\n");
338
  pset = peerset_init(0);
339
  return pset ? 1 : 0;
340
}
341

    
342
struct peerset *get_peers(void)
343
{
344
  return pset;
345
}