Statistics
| Branch: | Revision:

streamers / topology-ALTO.c @ b2c34e56

History | View | Annotate | Download (10.8 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <assert.h>
12
#include <string.h>
13
#include <stdlib.h> /* qsort, rand */
14

    
15
#include <net_helper.h>
16
#include <peerset.h>
17
#include <peer.h>
18
#include <grapes_msg_types.h>
19
#include <topmanager.h>
20
#include <ALTOclient.h>
21
#include <pthread.h>
22

    
23
#include "net_helpers.h"        /* default_ip_addr() */
24
#include "topology.h"
25
#include "streaming.h"
26
#include "dbg.h"
27
#include "measures.h"
28
#include "config.h"
29

    
30
static int NEIGHBORHOOD_TARGET_SIZE;
31

    
32
static struct peerset *pset;
33
static struct timeval tout_bmap = {10, 0};
34

    
35
static int cnt = 0;
36
static struct nodeID *me;
37
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY};
38
static uint64_t currtime;
39
static struct nodeID **altoList;
40
static int altoList_size, g_neigh_size, newAltoResults=2;
41
static struct nodeID **gossipNeighborhood;
42

    
43
/* ALTO begin --> */
44
#define ALTO_MAX_PEERS        1024
45
static int ALTO_bucket_size, RAND_bucket_size;
46

    
47
/* work struct */
48
static struct tagALTOInfo {
49
  struct in_addr localIPAddr;
50
  ALTO_GUIDANCE_T peers[ALTO_MAX_PEERS];
51
} ALTOInfo;
52

    
53
/* input to the peer selection function */
54
typedef struct {
55
  const struct nodeID **neighbours;
56
  int numNeighb;
57
  unsigned int pri_crit;        /* primary rating criterion */
58
  unsigned int sec_crit;        /* secondary rating criterion flags */
59
  float altoFactor;                        /* percentage of ALTO selected peers in disjoint bucket */
60
  /* jahanpanah@neclab.eu */
61
} ALTOInput_t;
62
/* <-- ALTO end */
63

    
64
void add_peer(struct nodeID *id)
65
{
66
      dprintf("Adding %s to neighbourhood!\n", node_addr(id));
67
      peerset_add_peer(pset, id);
68
      /* add measures here */
69
      add_measures(id);
70
      send_bmap(peerset_get_peer(pset,id));
71
}
72

    
73
void remove_peer(struct nodeID *id)
74
{
75
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
76
      /* add measures here */
77
      delete_measures(id);
78
      peerset_remove_peer(pset, id);
79
}
80

    
81
/* ALTO begin --> */
82

    
83
/**
84
 * As nodeID is an opaque type not exposing its in_addr, we need to convert the
85
 * IP string returned by node_addr() via inet_aton().
86
 */
87
static void node_convert_addr(const struct nodeID* node, struct in_addr* addr) {
88
  int len, rescode;
89
  char peerIP[64];
90
  const char* peerAddr = node_addr(node);
91

    
92
  /* extract IP string without port */
93
  const char* tmp = strstr(peerAddr, ":");
94
  if (tmp) len = tmp-peerAddr;        /* calc len of IP string */
95
  else len = strlen(peerAddr)+1;
96
  memcpy(peerIP, peerAddr, len);
97
  peerIP[len] = 0;        /* trailing zero */
98

    
99
  /*dprintf("%d. peer addr = %s IP = %s\n", p, peerAddr, peerIP);*/
100

    
101
  /* fill in IP addr */
102
  rescode = inet_aton(peerIP, addr);
103
  assert(rescode != 0);
104
}
105

    
106
/*
107
  for a given address find corresponding entry in unsorted
108
  neighbour list provided by streamer
109
*/
110
static const struct nodeID* findALTOPeerInNeighbourList(const struct nodeID **neighbours, int numNeighb, size_t altoEntry)
111
{
112
  int p; struct nodeID *res;
113
  struct in_addr addr;
114

    
115
  for (p=0; p < numNeighb; p++) {
116
    if (!neighbours[p]) continue;
117
    node_convert_addr(neighbours[p], &addr);
118
    if (addr.s_addr == ALTOInfo.peers[altoEntry].alto_host.s_addr) {
119
      /* we found the entry */
120
                res = neighbours[p];
121
                neighbours[p] = NULL;
122
      return res;
123
        }
124
  }
125
  return NULL; /* not found */
126
}
127

    
128
/** qsort comparator function for ALTO peer rankings */
129
static int qsort_compare_ALTO_ranking(const void* a, const void* b)
130
{
131
  const ALTO_GUIDANCE_T* r1 = (const ALTO_GUIDANCE_T*) a;
132
  const ALTO_GUIDANCE_T* r2 = (const ALTO_GUIDANCE_T*) b;
133
  return r1->rating - r2->rating;
134
}
135

    
136
static void topo_ALTO_init(void)
137
{
138
        char* myIP;
139

    
140
        NEIGHBORHOOD_TARGET_SIZE = g_config.neighborhood_target_size;
141

    
142
        myIP = default_ip_addr();
143
        dprintf("myIP: %s\n", myIP);
144
        assert(inet_aton(myIP, &ALTOInfo.localIPAddr) != 0);
145
        start_ALTO_client();
146
        /*set_ALTO_server("http://10.10.251.107/cgi-bin/alto-server.cgi");
147
          set_ALTO_server("http://www.napa-wine-alto.eu/cgi-bin/alto-server.cgi");*/
148
        fprintf(stderr,"Setting ALTO server URL: '%s'\n", g_config.alto_server);
149
        set_ALTO_server(g_config.alto_server);
150

    
151
        srand(time(NULL));
152
}
153

    
154
void topologyShutdown(void)
155
{
156
  stop_ALTO_client();
157
  config_free();
158
}
159

    
160
/*
161
  The main peer selector function.
162
*/
163
void PeerSelectorALTO(void)
164
{
165
        int p, i, j, rescode;
166
        struct timeval tnow;
167
        uint64_t timenow;
168

    
169
        gettimeofday(&tnow, NULL);
170
        timenow = (tnow.tv_usec + tnow.tv_sec * 1000000ull);
171
        if ((timenow - currtime > (g_config.update_interval * 1000000)) &&
172
                        (newAltoResults)) {
173

    
174
                /* fill the temporary peer list (used for ALTO query) */
175
                for (p=0; p < g_neigh_size; p++) {
176
                        /* copy/convert nodeID IP address into ALTO struct */
177
                        node_convert_addr(gossipNeighborhood[p], &ALTOInfo.peers[p].alto_host);
178

    
179
                        ALTOInfo.peers[p].prefix = 32;
180
                        ALTOInfo.peers[p].rating = -1; /* init to dummy rating */
181
                }
182

    
183
                /*****************************************************************/
184
                /*   ALTO query                                                  */
185
                /*****************************************************************/
186
                gettimeofday(&tnow, NULL);
187
                timenow = (tnow.tv_usec + tnow.tv_sec * 1000000ull);
188
                fprintf(stderr,"Calling ALTO server at : %u\n",(unsigned int)(((unsigned long)timenow)/1000000)); //
189
                rescode = ALTO_query_exec( // get_ALTO_guidance_for_list(
190
                                ALTOInfo.peers,
191
                                g_neigh_size,
192
                                ALTOInfo.localIPAddr,
193
                                g_config.alto_pri_criterion,
194
                                g_config.alto_sec_criterion
195
                );
196

    
197
                /*assert(rescode == 1);*/
198
                if (rescode != 1) {
199
                        fprintf(stderr,"WARNING: ALTO query FAILED!\n");
200
                        newAltoResults = 1;
201
                }
202
                else {
203
                        newAltoResults = 0;
204
                }
205
        }
206

    
207
        if (ALTO_query_state() == ALTO_QUERY_READY) {
208

    
209
                if (!newAltoResults) {
210
                        gettimeofday(&tnow, NULL);
211
                        currtime = (tnow.tv_usec + tnow.tv_sec * 1000000ull);
212
                        fprintf(stderr,"Received ALTO server reply at : %u\n",(unsigned int)(((unsigned long)currtime)/1000000));
213

    
214
                        /* Build sorted list of ALTO-rated peers */
215
                        qsort(ALTOInfo.peers, g_neigh_size, sizeof(ALTO_GUIDANCE_T), qsort_compare_ALTO_ranking);
216

    
217
#ifdef LOG_ALTO_RATINGS
218
                        dprintf("\nSorted ALTO ratings:\n");
219
                        for (p=0; p < g_neigh_size; p++) {
220
                                dprintf("ALTO peer %d: rating = %d\n ", p, ALTOInfo.peers[p].rating);
221
                        }
222
#endif
223

    
224
                        /*
225
    now build the disjoint buckets of peers:
226

227
    we pick a certain percentage of the best ALTO-rated peers
228
    and fill the rest with random neighbours
229
                         */
230

    
231
                        if ((NEIGHBORHOOD_TARGET_SIZE > 0) && (NEIGHBORHOOD_TARGET_SIZE < g_neigh_size)) {
232
                                ALTO_bucket_size = NEIGHBORHOOD_TARGET_SIZE * g_config.alto_factor;
233
                                RAND_bucket_size = NEIGHBORHOOD_TARGET_SIZE - ALTO_bucket_size;
234
                        } else {
235
                                ALTO_bucket_size = g_neigh_size * g_config.alto_factor;
236
                                RAND_bucket_size = g_neigh_size - ALTO_bucket_size;
237
                        }
238

    
239
                        for (i=0; i < altoList_size; i++) {
240
                                nodeid_free(altoList[i]);
241
                        }
242

    
243
                        altoList = realloc(altoList,g_neigh_size*sizeof(struct nodeID *));
244
                        altoList = memset(altoList,0,g_neigh_size*sizeof(struct nodeID *));
245
                        altoList_size = g_neigh_size;
246

    
247
                        /* add ALTO peers */
248
                        fprintf(stderr,"\nSorted ALTO peers:\n");
249
                        for (i = 0; i < ALTO_bucket_size; i++) {
250
                                altoList[i] = findALTOPeerInNeighbourList(gossipNeighborhood, g_neigh_size, i);
251

    
252
                                fprintf(stderr,"ALTO peer %d: id  = %s ; rating = %d\n ", (i+1), node_addr(altoList[i]), ALTOInfo.peers[i].rating);
253
                        }
254

    
255
                /* add remaining peers randomly */
256
                fprintf(stderr,"\nMore ALTO randomly picked peers:\n");
257
                for (j = ALTO_bucket_size; j < ALTO_bucket_size + RAND_bucket_size; j++) {
258
                        do { // FIXME: it works only if gossipNeighborhood is realloc'ed for sure between two queries...
259
                                p = rand() % g_neigh_size;
260
                        } while (!gossipNeighborhood[p]);
261

    
262
                        altoList[j] = gossipNeighborhood[p];
263
                        gossipNeighborhood[p] = NULL;
264
                        fprintf(stderr,"ALTO peer %d: id  = %s\n ", (j+1), node_addr(altoList[j]));
265
                }
266
                newAltoResults = 1;
267
        }
268
        }
269

    
270
}
271

    
272
/* <-- ALTO end */
273

    
274
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
275
{
276
        return topAddNeighbour(neighbour,metadata,metadata_size);
277
}
278

    
279
int topologyInit(struct nodeID *myID, const char *config)
280
{
281
        me = myID;
282
        bind_msg_type(mTypes[0]);
283
        config_init();
284
        config_load("streamer.conf");
285
        //config_dump();
286
        topo_ALTO_init();
287
        return (topInit(myID, NULL, 0, config));
288
}
289

    
290
// currently it just makes the peerset grow
291
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
292
{
293
        int i,npeers;
294
        struct peer *peers; static const struct nodeID **nbrs;
295
        struct timeval told,tnow;
296

    
297
        dprintf("before:%d, ",peerset_size(pset));
298
        topParseData(buff, len);
299
        nbrs = topGetNeighbourhood(&npeers);
300
        if (newAltoResults) {
301
                for (i=0; i < g_neigh_size; i++) {
302
                        nodeid_free(gossipNeighborhood[i]);
303
                }
304
                free(gossipNeighborhood);
305
                g_neigh_size = npeers;
306
                gossipNeighborhood = calloc(g_neigh_size,sizeof(struct nodeID *));
307
                for (i=0; i < g_neigh_size; i++) {
308
                        gossipNeighborhood[i] = nodeid_dup(nbrs[i]);
309
                }
310
        }
311
        if (g_neigh_size > 1) {
312
                PeerSelectorALTO();
313
        }
314

    
315
        if (ALTO_query_state() == ALTO_QUERY_READY && newAltoResults==1) {
316
        //        fprintf(stderr,"Composing peerset from ALTO selection\n");
317
        for (i=0; i<altoList_size; i++) {
318
                if(peerset_check(pset, altoList[i]) < 0) {
319
                        if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
320
                                add_peer(altoList[i]);
321
                        }
322
                }
323
        }
324
        newAltoResults = 2;
325
        }
326
        if (peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) { // ALTO selection didn't fill the peerset
327
                //        fprintf(stderr,"Composing peerset from ncast neighborhood\n");
328
                for (i=0;i<g_neigh_size;i++) {
329
                        if(gossipNeighborhood[i] && peerset_check(pset, gossipNeighborhood[i]) < 0) {
330
                                if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
331
                                        add_peer(gossipNeighborhood[i]);
332
                                }
333
                        }
334
                }
335
        }
336
        dprintf("after:%d, ",peerset_size(pset));
337

    
338
        gettimeofday(&tnow, NULL);
339
        timersub(&tnow, &tout_bmap, &told);
340
        peers = peerset_get_peers(pset);
341

    
342
        if (++cnt % 10 == 0) {
343
                fprintf(stderr,"\nMy peerset : size = %d\n",peerset_size(pset));
344
                for (i=0;i<peerset_size(pset);i++) {
345
                        fprintf(stderr,"\t%d : %s\n",i,node_addr(peers[i].id));
346
                }
347
        }
348

    
349
        for (i = 0; i < peerset_size(pset); i++) {
350
                if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
351
                                ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
352
                        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
353
                        topRemoveNeighbour(peers[i].id);
354
                        remove_peer(peers[i--].id);
355
                        //}
356
                }
357
        }
358

    
359
        reg_neigh_size(peerset_size(pset));
360

    
361
        dprintf("after timer check:%d\n",peerset_size(pset));
362
}
363

    
364
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
365
{
366
        struct peer *p = peerset_get_peer(pset, id);
367
        if (!p) {
368
                fprintf(stderr,"warning: received message from unknown peer: %s!\n",node_addr(id));
369
                if (reg) {
370
        //      topoAddNeighbour(id, NULL, 0);        //@TODO: this is agressive
371
                        add_peer(id);
372
                        p = peerset_get_peer(pset,id);
373
                }
374
        }
375

    
376
        return p;
377
}
378

    
379
int peers_init()
380
{
381
        fprintf(stderr,"peers_init\n");
382
        pset = peerset_init(0);
383
        return pset ? 1 : 0;
384
}
385

    
386
struct peerset *get_peers()
387
{
388
        return pset;
389
}