Statistics
| Branch: | Revision:

streamers / topology.c @ 9d55b3b7

History | View | Annotate | Download (8.85 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12

    
13
#include <math.h>
14
#include <net_helper.h>
15
#include <peerset.h>
16
#include <peer.h>
17
#include <grapes_msg_types.h>
18
#include <topmanager.h>
19
#include <tman.h>
20

    
21
#include "compatibility/timer.h"
22

    
23
#include "topology.h"
24
#include "streaming.h"
25
#include "dbg.h"
26
#include "measures.h"
27
#include "streamer.h"
28

    
29
double desired_rtt = 0.2;
30
double alpha_target = 0.5;
31

    
32
int NEIGHBORHOOD_TARGET_SIZE = 20;
33
double NEIGHBORHOOD_ROTATE_RATIO = 1.0;
34
#define TMAN_MAX_IDLE 10
35
#define TMAN_LOG_EVERY 1000
36

    
37
static struct peerset *pset;
38
static struct timeval tout_bmap = {10, 0};
39
static int counter = 0;
40
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
41
static tmanRankingFunction rankFunct = simpleRanker;
42
struct metadata {
43
  uint16_t cb_size;
44
  uint16_t cps;
45
  float recv_delay;
46
};
47
static struct metadata my_metadata;
48
static int cnt = 0;
49
static struct nodeID *me = NULL;
50
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN};
51
static struct nodeID ** neighbors;
52

    
53
static void update_metadata(void) {
54

    
55
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
56
        my_metadata.recv_delay = get_receive_delay();
57
        my_metadata.cps = get_chunks_per_sec();
58
}
59

    
60
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
61

    
62
        double t,p1,p2;
63
        t = *((const double *)tin);
64
        p1 = *((const double *)p1in);
65
        p2 = *((const double *)p2in);
66

    
67
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
68
        else if (isnan(p1)) return 2;
69
        else if (isnan(p2)) return 1;
70
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
71

    
72
}
73

    
74
int topologyInit(struct nodeID *myID, const char *config)
75
{
76
        int i;
77
        for (i=0;i<2;i++)
78
                bind_msg_type(mTypes[i]);
79
        update_metadata();
80
        me = myID;
81
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
82
}
83

    
84
void topologyShutdown(void)
85
{
86
}
87

    
88
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
89
{
90
        // TODO: check this!! Just to use this function to bootstrap ncast...
91
        struct metadata m = {0};        //TODO: check what metadata option should mean
92

    
93
        if (counter < TMAN_MAX_IDLE)
94
                return topAddNeighbour(neighbour,&m,sizeof(m));
95
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
96
}
97

    
98
static int topoParseData(const uint8_t *buff, int len)
99
{
100
        int res = -1,ncs = 0,msize;
101
        const struct nodeID **n; const void *m;
102
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
103
                res = topParseData(buff,len);
104
//                if (counter <= TMAN_MAX_IDLE)
105
//                        counter++;
106
        }
107
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
108
        {
109
                n = topGetNeighbourhood(&ncs);
110
                if (ncs) {
111
                m = topGetMetadata(&msize);
112
                res = tmanParseData(buff,len,n,ncs,m,msize);
113
                }
114
        }
115
  return res;
116
}
117

    
118
static const struct nodeID **topoGetNeighbourhood(int *n)
119
{
120
        int i; double d;
121
        if (counter > TMAN_MAX_IDLE) {
122
                uint8_t *mdata; int msize;
123
                *n = tmanGetNeighbourhoodSize();
124
                if (neighbors) free(neighbors);
125
                neighbors = calloc(*n,sizeof(struct nodeID *));
126
                tmanGetMetadata(&msize);
127
                mdata = calloc(*n,msize);
128
                tmanGivePeers(*n,neighbors,(void *)mdata);
129

    
130
                if (cnt % TMAN_LOG_EVERY == 0) {
131
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.recv_delay);
132
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
133
                                d = *((double *)(mdata+i*msize));
134
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
135
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
136
                        }
137
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
138
                }
139

    
140
                free(mdata);
141
                return (const struct nodeID **)neighbors;
142
        }
143
        else
144
                return topGetNeighbourhood(n);
145
}
146

    
147
static void topoAddToBL (struct nodeID *id)
148
{
149
        if (counter >= TMAN_MAX_IDLE)
150
                tmanAddToBlackList(id);
151
//        else
152
                topAddToBlackList(id);
153
}
154

    
155
void add_peer(const struct nodeID *id, const struct metadata *m)
156
{
157
      dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
158
      peerset_add_peer(pset, id);
159
      if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
160
      /* add measures here */
161
      add_measures(id);
162
      send_bmap(id);
163
}
164

    
165
void remove_peer(struct nodeID *id)
166
{
167
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
168
      /* add measures here */
169
      delete_measures(id);
170
      peerset_remove_peer(pset, id);
171
}
172

    
173
//get the rtt. Currenly only MONL version is supported
174
static double get_rtt_of(struct nodeID* n){
175
#ifdef MONL
176
  double rtt = get_rtt(n);
177
#else
178
  return NAN;
179
#endif
180
}
181

    
182
//returns: 1:yes 0:no -1:unknown
183
int is_desired(struct nodeID* n) {
184
  double rtt = get_rtt_of(n);
185

    
186
  return isnan(rtt) ? -1 : ((rtt <= desired_rtt) ? 1 : 0);
187
}
188

    
189
// currently it just makes the peerset grow
190
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
191
{
192
  int n_ids, metasize, i;
193
  static const struct nodeID **ids;
194
  static const struct metadata *metas;
195
  struct peer *peers;
196
  struct timeval tnow, told;
197

    
198
//  dprintf("Update peers: topo_msg:%d, ",len);
199
//  if (from) {
200
//    dprintf("from:%s, ",node_addr(from));
201
//    if (peerset_check(pset, from) < 0) {
202
//      topAddNeighbour(from, NULL, 0);        //@TODO: this is agressive
203
//      if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
204
//        add_peer(from);
205
//      }
206
//    }
207
//  }
208

    
209
  if (cnt++ % 100 == 0) {
210
        update_metadata();
211
    if (counter > TMAN_MAX_IDLE) {
212
        tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
213
    }
214
  }
215

    
216
  topoParseData(buff, len);
217

    
218
  if (!buff) {
219
    reg_neigh_size(peerset_size(pset));
220
    return;
221
  }
222

    
223
  fprintf(stderr,"Topo modify start\n");
224
  peers = peerset_get_peers(pset);
225
  for (i = 0; i < peerset_size(pset); i++) {
226
    fprintf(stderr," %s - RTT: %f\n", node_addr(peers[i].id) , get_rtt_of(peers[i].id));
227
  }
228

    
229
  ids = topoGetNeighbourhood(&n_ids);        //TODO handle both tman and topo
230
  metas = topGetMetadata(&metasize);        //TODO: check metasize
231
  for(i = 0; i < n_ids; i++) {
232
    if(peerset_check(pset, ids[i]) < 0) {
233
      if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
234
        add_peer(ids[i],&metas[i]);
235
      } else {  //rotate neighbourhood
236
        if (rand()/((double)RAND_MAX + 1) < NEIGHBORHOOD_ROTATE_RATIO) {
237
          add_peer(ids[i],&metas[i]);
238
        }
239
      }
240
    }
241
  }
242

    
243
  if timerisset(&tout_bmap) {
244
    gettimeofday(&tnow, NULL);
245
    timersub(&tnow, &tout_bmap, &told);
246
    peers = peerset_get_peers(pset);
247
    for (i = 0; i < peerset_size(pset); i++) {
248
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
249
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
250
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
251
        topoAddToBL(peers[i].id);
252
        remove_peer(peers[i--].id);
253
        //}
254
      }
255
    }
256
  }
257

    
258

    
259
  n_ids = peerset_size(pset);
260
  fprintf(stderr,"Topo remove start (peers:%d)\n", n_ids);
261
  while (NEIGHBORHOOD_TARGET_SIZE && peerset_size(pset) > NEIGHBORHOOD_TARGET_SIZE) { //reduce back neighbourhood to target size
262
    int n, desired, desired_not, desired_unknown;
263
    struct peer *ds[n_ids], *dns[n_ids], *dus[n_ids];
264
    double alpha;
265

    
266
    peers = peerset_get_peers(pset);
267
    n = peerset_size(pset);
268
    desired = desired_not = desired_unknown = 0;
269
    for (i = 0; i < n; i++) {
270
      switch (is_desired(peers[i].id)) {
271
        case 1:
272
          ds[desired++] = &peers[i];
273
          break;
274
        case 0:
275
          dns[desired_not++] = &peers[i];
276
          break;
277
        case -1:
278
          dus[desired_unknown++] = &peers[i];
279
          break;
280
        default:
281
          fprintf(stderr,"error with desiredness!\n");
282
          exit(1);
283
      }
284
    }
285
    alpha = (double) desired / n;
286
    fprintf(stderr," peers:%d desired:%d unknown:%d notdesired:%d alpha: %f (target:%f)\n",n, desired, desired_unknown, desired_not, alpha, alpha_target);
287

    
288
    if (alpha > alpha_target && desired > 0) {
289
      remove_peer(ds[rand() % desired]->id);
290
    } else if (alpha < alpha_target && desired_not > 0) {
291
      remove_peer(dns[rand() % desired_not]->id);
292
    } else if (desired_unknown > 0) {
293
      remove_peer(dus[rand() % desired_unknown]->id);
294
    } else {
295
      remove_peer(peers[rand() % n].id);
296
    }
297
  }
298
  fprintf(stderr,"Topo remove end\n");
299

    
300
  reg_neigh_size(peerset_size(pset));
301
}
302

    
303
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
304
{
305
  struct peer *p = peerset_get_peer(pset, id);
306
  if (!p) {
307
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
308
    if (reg) {
309
      add_peer(id,NULL);
310
      p = peerset_get_peer(pset,id);
311
    }
312
  }
313

    
314
  return p;
315
}
316

    
317
int peers_init(void)
318
{
319
  fprintf(stderr,"peers_init\n");
320
  pset = peerset_init(0);
321
  return pset ? 1 : 0;
322
}
323

    
324
struct peerset *get_peers(void)
325
{
326
  return pset;
327
}