Statistics
| Branch: | Revision:

streamers / topology.c @ b88c9f4a

History | View | Annotate | Download (7.13 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12

    
13
#include <math.h>
14
#include <net_helper.h>
15
#include <peerset.h>
16
#include <peer.h>
17
#include <grapes_msg_types.h>
18
#include <topmanager.h>
19
#include <tman.h>
20

    
21
#include "compatibility/timer.h"
22

    
23
#include "topology.h"
24
#include "streaming.h"
25
#include "dbg.h"
26
#include "measures.h"
27

    
28
int NEIGHBORHOOD_TARGET_SIZE = 20;
29
double NEIGHBORHOOD_ROTATE_RATIO = 1.0;
30
#define TMAN_MAX_IDLE 10
31
#define TMAN_LOG_EVERY 1000
32

    
33
static struct peerset *pset;
34
static struct timeval tout_bmap = {0, 0};
35
static int counter = 0;
36
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
37
static tmanRankingFunction rankFunct = simpleRanker;
38
struct metadata {
39
  int cb_size;
40
  double value;
41
};
42
static struct metadata my_metadata;
43
static int cnt = 0;
44
static struct nodeID *me = NULL;
45
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN};
46
static struct nodeID ** neighbors;
47

    
48
static void update_metadata(void) {
49

    
50
        my_metadata.cb_size = am_i_source() ? 0 : get_cb_size();
51
#ifndef MONL
52
        my_metadata.value = 1 + (((double)rand() / (double)RAND_MAX)*1000);
53
#endif
54
#ifdef MONL
55
        my_metadata.value = get_receive_delay();
56
#endif
57
}
58

    
59
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
60

    
61
        double t,p1,p2;
62
        t = *((const double *)tin);
63
        p1 = *((const double *)p1in);
64
        p2 = *((const double *)p2in);
65

    
66
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
67
        else if (isnan(p1)) return 2;
68
        else if (isnan(p2)) return 1;
69
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
70

    
71
}
72

    
73
int topologyInit(struct nodeID *myID, const char *config)
74
{
75
        int i;
76
        for (i=0;i<2;i++)
77
                bind_msg_type(mTypes[i]);
78
        update_metadata();
79
        me = myID;
80
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
81
}
82

    
83
void topologyShutdown(void)
84
{
85
}
86

    
87
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
88
{
89
        // TODO: check this!! Just to use this function to bootstrap ncast...
90
        struct metadata m = {0};        //TODO: check what metadata option should mean
91

    
92
        if (counter < TMAN_MAX_IDLE)
93
                return topAddNeighbour(neighbour,&m,sizeof(m));
94
        else return tmanAddNeighbour(neighbour,&m,sizeof(m));
95
}
96

    
97
static int topoParseData(const uint8_t *buff, int len)
98
{
99
        int res = -1,ncs = 0,msize;
100
        const struct nodeID **n; const void *m;
101
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
102
                res = topParseData(buff,len);
103
//                if (counter <= TMAN_MAX_IDLE)
104
//                        counter++;
105
        }
106
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
107
        {
108
                n = topGetNeighbourhood(&ncs);
109
                if (ncs) {
110
                m = topGetMetadata(&msize);
111
                res = tmanParseData(buff,len,n,ncs,m,msize);
112
                }
113
        }
114
  return res;
115
}
116

    
117
static const struct nodeID **topoGetNeighbourhood(int *n)
118
{
119
        int i; double d;
120
        if (counter > TMAN_MAX_IDLE) {
121
                uint8_t *mdata; int msize;
122
                *n = tmanGetNeighbourhoodSize();
123
                if (neighbors) free(neighbors);
124
                neighbors = calloc(*n,sizeof(struct nodeID *));
125
                tmanGetMetadata(&msize);
126
                mdata = calloc(*n,msize);
127
                tmanGivePeers(*n,neighbors,(void *)mdata);
128

    
129
                if (cnt % TMAN_LOG_EVERY == 0) {
130
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata.value);
131
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
132
                                d = *((double *)(mdata+i*msize));
133
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
134
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
135
                        }
136
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
137
                }
138

    
139
                free(mdata);
140
                return (const struct nodeID **)neighbors;
141
        }
142
        else
143
                return topGetNeighbourhood(n);
144
}
145

    
146
static void topoAddToBL (struct nodeID *id)
147
{
148
        if (counter >= TMAN_MAX_IDLE)
149
                tmanAddToBlackList(id);
150
//        else
151
                topAddToBlackList(id);
152
}
153

    
154
void add_peer(const struct nodeID *id, const struct metadata *m)
155
{
156
      dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
157
      peerset_add_peer(pset, id);
158
      if (m) peerset_get_peer(pset, id)->cb_size = m->cb_size;
159
      /* add measures here */
160
      add_measures(id);
161
      send_bmap(id);
162
}
163

    
164
void remove_peer(struct nodeID *id)
165
{
166
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
167
      /* add measures here */
168
      delete_measures(id);
169
      peerset_remove_peer(pset, id);
170
}
171

    
172
// currently it just makes the peerset grow
173
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
174
{
175
  int n_ids, metasize, i;
176
  static const struct nodeID **ids;
177
  static const struct metadata *metas;
178
  struct peer *peers;
179
  struct timeval tnow, told;
180

    
181
//  dprintf("Update peers: topo_msg:%d, ",len);
182
//  if (from) {
183
//    dprintf("from:%s, ",node_addr(from));
184
//    if (peerset_check(pset, from) < 0) {
185
//      topAddNeighbour(from, NULL, 0);        //@TODO: this is agressive
186
//      if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
187
//        add_peer(from);
188
//      }
189
//    }
190
//  }
191

    
192
  if (cnt++ % 100 == 0) {
193
        update_metadata();
194
    if (counter > TMAN_MAX_IDLE) {
195
        tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
196
    }
197
  }
198

    
199
  topoParseData(buff, len);
200

    
201
  if (!buff) {
202
    reg_neigh_size(peerset_size(pset));
203
    return;
204
  }
205

    
206
  ids = topoGetNeighbourhood(&n_ids);        //TODO handle both tman and topo
207
  metas = topGetMetadata(&metasize);        //TODO: check metasize
208
  for(i = 0; i < n_ids; i++) {
209
    if(peerset_check(pset, ids[i]) < 0) {
210
      if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
211
        add_peer(ids[i],&metas[i]);
212
      } else {  //rotate neighbourhood
213
        if (rand()/((double)RAND_MAX + 1) < NEIGHBORHOOD_ROTATE_RATIO) {
214
          add_peer(ids[i],&metas[i]);
215
        }
216
      }
217
    }
218
  }
219

    
220
  if timerisset(&tout_bmap) {
221
    gettimeofday(&tnow, NULL);
222
    timersub(&tnow, &tout_bmap, &told);
223
    peers = peerset_get_peers(pset);
224
    for (i = 0; i < peerset_size(pset); i++) {
225
      if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
226
           ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
227
        //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
228
        topoAddToBL(peers[i].id);
229
        remove_peer(peers[i--].id);
230
        //}
231
      }
232
    }
233
  }
234

    
235
  while (NEIGHBORHOOD_TARGET_SIZE && peerset_size(pset) > NEIGHBORHOOD_TARGET_SIZE) { //reduce back neighbourhood to target size
236
    peers = peerset_get_peers(pset);
237
    remove_peer(peers[rand() % peerset_size(pset)].id);
238
  }
239

    
240
  reg_neigh_size(peerset_size(pset));
241
}
242

    
243
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
244
{
245
  struct peer *p = peerset_get_peer(pset, id);
246
  if (!p) {
247
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
248
    if (reg) {
249
      topoAddNeighbour(id, NULL, 0);
250
      add_peer(id,NULL);
251
      p = peerset_get_peer(pset,id);
252
    }
253
  }
254

    
255
  return p;
256
}
257

    
258
int peers_init(void)
259
{
260
  fprintf(stderr,"peers_init\n");
261
  pset = peerset_init(0);
262
  return pset ? 1 : 0;
263
}
264

    
265
struct peerset *get_peers(void)
266
{
267
  return pset;
268
}