Revision 9176d3d1

View differences:

loop-mt.c
63 63
    }
64 64
    switch (buff[0] /* Message Type */) {
65 65
	  case MSG_TYPE_TMAN:
66
      case MSG_TYPE_STREAMER_TOPOLOGY:
66 67
      case MSG_TYPE_TOPOLOGY:
67 68
        pthread_mutex_lock(&topology_mutex);
68 69
        update_peers(remote, buff, len);
......
101 102
    dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
102 103
    switch (buff[0] /* Message Type */) {
103 104
	  case MSG_TYPE_TMAN:
105
      case MSG_TYPE_STREAMER_TOPOLOGY:
104 106
      case MSG_TYPE_TOPOLOGY:
105 107
        pthread_mutex_lock(&topology_mutex);
106 108
        update_peers(remote, buff, len);
loop.c
79 79
      }
80 80
      switch (buff[0] /* Message Type */) {
81 81
        case MSG_TYPE_TMAN:
82
        case MSG_TYPE_STREAMER_TOPOLOGY:
82 83
        case MSG_TYPE_TOPOLOGY:
83 84
          dtprintf("Topo message received:\n");
84 85
          update_peers(remote, buff, len);
......
157 158
      dprintf("Received message (%c) from %s\n", buff[0], node_addr(remote));
158 159
      switch (buff[0] /* Message Type */) {
159 160
		case MSG_TYPE_TMAN:
161
        case MSG_TYPE_STREAMER_TOPOLOGY:
160 162
        case MSG_TYPE_TOPOLOGY:
161 163
          fprintf(stderr, "Top Parse\n");
162 164
#ifdef HTTPIO_MHD
topology.c
41 41
#define TMAN_MAX_IDLE 10
42 42
#define TMAN_LOG_EVERY 1000
43 43

  
44
#define STREAMER_TOPOLOGY_MSG_ADD 0
45
#define STREAMER_TOPOLOGY_MSG_REMOVE 1
46

  
44 47
static struct peerset *pset;
45 48
static struct timeval tout_bmap = {20, 0};
46 49
static int counter = 0;
......
57 60
static struct metadata my_metadata;
58 61
static int cnt = 0;
59 62
static struct nodeID *me = NULL;
60
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN};
63
static unsigned char mTypes[] = {MSG_TYPE_TOPOLOGY,MSG_TYPE_TMAN,MSG_TYPE_STREAMER_TOPOLOGY};
61 64
static struct nodeID ** neighbors;
62 65

  
63 66
static void update_metadata(void) {
......
162 165
		topAddToBlackList(id);
163 166
}
164 167

  
165
void add_peer(const struct nodeID *id, const struct metadata *m)
168
//TODO: send metadata as well
169
static int send_topo_msg(struct nodeID *dst, uint8_t type)
170
{
171
  char msg[2];
172
  msg[0] = MSG_TYPE_STREAMER_TOPOLOGY;
173
  msg[1] = type;
174
  return send_to_peer(me, dst, msg, 2);
175
}
176

  
177
static void add_peer_silent(const struct nodeID *id, const struct metadata *m)
166 178
{
167 179
      dprintf("Adding %s to neighbourhood! cb_size:%d\n", node_addr(id), m?m->cb_size:-1);
168 180
      peerset_add_peer(pset, id);
......
173 185
      send_bmap(id);
174 186
}
175 187

  
176
void remove_peer(const struct nodeID *id)
188
static void add_peer(const struct nodeID *id, const struct metadata *m)
189
{
190
      add_peer_silent(id, m);
191
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
192
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_ADD);
193
}
194

  
195
static void remove_peer_silent(const struct nodeID *id)
177 196
{
178 197
      dprintf("Removing %s from neighbourhood!\n", node_addr(id));
179 198
      /* add measures here */
......
181 200
      peerset_remove_peer(pset, id);
182 201
}
183 202

  
203
static void remove_peer(const struct nodeID *id)
204
{
205
      remove_peer_silent(id);
206
      dtprintf("Topo: explicit topo message sent!!! to %s (peers:%d)\n", node_addr(id));
207
      send_topo_msg(id, STREAMER_TOPOLOGY_MSG_REMOVE);
208
}
209

  
184 210
//get the rtt. Currenly only MONL version is supported
185 211
static double get_rtt_of(const struct nodeID* n){
186 212
#ifdef MONL
......
250 276
    }
251 277
  }
252 278

  
279
  //handle explicit add/remove messages
280
  if (buff && buff[0] == MSG_TYPE_STREAMER_TOPOLOGY) {
281
    dtprintf(stderr,"Topo: explicit topo message received!!!from %s (peers:%d)\n", node_addr(from), peerset_size(pset));
282
    if (len != 2) {
283
      fprintf(stderr, "Bad streamer topo message received, len:%d!\n", len);
284
      return;
285
    }
286
    switch (buff[1]) {
287
      case STREAMER_TOPOLOGY_MSG_ADD:
288
        ftprintf(stderr,"Topo: adding for symmetry %s (peers:%d)\n", node_addr(from), peerset_size(pset));
289
        add_peer_silent(from, NULL);
290
        break;
291
      case STREAMER_TOPOLOGY_MSG_REMOVE:
292
        ftprintf(stderr,"Topo: removing for symmetry %s (peers:%d)\n", node_addr(from), peerset_size(pset));
293
        remove_peer_silent(from);
294
        break;
295
      default:
296
        fprintf(stderr, "Bad streamer topo message received!\n");
297
    }
298
    reg_neigh_size(peerset_size(pset));
299
    return;
300
  }
301

  
253 302
  topoParseData(buff, len);
254 303

  
255 304
  if (!buff) {
topology.h
10 10

  
11 11
#include <stdint.h>
12 12

  
13
#define MSG_TYPE_STREAMER_TOPOLOGY   0x22
14

  
13 15
int peers_init(void);
14 16
struct peerset *get_peers(void);
15 17
void update_peers(struct nodeID *from, const uint8_t *buff, int len);

Also available in: Unified diff