Statistics
| Branch: | Revision:

streamers / topology.c @ da25233b

History | View | Annotate | Download (6.07 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2010 Luca Abeni
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <sys/time.h>
10
#include <time.h>
11
#include <stdlib.h>
12

    
13
#include <math.h>
14
#include <net_helper.h>
15
#include <peerset.h>
16
#include <peer.h>
17
#include <grapes_msg_types.h>
18
#include <topmanager.h>
19
#include <tman.h>
20

    
21
#include "topology.h"
22
#include "streaming.h"
23
#include "dbg.h"
24
#include "measures.h"
25

    
26
#define NEIGHBORHOOD_TARGET_SIZE 20
27
#define TMAN_MAX_IDLE 10
28
#define TMAN_LOG_EVERY 1000
29

    
30
static struct peerset *pset;
31
static struct timeval tout_bmap = {10, 0};
32
static int counter = 0;
33
static int simpleRanker (const void *tin, const void *p1in, const void *p2in);
34
static tmanRankingFunction rankFunct = simpleRanker;
35
static double my_metadata;
36
static int cnt = 0;
37
static struct nodeID *me = NULL;
38
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN};
39
static struct nodeID ** neighbors;
40

    
41
static void update_metadata(void) {
42

    
43
#ifndef MONL
44
        my_metadata = 1 + ceil(((double)rand() / (double)RAND_MAX)*1000);
45
#endif
46
#ifdef MONL
47
        my_metadata = get_receive_delay();
48
#endif
49
}
50

    
51
static int simpleRanker (const void *tin, const void *p1in, const void *p2in) {
52

    
53
        double t,p1,p2;
54
        t = *((const double *)tin);
55
        p1 = *((const double *)p1in);
56
        p2 = *((const double *)p2in);
57

    
58
        if (isnan(t) || (isnan(p1) && isnan(p2))) return 0;
59
        else if (isnan(p1)) return 2;
60
        else if (isnan(p2)) return 1;
61
        else return (fabs(t-p1) == fabs(t-p2))?0:(fabs(t-p1) < fabs(t-p2))?1:2;
62

    
63
}
64

    
65
int topologyInit(struct nodeID *myID, const char *config)
66
{
67
        int i;
68
        for (i=0;i<2;i++)
69
                bind_msg_type(mTypes[i]);
70
        update_metadata();
71
        me = myID;
72
        return (topInit(myID, &my_metadata, sizeof(my_metadata), config) && tmanInit(myID,&my_metadata, sizeof(my_metadata),rankFunct,0));
73
}
74

    
75
void topologyShutdown(void)
76
{
77
}
78

    
79
int topoAddNeighbour(struct nodeID *neighbour, void *metadata, int metadata_size)
80
{
81
        // TODO: check this!! Just to use this function to bootstrap ncast...
82
        if (counter < TMAN_MAX_IDLE)
83
                return topAddNeighbour(neighbour,metadata,metadata_size);
84
        else return tmanAddNeighbour(neighbour,metadata,metadata_size);
85
}
86

    
87
static int topoParseData(const uint8_t *buff, int len)
88
{
89
        int res = -1,ncs = 0,msize;
90
        const struct nodeID **n; const void *m;
91
        if (!buff || buff[0] == MSG_TYPE_TOPOLOGY) {
92
                res = topParseData(buff,len);
93
                if (counter <= TMAN_MAX_IDLE)
94
                        counter++;
95
        }
96
        if (counter >= TMAN_MAX_IDLE && (!buff || buff[0] == MSG_TYPE_TMAN))
97
        {
98
                n = topGetNeighbourhood(&ncs);
99
                if (ncs) {
100
                m = topGetMetadata(&msize);
101
                res = tmanParseData(buff,len,n,ncs,m,msize);
102
                }
103
        }
104
  return res;
105
}
106

    
107
static const struct nodeID **topoGetNeighbourhood(int *n)
108
{
109
        int i; double d;
110
        if (counter > TMAN_MAX_IDLE) {
111
                uint8_t *mdata; int msize;
112
                *n = tmanGetNeighbourhoodSize();
113
                if (neighbors) free(neighbors);
114
                neighbors = calloc(*n,sizeof(struct nodeID *));
115
                tmanGetMetadata(&msize);
116
                mdata = calloc(*n,msize);
117
                tmanGivePeers(*n,neighbors,(void *)mdata);
118

    
119
                if (cnt % TMAN_LOG_EVERY == 0) {
120
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_chunk_delay,%f\n",node_addr(me),node_addr(me),my_metadata);
121
                        for (i=0;i<(*n) && i<NEIGHBORHOOD_TARGET_SIZE;i++) {
122
                                d = *((double *)(mdata+i*msize));
123
                                fprintf(stderr,"abouttopublish,%s,",node_addr(me));
124
                                fprintf(stderr,"%s,,Tman_chunk_delay,%f\n",node_addr(neighbors[i]),d);
125
                        }
126
                        fprintf(stderr,"abouttopublish,%s,%s,,Tman_neighborhood_size,%d\n\n",node_addr(me),node_addr(me),*n);
127
                }
128

    
129
                free(mdata);
130
                return (const struct nodeID **)neighbors;
131
        }
132
        else
133
                return topGetNeighbourhood(n);
134
}
135

    
136
static void topoAddToBL (struct nodeID *id)
137
{
138
        if (counter >= TMAN_MAX_IDLE)
139
                tmanAddToBlackList(id);
140
//        else
141
                topAddToBlackList(id);
142
}
143

    
144
void add_peer(struct nodeID *id)
145
{
146
      dprintf("Adding %s to neighbourhood!\n", node_addr(id));
147
      peerset_add_peer(pset, id);
148
      /* add measures here */
149
      add_measures(id);
150
      send_bmap(peerset_get_peer(pset,id));
151
}
152

    
153
void remove_peer(struct nodeID *id)
154
{
155
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
156
      /* add measures here */
157
      delete_measures(id);
158
      peerset_remove_peer(pset, id);
159
}
160

    
161
// currently it just makes the peerset grow
162
void update_peers(struct nodeID *from, const uint8_t *buff, int len)
163
{
164
  int n_ids, i;
165
  static const struct nodeID **ids;
166
  struct peer *peers;
167
  struct timeval tnow, told;
168

    
169
//  dprintf("Update peers: topo_msg:%d, ",len);
170
//  if (from) {
171
//    dprintf("from:%s, ",node_addr(from));
172
//    if (peerset_check(pset, from) < 0) {
173
//      topAddNeighbour(from, NULL, 0);        //@TODO: this is agressive
174
//      if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
175
//        add_peer(from);
176
//      }
177
//    }
178
//  }
179

    
180
  if (cnt++ % 100 == 0) {
181
        update_metadata();
182
        tmanChangeMetadata(&my_metadata,sizeof(my_metadata));
183
  }
184

    
185
  topoParseData(buff, len);
186
  ids = topoGetNeighbourhood(&n_ids);
187
  for(i = 0; i < n_ids; i++) {
188
    if(peerset_check(pset, ids[i]) < 0) {
189
      if (!NEIGHBORHOOD_TARGET_SIZE || peerset_size(pset) < NEIGHBORHOOD_TARGET_SIZE) {
190
        add_peer(ids[i]);
191
      }
192
    }
193
  }
194

    
195
  gettimeofday(&tnow, NULL);
196
  timersub(&tnow, &tout_bmap, &told);
197
  peers = peerset_get_peers(pset);
198
  for (i = 0; i < peerset_size(pset); i++) {
199
    if ( (!timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].creation_timestamp, &told, <) ) ||
200
         ( timerisset(&peers[i].bmap_timestamp) && timercmp(&peers[i].bmap_timestamp, &told, <)     )   ) {
201
      //if (peerset_size(pset) > 1) {        // avoid dropping our last link to the world
202
                topoAddToBL(peers[i].id);
203
        remove_peer(peers[i--].id);
204
      //}
205
    }
206
  }
207

    
208
  reg_neigh_size(peerset_size(pset));
209
}
210

    
211
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
212
{
213
  struct peer *p = peerset_get_peer(pset, id);
214
  if (!p) {
215
    //fprintf(stderr,"warning: received message from unknown peer: %s!%s\n",node_addr(id), reg ? " Adding it to pset." : "");
216
    if (reg) {
217
      topoAddNeighbour(id, NULL, 0);
218
      add_peer(id);
219
      p = peerset_get_peer(pset,id);
220
    }
221
  }
222

    
223
  return p;
224
}
225

    
226
int peers_init(void)
227
{
228
  fprintf(stderr,"peers_init\n");
229
  pset = peerset_init(0);
230
  return pset ? 1 : 0;
231
}
232

    
233
struct peerset *get_peers(void)
234
{
235
  return pset;
236
}