Statistics
| Branch: | Revision:

streamers / topology-ALTO.c @ da25233b

History | View | Annotate | Download (11.5 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <assert.h>
12
#include <string.h>
13
#include <stdlib.h> /* qsort, rand */
14

    
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <ALTOclient.h>
21
#include <pthread.h>
22

    
23
#include "net_helpers.h"        /* default_ip_addr() */
24
#include "topology.h"
25
#include "streaming.h"
26
#include "dbg.h"
27
#include "measures.h"
28
#include "config.h"
29

    
30

    
31
static int NEIGHBORHOOD_TARGET_SIZE;
32

    
33
static struct peerset *pset;
34
static struct timeval tout_bmap = {10, 0};
35

    
36
#define LOG_EVERY        1000
37
static int cnt = 0;
38

    
39
static struct nodeID *me;
40
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY};
41
static uint64_t currtime;
42
static struct nodeID **altoList;
43
static int altoList_size, c_neigh_size, newAltoResults=2;
44
static struct nodeID **currentNeighborhood;
45

    
46
/* ALTO begin --> */
47
#define ALTO_MAX_PEERS        1024
48
static int ALTO_bucket_size, RAND_bucket_size;
49

    
50
/* work struct */
51
static struct tagALTOInfo {
52
  struct in_addr localIPAddr;
53
  ALTO_GUIDANCE_T peers[ALTO_MAX_PEERS];
54
} ALTOInfo;
55

    
56
/* input to the peer selection function */
57
typedef struct {
58
  const struct nodeID **neighbours;
59
  int numNeighb;
60
  unsigned int pri_crit;        /* primary rating criterion */
61
  unsigned int sec_crit;        /* secondary rating criterion flags */
62
  float altoFactor;                        /* percentage of ALTO selected peers in disjoint bucket */
63
  /* jahanpanah@neclab.eu */
64
} ALTOInput_t;
65
/* <-- ALTO end */
66

    
67
void add_peer(struct nodeID *id)
68
{
69
      dprintf("Adding %s to neighbourhood!\n", node_addr(id));
70
      peerset_add_peer(pset, id);
71
      /* add measures here */
72
      add_measures(id);
73
      send_bmap(peerset_get_peer(pset,id));
74
}
75

    
76
void remove_peer(struct nodeID *id)
77
{
78
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
79
      /* add measures here */
80
      delete_measures(id);
81
      peerset_remove_peer(pset, id);
82
}
83

    
84
/* ALTO begin --> */
85

    
86
/**
87
 * As nodeID is an opaque type not exposing its in_addr, we need to convert the
88
 * IP string returned by node_addr() via inet_aton().
89
 */
90
static void node_convert_addr(const struct nodeID* node, struct in_addr* addr) {
91
  int rescode;
92

    
93
//  const char* peerAddr = node_addr(node);
94

    
95
  /* extract IP string without port */
96
  const char *peerIP = node_ip(node);
97

    
98
//  fprintf(stderr,"peer addr = %s IP = %s\n", peerAddr, peerIP);
99

    
100
  /* fill in IP addr */
101
  rescode = inet_aton(peerIP, addr);
102
  assert(rescode != 0);
103
}
104

    
105
/*
106
  for a given address find corresponding entry in unsorted
107
  neighbour list provided by streamer
108
*/
109
static const struct nodeID* findALTOPeerInNeighbourList(const struct nodeID **neighbours, int numNeighb, size_t altoEntry)
110
{
111
  int p; struct nodeID *res;
112
  struct in_addr addr;
113

    
114
  for (p=0; p < numNeighb; p++) {
115
    if (!neighbours[p]) continue;
116
    node_convert_addr(neighbours[p], &addr);
117
    if (addr.s_addr == ALTOInfo.peers[altoEntry].alto_host.s_addr) {
118
      /* we found the entry */
119
                res = neighbours[p];
120
                neighbours[p] = NULL;
121
      return res;
122
        }
123
  }
124
  return NULL; /* not found */
125
}
126

    
127
/** qsort comparator function for ALTO peer rankings */
128
static int qsort_compare_ALTO_ranking(const void* a, const void* b)
129
{
130
  const ALTO_GUIDANCE_T* r1 = (const ALTO_GUIDANCE_T*) a;
131
  const ALTO_GUIDANCE_T* r2 = (const ALTO_GUIDANCE_T*) b;
132
  return r1->rating - r2->rating;
133
}
134

    
135
static void topo_ALTO_init(char* myIP)
136
{
137
        NEIGHBORHOOD_TARGET_SIZE = g_config.neighborhood_target_size;
138
        dprintf("myIP: %s\n", myIP);
139
        assert(inet_aton(myIP, &ALTOInfo.localIPAddr) != 0);
140
        start_ALTO_client();
141
        /*set_ALTO_server("http://10.10.251.107/cgi-bin/alto-server.cgi");
142
          set_ALTO_server("http://www.napa-wine-alto.eu/cgi-bin/alto-server.cgi");*/
143
        fprintf(stderr,"Setting ALTO server URL: '%s'\n", g_config.alto_server);
144
        set_ALTO_server(g_config.alto_server);
145

    
146
        srand(time(NULL));
147
}
148

    
149
void topologyShutdown(void)
150
{
151
  stop_ALTO_client();
152
  config_free();
153
}
154

    
155
/*
156
  The main peer selector function.
157
*/
158
void PeerSelectorALTO(void)
159
{
160
        int p, i, j, rescode;
161
        struct timeval tnow;
162
        uint64_t timenow;
163

    
164
        gettimeofday(&tnow, NULL);
165
        timenow = (tnow.tv_usec + tnow.tv_sec * 1000000ull);
166
        if ((timenow - currtime > (g_config.update_interval * 1000000)) &&
167
                        (newAltoResults)) {
168

    
169
                /* fill the temporary peer list (used for ALTO query) */
170
                for (p=0; p < c_neigh_size; p++) {
171
                        /* copy/convert nodeID IP address into ALTO struct */
172
                        node_convert_addr(currentNeighborhood[p], &ALTOInfo.peers[p].alto_host);
173

    
174
                        ALTOInfo.peers[p].prefix = 32;
175
                        ALTOInfo.peers[p].rating = -1; /* init to dummy rating */
176
                }
177

    
178
                /*****************************************************************/
179
                /*   ALTO query                                                  */
180
                /*****************************************************************/
181
                gettimeofday(&tnow, NULL);
182
                timenow = (tnow.tv_usec + tnow.tv_sec * 1000000ull);
183
                fprintf(stderr,"Calling ALTO server at : %u\n",(unsigned int)(((unsigned long)timenow)/1000000)); //
184
                rescode = ALTO_query_exec( // get_ALTO_guidance_for_list(
185
                                ALTOInfo.peers,
186
                                c_neigh_size,
187
                                ALTOInfo.localIPAddr,
188
                                g_config.alto_pri_criterion,
189
                                g_config.alto_sec_criterion
190
                );
191

    
192
                /*assert(rescode == 1);*/
193
                if (rescode != 1) {
194
                        fprintf(stderr,"WARNING: ALTO query FAILED!\n");
195
                        newAltoResults = 1;
196
                }
197
                else {
198
                        newAltoResults = 0;
199
                }
200
        }
201

    
202
        if (ALTO_query_state() == ALTO_QUERY_READY) {
203

    
204
                if (!newAltoResults) {
205
                        gettimeofday(&tnow, NULL);
206
                        currtime = (tnow.tv_usec + tnow.tv_sec * 1000000ull);
207
                        fprintf(stderr,"Received ALTO server reply at : %u\n",(unsigned int)(((unsigned long)currtime)/1000000));
208

    
209
                        /* Build sorted list of ALTO-rated peers */
210
                        qsort(ALTOInfo.peers, c_neigh_size, sizeof(ALTO_GUIDANCE_T), qsort_compare_ALTO_ranking);
211

    
212
#ifdef LOG_ALTO_RATINGS
213
                        dprintf("\nSorted ALTO ratings:\n");
214
                        for (p=0; p < c_neigh_size; p++) {
215
                                dprintf("ALTO peer %d: rating = %d\n ", p, ALTOInfo.peers[p].rating);
216
                        }
217
#endif
218

    
219
                        /*
220
    now build the disjoint buckets of peers:
221

222
    we pick a certain percentage of the best ALTO-rated peers
223
    and fill the rest with random neighbours
224
                         */
225

    
226
                        for (i=0; i < altoList_size; i++) {
227
                                nodeid_free(altoList[i]);
228
                        }
229

    
230
                        if ((NEIGHBORHOOD_TARGET_SIZE > 0) && (NEIGHBORHOOD_TARGET_SIZE < c_neigh_size)) {
231
                                altoList_size = NEIGHBORHOOD_TARGET_SIZE;
232
                        } else {
233
                                altoList_size = c_neigh_size;
234
                        }
235
                        ALTO_bucket_size = altoList_size * g_config.alto_factor;
236
                        RAND_bucket_size = altoList_size - ALTO_bucket_size;
237

    
238
                        altoList = realloc(altoList,altoList_size*sizeof(struct nodeID *));
239
                        altoList = memset(altoList,0,altoList_size*sizeof(struct nodeID *));
240

    
241
                        /* add ALTO peers */
242
                        fprintf(stderr,"\nSorted ALTO peers:\n");
243
                        for (i = 0; i < ALTO_bucket_size; i++) {
244
                                altoList[i] = findALTOPeerInNeighbourList(currentNeighborhood, c_neigh_size, i);
245

    
246
                                fprintf(stderr,"ALTO peer %d: id  = %s ; rating = %d\n ", (i+1), node_addr(altoList[i]), ALTOInfo.peers[i].rating);
247
                        }
248

    
249
                /* add remaining peers randomly */
250
                fprintf(stderr,"\nMore ALTO randomly picked peers:\n");
251
                for (j = ALTO_bucket_size; j < ALTO_bucket_size + RAND_bucket_size; j++) {
252
                        do { // FIXME: it works only if gossipNeighborhood is realloc'ed for sure between two queries...
253
                                p = rand() % c_neigh_size;
254
                        } while (!currentNeighborhood[p]);
255

    
256
                        altoList[j] = currentNeighborhood[p];
257
                        currentNeighborhood[p] = NULL;
258
                        fprintf(stderr,"ALTO peer %d: id  = %s\n ", (j+1), node_addr(altoList[j]));
259
                }
260
                newAltoResults = 1;
261
        }
262
        }
263

    
264
}
265

    
266
/* <-- ALTO end */
267

    
268
static void topoAddToBL (struct nodeID *id)
269
{
270
        topAddToBlackList(id);
271
}
272

    
273
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
274
{
275
        return topAddNeighbour(neighbour,metadata,metadata_size);
276
}
277

    
278
int topologyInit(struct nodeID *myID, const char *config)
279
{
280
        me = myID;
281
        bind_msg_type(mTypes[0]);
282
        config_init();
283
        config_load("streamer.conf");
284
        //config_dump();
285
        topo_ALTO_init(node_ip(myID));
286
        return (topInit(myID, NULL, 0, config));
287
}
288

    
289

    
290
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
291
{
292
        int i,j,k,npeers,doAdd,psize;
293
        struct peer *peers; static const struct nodeID **nbrs;
294
        struct timeval told,tnow;
295

    
296
        psize = peerset_size(pset);
297
        dprintf("before:%d, ",psize);
298
        topParseData(buff, len);
299
        nbrs = topGetNeighbourhood(&npeers);
300
        if (newAltoResults) {
301
                for (i=0; i < c_neigh_size; i++) {
302
                        nodeid_free(currentNeighborhood[i]);
303
                }
304
                free(currentNeighborhood);
305
                c_neigh_size = npeers + psize;
306
                currentNeighborhood = calloc(c_neigh_size,sizeof(struct nodeID *));
307
                for (i=0; i < psize; i++) {
308
                        currentNeighborhood[i] = nodeid_dup((peerset_get_peers(pset)[i]).id);
309
                }
310
                for (j=0; j < npeers; j++) {
311
                        if (peerset_check(pset, nbrs[j]) < 0) {
312
                                currentNeighborhood[i] = nodeid_dup(nbrs[j]);
313
                                i++;
314
                        }
315
                }
316
                c_neigh_size = i;
317
                currentNeighborhood = i?realloc(currentNeighborhood,c_neigh_size * sizeof(struct nodeID *)):NULL;
318
        }
319
        if (c_neigh_size > 1) {
320
                PeerSelectorALTO();
321
        }
322

    
323
        if (ALTO_query_state() == ALTO_QUERY_READY && newAltoResults==1) {
324
        //        fprintf(stderr,"Composing peerset from ALTO selection\n");
325
                for (i=0; i < altoList_size; i++) { /* first goal : add all new ALTO-ranked peers
326
                                                                                        second goal : do not increase current peerset size
327
                                                                                        third goal : make space by deleting only peers that
328
                                                                                                are not in the current ALTO-reply */
329
                        if (peerset_check(pset, altoList[i]) < 0) {
330
                                doAdd = 1;
331
                                if (NEIGHBORHOOD_TARGET_SIZE &&
332
                                        peerset_size(pset) >= NEIGHBORHOOD_TARGET_SIZE) {
333
                                        doAdd = 0;
334
                                        for (j = 0; j < peerset_size(pset); j++) {
335
                                                for (k = 0; k < altoList_size; k++) {
336
                                                        if (k!=i && nodeid_equal(peerset_get_peers(pset)[j].id, altoList[k])) {
337
                                                                break;
338
                                                        }
339
                                                }
340
                                                if (k == altoList_size) {
341
                                                        remove_peer(peerset_get_peers(pset)[j].id);
342
                                                        doAdd = 1;
343
                                                        break; // without this, peerset size would possibly decrease
344
                                                }
345
                                        }
346
                                }
347
                                if (doAdd) {
348
                                        add_peer(altoList[i]);
349
                                }
350
                        }
351
                }
352
                newAltoResults = 2;
353
        }
354
        if (peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) { // ALTO selection didn't fill the peerset
355
                //        fprintf(stderr,"Composing peerset from ncast neighborhood\n");
356
                for (i=0;i<npeers;i++) {
357
                        if(peerset_check(pset, nbrs[i]) < 0) {
358
                                if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
359
                                        add_peer(nbrs[i]);
360
                                }
361
                        }
362
                }
363
        }
364
        psize = peerset_size(pset);
365
        dprintf("after:%d, ",psize);
366

    
367
        gettimeofday(&tnow, NULL);
368
        timersub(&tnow, &tout_bmap, &told);
369
        peers = peerset_get_peers(pset);
370

    
371
        if (LOG_EVERY && ++cnt % LOG_EVERY == 0) {
372
                fprintf(stderr,"\nMy peerset : size = %d\n",psize);
373
                for (i=0;i<psize;i++) {
374
                        fprintf(stderr,"\t%d : %s\n",i,node_addr(peers[i].id));
375
                }
376
        }
377

    
378
        for (i = 0; i < peerset_size(pset); i++) {
379
                if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
380
                                ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
381
                        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
382
                        topoAddToBL(peers[i].id);
383
                        remove_peer(peers[i--].id);
384
                        //}
385
                }
386
        }
387

    
388
        reg_neigh_size(peerset_size(pset));
389

    
390
        dprintf("after timer check:%d\n",peerset_size(pset));
391
}
392

    
393
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
394
{
395
        struct peer *p = peerset_get_peer(pset, id);
396
        if (!p) {
397
                //fprintf(stderr,"warning: received message from unknown peer: %s!\n",node_addr(id));
398
                if (reg && peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
399
        //      topoAddNeighbour(id, NULL, 0);        //@TODO: this is agressive
400
                        add_peer(id);
401
                        p = peerset_get_peer(pset,id);
402
                }
403
        }
404

    
405
        return p;
406
}
407

    
408
int peers_init()
409
{
410
        fprintf(stderr,"peers_init\n");
411
        pset = peerset_init(0);
412
        return pset ? 1 : 0;
413
}
414

    
415
struct peerset *get_peers()
416
{
417
        return pset;
418
}