Statistics
| Branch: | Revision:

napa-baselibs / tests / MonTestDist / MonTestDist.c @ 507372bb

History | View | Annotate | Download (15.5 KB)

1
#include <event2/event.h>
2
#include <unistd.h>
3
#include <string.h>
4
#include <math.h>
5
#include <limits.h>
6

    
7
#include "ml.h"
8
#include "mon.h"
9
#include "napa_log.h"
10
#include "napa.h"
11
#include "repoclient.h"
12

    
13
#define SOCKETID_PUBLISH_NAME "SocketId"
14
#define SOCKETID_PUBLISH_VALUE 0
15

    
16
#define MSG_TYPE 3
17

    
18
char request[] = "APingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPing";
19

    
20
char reply[] = "BPong";
21
//Note first characters identifies Request form Replies and must be maintaned (it is not printed out)
22

    
23
struct peer_info {
24
        MonHandler m_list[15];
25
        int m_list_len;
26
        int con_id;
27
        socketID_handle rem_peer;
28
        struct peer_info *next;
29
        struct peer_info *previous;
30
};
31

    
32
struct peer_info *peers_head = NULL;
33

    
34
char local_id[SOCKETID_STRING_SIZE];
35

    
36
HANDLE repoclient;
37

    
38
//check period in [s]
39
int cycle = 500;
40
char *channel = "MonTestDist";
41

    
42
int avg_pause = 1000; // [ms]
43

    
44
void gen_pause(struct timeval *t) {
45
        /*generate random ipg*/
46
        double p = (double) random() / (double) INT_MAX;
47
        p = -1 * avg_pause * log(1-p) * 1000.0;
48
        t->tv_sec = (unsigned long)p / 1000000;
49
        t->tv_usec = (unsigned long)fmod(p, 100000.0);
50
}
51

    
52
int generic = -1;
53
int sn = 0;
54

    
55

    
56
/* Event to send periodically traffic after the connection has been established */
57
void send_data_cb(int fd, short event,void *arg){
58
        struct peer_info *peer = arg;
59
        struct timeval t;
60

    
61
        if(peer->con_id < 0) {
62
                if(peer->next != NULL)
63
                        peer->next->previous = peer->previous;
64
                if(peer->previous != NULL)
65
                        peer->previous->next = peer->next;
66
                free(peer);
67
                return;
68
        }
69

    
70
        mlSendData(peer->con_id, request, strlen(request) + 1, MSG_TYPE, NULL);
71

    
72
        if(generic >= 0)
73
                monNewSample(generic, sn++);
74

    
75
        gen_pause(&t);
76
        //reschedule
77
        event_base_once(eventbase, -1, EV_TIMEOUT, &send_data_cb, arg, &t);
78
}
79

    
80
/* Called once the connection has been established  */
81
void open_conn_cb_active (int connectionID, void *arg){
82
        char str[SOCKETID_STRING_SIZE];
83
        struct timeval t = {0,0};
84
        struct peer_info *peer = (struct peer_info *) arg;
85
        enum stat_types st[] = {AVG, MIN, MAX};
86
        enum stat_types st2[] = {RATE};
87
        int i=0, ret;
88

    
89
        peer->con_id = connectionID;
90

    
91
        mlSocketIDToString(peer->rem_peer,str, sizeof(str));
92
        info("Opened connection to %s. Starting measures ...",str);
93

    
94

    
95
        /* (Static) List of measures to perform */
96
        /* Notice: Capprobe generates 100 Kbit/s of traffic and Forecaster 1 Mbit/s */
97
        /*         In band measures are taken on the above generated traffic */
98

    
99
        /* HopCount */
100
        ret = peer->m_list[i] = monCreateMeasure(HOPCOUNT, PACKET | IN_BAND);
101
        ret = monPublishStatisticalType(peer->m_list[i], NULL, channel, st , sizeof(st)/sizeof(enum stat_types), repoclient);
102
//        monSetParameter (peer->m_list[i], P_PUBLISHING_RATE, 30);
103
        ret = monSetParameter (peer->m_list[i], P_DEBUG_FILE, 1);
104
        ret = monActivateMeasure(peer->m_list[i], peer->rem_peer, MSG_TYPE);
105
        i++;
106

    
107
        /* Clockdrift and capacity */
108
        ret = peer->m_list[i] = monCreateMeasure(CLOCKDRIFT, PACKET | IN_BAND);
109
        ret = monSetParameter (peer->m_list[i], P_CLOCKDRIFT_ALGORITHM, 1);
110
        ret = monSetParameter (peer->m_list[i], P_DEBUG_FILE, 1);
111
        ret = monActivateMeasure(peer->m_list[i],peer->rem_peer, MSG_TYPE);
112
        i++;
113

    
114
        ret = peer->m_list[i] = monCreateMeasure(CORRECTED_DELAY, PACKET | IN_BAND);
115
        ret = monSetParameter (peer->m_list[i], P_DEBUG_FILE, 1);
116
        ret = monActivateMeasure(peer->m_list[i],peer->rem_peer, MSG_TYPE);
117
        i++;
118

    
119
        ret = peer->m_list[i] = monCreateMeasure(CAPACITY_CAPPROBE, PACKET | IN_BAND);
120
        ret = monSetParameter (peer->m_list[i], P_DEBUG_FILE, 1);
121
        ret = monSetParameter (peer->m_list[i], P_CAPPROBE_DELAY_TH, -1);
122
//         monSetParameter (mh, P_CAPPROBE_PKT_TH, 100);
123
//         monSetParameter (mh, P_CAPPROBE_IPD_TH, 60);
124
//         monPublishStatisticalType(mh, NULL, channel, st , sizeof(st)/sizeof(enum stat_types), repoclient);
125
         ret = monActivateMeasure(peer->m_list[i],peer->rem_peer, MSG_TYPE);
126
        i++;
127

    
128
        ret = peer->m_list[i] = monCreateMeasure(TX_BYTE, PACKET | IN_BAND);
129
        ret = monSetParameter (peer->m_list[i], P_DEBUG_FILE, 1);
130
        ret = monPublishStatisticalType(peer->m_list[i], NULL, channel, st2, sizeof(st2)/sizeof(enum stat_types), repoclient);
131
        ret = monActivateMeasure(peer->m_list[i], peer->rem_peer, MSG_TYPE);
132
        i++;
133

    
134
        ret = peer->m_list[i] = monCreateMeasure(RX_BYTE, PACKET | IN_BAND);
135
        ret = monSetParameter (peer->m_list[i], P_DEBUG_FILE, 1);
136
        ret = monPublishStatisticalType(peer->m_list[i], NULL, channel, st2, sizeof(st2)/sizeof(enum stat_types), repoclient);
137
        ret = monActivateMeasure(peer->m_list[i], peer->rem_peer, MSG_TYPE);
138
        i++;
139

    
140
        /* Seqwin */
141
        ret = peer->m_list[i] = monCreateMeasure(SEQWIN, DATA | IN_BAND);
142
        ret = monSetParameter (peer->m_list[i], P_DEBUG_FILE, 1);
143
//         monSetParameter (peer->m_list[i], P_WINDOW_SIZE, 100);
144
//         monSetParameter (peer->m_list[i], P_PUBLISHING_RATE, 100);
145
//         monPublishStatisticalType(peer->m_list[i], NULL, channel, st , sizeof(st)/sizeof(enum stat_types), repoclient);
146
        ret = monActivateMeasure(peer->m_list[i], peer->rem_peer, MSG_TYPE);
147
        i++;
148

    
149
        /* Loss */
150
        ret = peer->m_list[i] = monCreateMeasure(LOSS, DATA | IN_BAND);
151
        ret = monSetParameter (peer->m_list[i], P_DEBUG_FILE, 1);
152
//         monSetParameter (peer->m_list[i], P_WINDOW_SIZE, 100);
153
//         monSetParameter (peer->m_list[i], P_PUBLISHING_RATE, 100);
154
//         monPublishStatisticalType(peer->m_list[i], NULL, channel, st , sizeof(st)/sizeof(enum stat_types), repoclient);
155
        ret = monActivateMeasure(peer->m_list[i], peer->rem_peer, MSG_TYPE);
156
        i++;
157

    
158
        /* Loss Burst */
159
        ret = peer->m_list[i] = monCreateMeasure(LOSS_BURST, DATA | IN_BAND);
160
        ret = monSetParameter (peer->m_list[i], P_DEBUG_FILE, 1);
161
//        monSetParameter (peer->m_list[i], P_WINDOW_SIZE, 100);
162
//        monSetParameter (peer->m_list[i], P_PUBLISHING_RATE, 100);
163
//        monPublishStatisticalType(peer->m_list[i], NULL, channel, st , sizeof(st)/sizeof(enum stat_types), repoclient);
164
        ret = monActivateMeasure(peer->m_list[i], peer->rem_peer, MSG_TYPE);
165
        i++;
166

    
167
        /* Round trip time */
168
        ret = peer->m_list[i] = monCreateMeasure(RTT, DATA | IN_BAND);
169
//        monSetParameter (peer->m_list[i], P_PUBLISHING_RATE, 100);
170
//        monPublishStatisticalType(peer->m_list[i], NULL, channel, st , sizeof(st)/sizeof(enum stat_types), repoclient);
171
        ret = monSetParameter (peer->m_list[i], P_DEBUG_FILE, 1);
172
        ret = monActivateMeasure(peer->m_list[i], peer->rem_peer, MSG_TYPE);
173
        i++;
174

    
175
        peer->m_list_len = i;
176

    
177
        info("Started %d measures", peer->m_list_len);
178

    
179
        gen_pause(&t);
180

    
181
        /* Start sending traffic */
182
        event_base_once(eventbase, -1, EV_TIMEOUT, &send_data_cb, peer, &t);
183
}
184

    
185
/** Helper to print a string list */
186
const char *print_list(char **list, int n, bool should_free) {
187
        static char buffer[4096];
188
        int i;
189
        buffer[0] = '\0';
190
        for (i = 0; i < n; i++) {
191
                if (i) strcat(buffer, "\n");
192
                strcat(buffer, list[i]);
193
                if (should_free) free(list[i]);
194
        }
195
        if (should_free) free(list);
196
        return buffer;
197
}
198

    
199
void start_measures(char **peers ,int p_len, int should_free) {
200
        char str[SOCKETID_STRING_SIZE];
201
        int i, ret;
202
        send_params sParams;
203
        struct peer_info *t;
204
        enum stat_types st[] = {MIN, MAX};
205

    
206
        generic = monCreateMeasure(GENERIC, 0);
207
        ret = monSetParameter (generic, P_DEBUG_FILE, 1);
208
        ret = monPublishStatisticalType(generic, "GenericTest", channel, st , sizeof(st)/sizeof(enum stat_types), repoclient);
209
        ret = monActivateMeasure(generic, NULL, MSG_TYPE);
210

    
211
        for(i=0; i < p_len; i++) {
212
                if(strcmp(local_id, peers[i]) == 0)
213
                        continue;
214

    
215
                t = (struct peer_info *) malloc(sizeof(struct peer_info));
216
                memset(t,0,sizeof(struct peer_info));
217

    
218
                t->rem_peer = (socketID_handle) malloc(SOCKETID_SIZE);
219
                mlStringToSocketID(peers[i], t->rem_peer);
220

    
221
                if (should_free)
222
                        free(peers[i]);
223

    
224
                mlSocketIDToString(t->rem_peer,str, sizeof(str));
225
                info("Opening connection to %s ...",str);
226
                
227
                mlOpenConnection(t->rem_peer, &open_conn_cb_active, t, sParams);
228

    
229
                t->previous = NULL;
230
                t->next = peers_head;
231
                if(peers_head != NULL)
232
                        peers_head->previous = t;
233
                peers_head = t;
234
        }
235

    
236
        if (should_free)
237
                free(peers);
238
};
239

    
240
void stop_measures(void) {
241
        int i;
242
        struct peer_info *t;
243

    
244
        monDestroyMeasure(generic);
245
        generic = -1;
246
        sn = 0;
247

    
248
        /* Stop any running measurments and free up memory */
249
        while(peers_head != NULL) {
250
                for(i=0; i < peers_head->m_list_len; i++) {
251
                        monDestroyMeasure(peers_head->m_list[i]);
252
                }
253
                info("Stopped %d measures", peers_head->m_list_len);
254

    
255
                free(peers_head->rem_peer);
256

    
257
                mlCloseConnection(peers_head->con_id);
258
                peers_head->con_id = -1;
259

    
260
                peers_head = peers_head->next;
261
        }
262
};
263

    
264
/* Called upon receiving the answer from the repository */
265
void get_peers_cb(HANDLE client, HANDLE id, void *cbarg, char **result, int n) {
266
        //print out list
267
        info("GetPeers done: %d Peers. List:\n%s", n, print_list(result, n, 0));
268
        //start measures
269
        start_measures(result, n, 1);
270
}
271

    
272
/* Event to periodically check for available peers, stop old measurements and start new measurements*/
273
void check_for_new_peers(int fd, short event,void *arg){
274
        // Stop any running measurements
275
        stop_measures();
276

    
277
        // Get peer list
278
        Constraint cons;
279
        cons.published_name = SOCKETID_PUBLISH_NAME;
280
        cons.strValue = NULL;
281
        cons.minValue = SOCKETID_PUBLISH_VALUE;
282
        cons.maxValue = SOCKETID_PUBLISH_VALUE;
283

    
284
        repGetPeers(repoclient, get_peers_cb, NULL, 10, &cons, 1, NULL, 0, channel);
285

    
286
        // reschedule next measurement session
287
        struct timeval tv = {cycle, 0};
288
        event_base_once(eventbase, -1, EV_TIMEOUT, &check_for_new_peers, NULL , &tv);
289
}
290

    
291
void send_reply(int c_id, socketID_handle sock) {
292
        char str[SOCKETID_STRING_SIZE];
293

    
294
        mlSocketIDToString(sock,str, sizeof(str));
295
        info("Sending reply to %s msg_type: %d", str, MSG_TYPE);
296

    
297
        mlSendData(c_id, reply, strlen(reply) + 1, MSG_TYPE, NULL);
298
}
299

    
300
void open_con_cb_passive(int c_id, void *arg) {
301
        char str[SOCKETID_STRING_SIZE];
302

    
303
        mlSocketIDToString(arg, str, sizeof(str));
304
        info("Opened connection with %s", str);
305

    
306
        send_reply(c_id, arg);
307

    
308
        free(arg);
309
}
310

    
311
/* pasive replies to active peer */
312
void rx_data_cb(char *buffer,int buflen, MsgType mt, recv_params *rparam){
313
        char str[SOCKETID_STRING_SIZE];
314

    
315
        mlSocketIDToString(rparam->remote_socketID,str, sizeof(str));
316
        info("Received message from %s on MsgType %d: %s",str ,mt, buffer+1);
317

    
318
        if(buffer[0] == 'A') { //Request?
319
                int c_id = mlConnectionExist(rparam->remote_socketID, false);
320
                if(c_id >= 0) {
321
                        if(mlGetConnectionStatus(c_id))
322
                                send_reply(c_id, rparam->remote_socketID);
323
                } else {
324
                        socketID_handle sock = malloc(SOCKETID_SIZE);
325
                        memcpy(sock, rparam->remote_socketID, SOCKETID_SIZE);
326
                        send_params sParams;
327
                        mlOpenConnection(rparam->remote_socketID, &open_con_cb_passive, sock, sParams);
328
                        info("Waiting to open connection");
329
                }
330
        }
331
}
332

    
333

    
334
/* Called after the ML finished initialising */
335
void receive_local_socketID_cb(socketID_handle local_socketID, int status){
336
        int res, conID;
337
        char str[SOCKETID_STRING_SIZE];
338
        
339
        if(status) {
340
                info("Still trying to do NAT traversal");
341
                return;
342
        }
343

    
344
        info("Nat traversal completed");
345

    
346
        mlSocketIDToString(local_socketID, local_id, sizeof(str));
347
        warn("My local SocketId is: %s", local_id);
348

    
349
        //publish our Id in the repository
350
        MeasurementRecord mr;
351
        mr.originator = mr.targetA = mr.targetB = local_id;
352
        mr.string_value = NULL;
353
        mr.channel = channel;
354
        mr.published_name = SOCKETID_PUBLISH_NAME;
355
        mr.value = SOCKETID_PUBLISH_VALUE;
356
        gettimeofday(&(mr.timestamp), NULL);
357
        
358
        repPublish(repoclient, NULL, NULL, &mr);
359

    
360
        // add check for new peers event to the loop
361
        struct timeval tv = {0 ,0};
362
        event_base_once(eventbase, -1, EV_TIMEOUT, &check_for_new_peers, NULL, &tv);
363
}
364

    
365
/* Called if the connection opening fails */
366
void conn_fail_cb(int connectionID, void *arg){
367
        error("Connection could not be established!\n");
368
}
369

    
370
void usage(char *argv[]) {
371
        printf("Usage:\n\t%s [-b <bindIp>] [-r <repository:port>] [-s <stunserver>] [-v <verbosity>] [-p <port>] [-C <cycle>] [-c <channel>] [-P <avg_pause>]",argv[0]);
372
        exit(1);
373
}
374

    
375
int main(int argc, char *argv[]) {
376
        char *repository = "repository.napa-wine.eu:9832";
377
        char *stun_server = "stun.ekiga.net";
378
        char *bind_ip = NULL;
379
        int verbosity = 100;
380
        int port = 6666;
381

    
382
        int i;
383
        for (i = 1; i < argc; i += 2) {
384
                if (!strcmp("-r", argv[i])) {
385
                        repository = argv[i+1];
386
                }
387
                else if (!strcmp("-s", argv[i])) {
388
                        stun_server = argv[i+1];
389
                }
390
                else if (!strcmp("-b", argv[i])) {
391
                        bind_ip = argv[i+1];
392
                }
393
                else if (!strcmp("-v", argv[i])) {
394
                        verbosity = atoi(argv[i+1]);
395
                }
396
                else if (!strcmp("-p", argv[i])) {
397
                        port = atoi(argv[i+1]);
398
                }
399
                else if (!strcmp("-C", argv[i])) {
400
                        cycle = atoi(argv[i+1]);
401
                }
402
                else if (!strcmp("-c", argv[i])) {
403
                        channel = argv[i+1];
404
                }
405
                else if (!strcmp("-P", argv[i])) {
406
                        avg_pause = atoi(argv[i+1]);
407
                }
408
                else if (!strcmp("-h", argv[i])) {
409
                        usage(argv);
410
                }
411
        }
412

    
413
        printf("Running conf:\nIP:\t\t%s\nSTUN:\t\t%s\nREPO:\t\t%s\nVERBOSITY:\t%d\nPORT:\t\t%d\nCYCLE:\t\t%ds\nCHANNEL:\t%s\nPAUSE:\t\t%dms\n", bind_ip ? bind_ip:"auto", stun_server, repository, verbosity, port, cycle, channel, avg_pause);
414

    
415
        //Init napa: log facility and libevent
416
        napaInit(event_base_new());
417

    
418
        // Initialize logging
419
        napaInitLog(verbosity, NULL, NULL);
420

    
421
        //Init monitoring layer
422
        monInit(eventbase, NULL);
423

    
424
        //Init repoclient
425
        repInit("");
426
        repoclient = repOpen(repository,0);
427
        if (repoclient == NULL) fatal("Unable to initialize repoclient");
428

    
429
        // Init messaging layer
430
        mlRegisterErrorConnectionCb(&conn_fail_cb);
431

    
432
        //register callback
433
        mlRegisterRecvDataCb(&rx_data_cb,MSG_TYPE);
434

    
435
        struct timeval timeout = {3,0};
436
        mlInit(true, timeout, port, bind_ip, 3478, stun_server,
437
                        &receive_local_socketID_cb, (void*)eventbase);
438

    
439
        //Start everything
440
        event_base_dispatch(eventbase);
441
        stop_measures();
442
}