Statistics
| Branch: | Revision:

streamers / topology-ALTO.c @ 5484da90

History | View | Annotate | Download (11.7 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <assert.h>
12
#include <string.h>
13
#include <stdlib.h> /* qsort, rand */
14

    
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <ALTOclient.h>
21
#include <pthread.h>
22

    
23
#include "net_helpers.h"        /* default_ip_addr() */
24
#include "topology.h"
25
#include "streaming.h"
26
#include "dbg.h"
27
#include "measures.h"
28
#include "config.h"
29

    
30

    
31
static int NEIGHBORHOOD_TARGET_SIZE;
32

    
33
static struct peerset *pset;
34
static struct timeval tout_bmap = {10, 0};
35

    
36
#define LOG_EVERY        1000
37
static int cnt = 0;
38

    
39
static struct nodeID *me;
40
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY};
41
static uint64_t currtime;
42
static struct nodeID **altoList;
43
static int altoList_size, c_neigh_size, newAltoResults=2;
44
static struct nodeID **currentNeighborhood;
45

    
46
/* ALTO begin --> */
47
#define ALTO_MAX_PEERS        1024
48
static int ALTO_bucket_size, RAND_bucket_size;
49

    
50
/* work struct */
51
static struct tagALTOInfo {
52
  struct in_addr localIPAddr;
53
  ALTO_GUIDANCE_T peers[ALTO_MAX_PEERS];
54
} ALTOInfo;
55

    
56
/* input to the peer selection function */
57
typedef struct {
58
  const struct nodeID **neighbours;
59
  int numNeighb;
60
  unsigned int pri_crit;        /* primary rating criterion */
61
  unsigned int sec_crit;        /* secondary rating criterion flags */
62
  float altoFactor;                        /* percentage of ALTO selected peers in disjoint bucket */
63
  /* jahanpanah@neclab.eu */
64
} ALTOInput_t;
65
/* <-- ALTO end */
66

    
67
void add_peer(struct nodeID *id)
68
{
69
      dprintf("Adding %s to neighbourhood!\n", node_addr(id));
70
      peerset_add_peer(pset, id);
71
      /* add measures here */
72
      add_measures(id);
73
      send_bmap(peerset_get_peer(pset,id));
74
}
75

    
76
void remove_peer(struct nodeID *id)
77
{
78
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
79
      /* add measures here */
80
      delete_measures(id);
81
      peerset_remove_peer(pset, id);
82
}
83

    
84
/* ALTO begin --> */
85

    
86
/**
87
 * As nodeID is an opaque type not exposing its in_addr, we need to convert the
88
 * IP string returned by node_addr() via inet_aton().
89
 */
90
static void node_convert_addr(const struct nodeID* node, struct in_addr* addr) {
91
  int len, rescode;
92
  char peerIP[64];
93
  const char* peerAddr = node_addr(node);
94

    
95
  /* extract IP string without port */
96
  const char* tmp = strstr(peerAddr, ":");
97
  if (tmp) len = tmp-peerAddr;        /* calc len of IP string */
98
  else len = strlen(peerAddr)+1;
99
  memcpy(peerIP, peerAddr, len);
100
  peerIP[len] = 0;        /* trailing zero */
101

    
102
  /*dprintf("%d. peer addr = %s IP = %s\n", p, peerAddr, peerIP);*/
103

    
104
  /* fill in IP addr */
105
  rescode = inet_aton(peerIP, addr);
106
  assert(rescode != 0);
107
}
108

    
109
/*
110
  for a given address find corresponding entry in unsorted
111
  neighbour list provided by streamer
112
*/
113
static const struct nodeID* findALTOPeerInNeighbourList(const struct nodeID **neighbours, int numNeighb, size_t altoEntry)
114
{
115
  int p; struct nodeID *res;
116
  struct in_addr addr;
117

    
118
  for (p=0; p < numNeighb; p++) {
119
    if (!neighbours[p]) continue;
120
    node_convert_addr(neighbours[p], &addr);
121
    if (addr.s_addr == ALTOInfo.peers[altoEntry].alto_host.s_addr) {
122
      /* we found the entry */
123
                res = neighbours[p];
124
                neighbours[p] = NULL;
125
      return res;
126
        }
127
  }
128
  return NULL; /* not found */
129
}
130

    
131
/** qsort comparator function for ALTO peer rankings */
132
static int qsort_compare_ALTO_ranking(const void* a, const void* b)
133
{
134
  const ALTO_GUIDANCE_T* r1 = (const ALTO_GUIDANCE_T*) a;
135
  const ALTO_GUIDANCE_T* r2 = (const ALTO_GUIDANCE_T*) b;
136
  return r1->rating - r2->rating;
137
}
138

    
139
static void topo_ALTO_init(void)
140
{
141
        char* myIP;
142

    
143
        NEIGHBORHOOD_TARGET_SIZE = g_config.neighborhood_target_size;
144

    
145
        myIP = default_ip_addr();
146
        dprintf("myIP: %s\n", myIP);
147
        assert(inet_aton(myIP, &ALTOInfo.localIPAddr) != 0);
148
        start_ALTO_client();
149
        /*set_ALTO_server("http://10.10.251.107/cgi-bin/alto-server.cgi");
150
          set_ALTO_server("http://www.napa-wine-alto.eu/cgi-bin/alto-server.cgi");*/
151
        fprintf(stderr,"Setting ALTO server URL: '%s'\n", g_config.alto_server);
152
        set_ALTO_server(g_config.alto_server);
153

    
154
        srand(time(NULL));
155
}
156

    
157
void topologyShutdown(void)
158
{
159
  stop_ALTO_client();
160
  config_free();
161
}
162

    
163
/*
164
  The main peer selector function.
165
*/
166
void PeerSelectorALTO(void)
167
{
168
        int p, i, j, rescode;
169
        struct timeval tnow;
170
        uint64_t timenow;
171

    
172
        gettimeofday(&tnow, NULL);
173
        timenow = (tnow.tv_usec + tnow.tv_sec * 1000000ull);
174
        if ((timenow - currtime > (g_config.update_interval * 1000000)) &&
175
                        (newAltoResults)) {
176

    
177
                /* fill the temporary peer list (used for ALTO query) */
178
                for (p=0; p < c_neigh_size; p++) {
179
                        /* copy/convert nodeID IP address into ALTO struct */
180
                        node_convert_addr(currentNeighborhood[p], &ALTOInfo.peers[p].alto_host);
181

    
182
                        ALTOInfo.peers[p].prefix = 32;
183
                        ALTOInfo.peers[p].rating = -1; /* init to dummy rating */
184
                }
185

    
186
                /*****************************************************************/
187
                /*   ALTO query                                                  */
188
                /*****************************************************************/
189
                gettimeofday(&tnow, NULL);
190
                timenow = (tnow.tv_usec + tnow.tv_sec * 1000000ull);
191
                fprintf(stderr,"Calling ALTO server at : %u\n",(unsigned int)(((unsigned long)timenow)/1000000)); //
192
                rescode = ALTO_query_exec( // get_ALTO_guidance_for_list(
193
                                ALTOInfo.peers,
194
                                c_neigh_size,
195
                                ALTOInfo.localIPAddr,
196
                                g_config.alto_pri_criterion,
197
                                g_config.alto_sec_criterion
198
                );
199

    
200
                /*assert(rescode == 1);*/
201
                if (rescode != 1) {
202
                        fprintf(stderr,"WARNING: ALTO query FAILED!\n");
203
                        newAltoResults = 1;
204
                }
205
                else {
206
                        newAltoResults = 0;
207
                }
208
        }
209

    
210
        if (ALTO_query_state() == ALTO_QUERY_READY) {
211

    
212
                if (!newAltoResults) {
213
                        gettimeofday(&tnow, NULL);
214
                        currtime = (tnow.tv_usec + tnow.tv_sec * 1000000ull);
215
                        fprintf(stderr,"Received ALTO server reply at : %u\n",(unsigned int)(((unsigned long)currtime)/1000000));
216

    
217
                        /* Build sorted list of ALTO-rated peers */
218
                        qsort(ALTOInfo.peers, c_neigh_size, sizeof(ALTO_GUIDANCE_T), qsort_compare_ALTO_ranking);
219

    
220
#ifdef LOG_ALTO_RATINGS
221
                        dprintf("\nSorted ALTO ratings:\n");
222
                        for (p=0; p < c_neigh_size; p++) {
223
                                dprintf("ALTO peer %d: rating = %d\n ", p, ALTOInfo.peers[p].rating);
224
                        }
225
#endif
226

    
227
                        /*
228
    now build the disjoint buckets of peers:
229

230
    we pick a certain percentage of the best ALTO-rated peers
231
    and fill the rest with random neighbours
232
                         */
233

    
234
                        for (i=0; i < altoList_size; i++) {
235
                                nodeid_free(altoList[i]);
236
                        }
237

    
238
                        if ((NEIGHBORHOOD_TARGET_SIZE > 0) && (NEIGHBORHOOD_TARGET_SIZE < c_neigh_size)) {
239
                                altoList_size = NEIGHBORHOOD_TARGET_SIZE;
240
                        } else {
241
                                altoList_size = c_neigh_size;
242
                        }
243
                        ALTO_bucket_size = altoList_size * g_config.alto_factor;
244
                        RAND_bucket_size = altoList_size - ALTO_bucket_size;
245

    
246
                        altoList = realloc(altoList,altoList_size*sizeof(struct nodeID *));
247
                        altoList = memset(altoList,0,altoList_size*sizeof(struct nodeID *));
248

    
249
                        /* add ALTO peers */
250
                        fprintf(stderr,"\nSorted ALTO peers:\n");
251
                        for (i = 0; i < ALTO_bucket_size; i++) {
252
                                altoList[i] = findALTOPeerInNeighbourList(currentNeighborhood, c_neigh_size, i);
253

    
254
                                fprintf(stderr,"ALTO peer %d: id  = %s ; rating = %d\n ", (i+1), node_addr(altoList[i]), ALTOInfo.peers[i].rating);
255
                        }
256

    
257
                /* add remaining peers randomly */
258
                fprintf(stderr,"\nMore ALTO randomly picked peers:\n");
259
                for (j = ALTO_bucket_size; j < ALTO_bucket_size + RAND_bucket_size; j++) {
260
                        do { // FIXME: it works only if gossipNeighborhood is realloc'ed for sure between two queries...
261
                                p = rand() % c_neigh_size;
262
                        } while (!currentNeighborhood[p]);
263

    
264
                        altoList[j] = currentNeighborhood[p];
265
                        currentNeighborhood[p] = NULL;
266
                        fprintf(stderr,"ALTO peer %d: id  = %s\n ", (j+1), node_addr(altoList[j]));
267
                }
268
                newAltoResults = 1;
269
        }
270
        }
271

    
272
}
273

    
274
/* <-- ALTO end */
275

    
276
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
277
{
278
        return topAddNeighbour(neighbour,metadata,metadata_size);
279
}
280

    
281
int topologyInit(struct nodeID *myID, const char *config)
282
{
283
        me = myID;
284
        bind_msg_type(mTypes[0]);
285
        config_init();
286
        config_load("streamer.conf");
287
        //config_dump();
288
        topo_ALTO_init();
289
        return (topInit(myID, NULL, 0, config));
290
}
291

    
292

    
293
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
294
{
295
        int i,j,k,npeers,doAdd,psize;
296
        struct peer *peers; static const struct nodeID **nbrs;
297
        struct timeval told,tnow;
298

    
299
        psize = peerset_size(pset);
300
        dprintf("before:%d, ",psize);
301
        topParseData(buff, len);
302
        nbrs = topGetNeighbourhood(&npeers);
303
        if (newAltoResults) {
304
                for (i=0; i < c_neigh_size; i++) {
305
                        nodeid_free(currentNeighborhood[i]);
306
                }
307
                free(currentNeighborhood);
308
                c_neigh_size = npeers + psize;
309
                currentNeighborhood = calloc(c_neigh_size,sizeof(struct nodeID *));
310
                for (i=0; i < psize; i++) {
311
                        currentNeighborhood[i] = nodeid_dup((peerset_get_peers(pset)[i]).id);
312
                }
313
                for (j=0; j < npeers; j++) {
314
                        if (peerset_check(pset, nbrs[j]) < 0) {
315
                                currentNeighborhood[i] = nodeid_dup(nbrs[j]);
316
                                i++;
317
                        }
318
                }
319
                c_neigh_size = i;
320
                currentNeighborhood = i?realloc(currentNeighborhood,c_neigh_size * sizeof(struct nodeID *)):NULL;
321
        }
322
        if (c_neigh_size > 1) {
323
                PeerSelectorALTO();
324
        }
325

    
326
        if (ALTO_query_state() == ALTO_QUERY_READY && newAltoResults==1) {
327
        //        fprintf(stderr,"Composing peerset from ALTO selection\n");
328
                for (i=0; i < altoList_size; i++) { /* first goal : add all new ALTO-ranked peers
329
                                                                                        second goal : do not increase current peerset size
330
                                                                                        third goal : make space by deleting only peers that
331
                                                                                                are not in the current ALTO-reply */
332
                        if (peerset_check(pset, altoList[i]) < 0) {
333
                                doAdd = 1;
334
                                if (NEIGHBORHOOD_TARGET_SIZE &&
335
                                        peerset_size(pset) >= NEIGHBORHOOD_TARGET_SIZE) {
336
                                        doAdd = 0;
337
                                        for (j = 0; j < peerset_size(pset); j++) {
338
                                                for (k = 0; k < altoList_size; k++) {
339
                                                        if (k!=i && nodeid_equal(peerset_get_peers(pset)[j].id, altoList[k])) {
340
                                                                break;
341
                                                        }
342
                                                }
343
                                                if (k == altoList_size) {
344
                                                        remove_peer(peerset_get_peers(pset)[j].id);
345
                                                        doAdd = 1;
346
                                                        break; // without this, peerset size would possibly decrease
347
                                                }
348
                                        }
349
                                }
350
                                if (doAdd) {
351
                                        add_peer(altoList[i]);
352
                                }
353
                        }
354
                }
355
                newAltoResults = 2;
356
        }
357
        if (peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) { // ALTO selection didn't fill the peerset
358
                //        fprintf(stderr,"Composing peerset from ncast neighborhood\n");
359
                for (i=0;i<npeers;i++) {
360
                        if(peerset_check(pset, nbrs[i]) < 0) {
361
                                if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
362
                                        add_peer(nbrs[i]);
363
                                }
364
                        }
365
                }
366
        }
367
        psize = peerset_size(pset);
368
        dprintf("after:%d, ",psize);
369

    
370
        gettimeofday(&tnow, NULL);
371
        timersub(&tnow, &tout_bmap, &told);
372
        peers = peerset_get_peers(pset);
373

    
374
        if (LOG_EVERY && ++cnt % LOG_EVERY == 0) {
375
                fprintf(stderr,"\nMy peerset : size = %d\n",psize);
376
                for (i=0;i<psize;i++) {
377
                        fprintf(stderr,"\t%d : %s\n",i,node_addr(peers[i].id));
378
                }
379
        }
380

    
381
        for (i = 0; i < peerset_size(pset); i++) {
382
                if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
383
                                ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
384
                        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
385
                        topRemoveNeighbour(peers[i].id);
386
                        remove_peer(peers[i--].id);
387
                        //}
388
                }
389
        }
390

    
391
        reg_neigh_size(peerset_size(pset));
392

    
393
        dprintf("after timer check:%d\n",peerset_size(pset));
394
}
395

    
396
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
397
{
398
        struct peer *p = peerset_get_peer(pset, id);
399
        if (!p) {
400
                fprintf(stderr,"warning: received message from unknown peer: %s!\n",node_addr(id));
401
                if (reg && peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
402
        //      topoAddNeighbour(id, NULL, 0);        //@TODO: this is agressive
403
                        add_peer(id);
404
                        p = peerset_get_peer(pset,id);
405
                }
406
        }
407

    
408
        return p;
409
}
410

    
411
int peers_init()
412
{
413
        fprintf(stderr,"peers_init\n");
414
        pset = peerset_init(0);
415
        return pset ? 1 : 0;
416
}
417

    
418
struct peerset *get_peers()
419
{
420
        return pset;
421
}