Statistics
| Branch: | Revision:

napa-baselibs / tests / nvtest2 / nvtest2.c @ 507372bb

History | View | Annotate | Download (26.4 KB)

1
#include <event2/event.h>
2
#include <unistd.h>
3
#include <string.h>
4
#include <math.h>
5
#include <netinet/in.h>
6
#include <sys/socket.h>
7

    
8
#include "nvtest2.h"
9

    
10
// Defines
11
#define SOCKETID_PUBLISH_NAME "SocketId"
12
#define SOCKETID_PUBLISH_VALUE_MIN 0
13
#define SOCKETID_PUBLISH_VALUE_MAX 1000
14

    
15
// Global variables containing peer information
16
Peer *peer = NULL;
17
int messaging_ready = 0;
18
int active = 0; // initially a peer is passive (before the first remote peer is chosen)
19
int currentConnectionID = -1;
20
bool dummymode = 0; // if dummymode is on, no real traffic is sent
21
int chunkcounter = 0;
22
int chunktoken = 1;
23

    
24
// Global variables containing the remote peer information (to whom requests are sent)
25
socketID_handle remsocketID = NULL; // the remote SocketId
26
char remsocketID_string[SOCKETID_STRING_SIZE] = "Uninitialized"; // the remote SocketId string conterpart
27

    
28
// Messages - Note first characters identifies Request form Replies and must be maintaned - DummyMode ONLY
29
char request[] = "APing";
30
char reply[] = "BPong";
31

    
32
// Structure containing the source stream information
33
struct source_stream {
34
        struct bufferevent *bev;
35
        double chunk_duration;
36
        void *read_buffer;
37
        int read_size;
38
        int chunkid;
39
};
40

    
41
// default send_params
42
send_params sParams;
43

    
44
// Structure for a received chunk
45
struct recievedchunk {
46
        int currentpkt;
47
        int pkts;
48
        char *buffer;
49
        int chunkid;
50
};
51

    
52
// Structure (and instance) containing the playout stream information
53
struct playout_stream {
54
        int udpSocket;
55
        struct sockaddr_in dst;
56
        int read_packet_size;
57
        int palyout_packet_size;
58
        int playout_frequency;
59
        int cycle_buffer;
60
} playout;
61

    
62
// Types for the config file parser
63
cfg_opt_t cfg_stun[] = {
64
        CFG_STR("server", "stun.ekiga.net", CFGF_NONE),
65
        CFG_INT("port", 3478, CFGF_NONE),
66
        CFG_END()
67
};
68

    
69
cfg_opt_t cfg_peer[] = {
70
        CFG_INT("value", 0, CFGF_NONE),
71
        CFG_END()
72
};
73

    
74
cfg_opt_t cfg_ml[] = {
75
        CFG_INT("port", 9000, CFGF_NONE),
76
        CFG_INT("timeout", 3, CFGF_NONE),
77
        CFG_STR("address", NULL, CFGF_NONE),
78
        CFG_SEC("stun", cfg_stun, CFGF_NONE),
79
        CFG_SEC("peerid", cfg_peer, CFGF_NONE),
80
        CFG_END()
81
};
82

    
83
cfg_opt_t cfg_rep[] = {
84
        CFG_STR("server", NULL, CFGF_NONE),
85
        CFG_END()
86
};
87

    
88
cfg_opt_t cfg_nblist[] = {
89
        CFG_INT("desired_size", 10, CFGF_NONE),
90
        CFG_INT("update_period", 15, CFGF_NONE),
91
        CFG_STR("channel", "Nvtest2", CFGF_NONE),
92
        CFG_END()
93
};
94

    
95
cfg_opt_t cfg_log[] = {
96
        CFG_INT("level", LOG_DEBUG, CFGF_NONE),
97
        CFG_END()
98
};
99

    
100
cfg_opt_t cfg_stream[] = {
101
        CFG_STR("source", NULL, CFGF_NONE),
102
        CFG_STR("destination", NULL, CFGF_NONE),
103
        CFG_FLOAT("chunk_duration", 1.0, CFGF_NONE),
104
        CFG_INT("read_packet_size", 1316, CFGF_NONE),
105
        CFG_INT("playout_packet_size", 1316, CFGF_NONE),
106
        CFG_INT("playout_frequency", 200, CFGF_NONE),
107
        CFG_INT("cycle_buffer", 100, CFGF_NONE),
108
        CFG_END()
109
};
110

    
111
cfg_opt_t cfg_main[] = {
112
        CFG_SEC("network", cfg_ml, CFGF_NONE),
113
        CFG_SEC("repository", cfg_rep, CFGF_NONE),
114
        CFG_SEC("neighborlist", cfg_nblist, CFGF_NONE),
115
        CFG_SEC("logging", cfg_log, CFGF_NONE),
116
        CFG_SEC("stream", cfg_stream, CFGF_NONE),
117
        CFG_END()
118
};
119

    
120
// Function prototypes
121
const char *print_list_nblist(NeighborListEntry *list, int n);
122
const char *print_list(char **list, int n, bool should_free);
123
Peer *peer_init(const char *configfile);
124
void init_monitoring(cfg_t *mon_config);
125
void init_repoclient(cfg_t *repo_config);
126
void init_neighborreqs(cfg_t *nb_config);
127
void init_messaging(cfg_t *ml_config);
128
void init_source(cfg_t *source_config);
129
void periodic_nblst_query(HANDLE h, void *arg);
130
void publish_peerid();
131
void publish_connection_req();
132
void choose_remote_peer();
133
void open_conn_cb_active(int connectionID, void *arg);
134
void send_data_cb(int fd, short event, void *arg);
135
void rx_data_stream_cb(char *buffer, int buflen, MsgType mt, recv_params *rparam);
136
void periodic_buffer_playout(int fd, short event, void *arg);
137
void rx_data_cb(char *buffer, int buflen, MsgType mt, recv_params *rparam);
138
void open_con_cb_passive(int c_id, void *arg);
139
void send_reply(int c_id, socketID_handle sock);
140
void conn_fail_cb(int connectionID, void *arg);
141
void receive_local_socketID_cb(socketID_handle local_socketID, int status);
142
void get_peers_cb(HANDLE client, HANDLE id, void *cbarg, char **result, int n);
143
int init_dest_ul(const char *stream, int packet_size, int frequency, int read_size, int cycle_buffer);
144
int init_source_ul(const char *stream, double chunk_duration);
145
void check_conn_reqs();
146
void check_conn_reqs_cb(HANDLE client, HANDLE id, void *cbarg, MeasurementRecord *result, int nResults);
147
void open_conn_cb_reply(int connectionID, void *arg);
148
void read_stream(HANDLE h, void *arg);
149

    
150
// Helper to print a string list of Peers for the periodic update of the Peers list
151
const char *print_list_nblist(NeighborListEntry *list, int n) {
152
        static char buffer[4096];
153
        struct timeval now;
154
        gettimeofday(&now, NULL);
155
        strcpy(buffer, "[ ");
156
        int i;
157
        for (i = 0; i != n; i++) {
158
                if (i) strcat(buffer, ", ");
159
                strcat(buffer, list[i].peer);
160
                char buf[32];
161
                sprintf(buf, "(%d)", (int)(now.tv_sec - list[i].update_timestamp.tv_sec));
162
                strcat(buffer, buf);
163
        }
164
        strcat(buffer, " ]");
165
        return buffer;
166
}
167

    
168
// Helper to print a string list and to store it in the Peer structure
169
const char *print_list(char **list, int n, bool should_free) {
170
        static char buffer[4096];
171
        int i;
172
        for (i = 0; i < n; i++) {
173
                if (i) strcat(buffer, "\n");
174
                strcat(buffer, list[i]);
175
                if (should_free) free(list[i]);
176
        }
177
        if (should_free) free(list);
178
        return buffer;
179
}
180

    
181
// Initialization of a peer
182
Peer *peer_init(const char *configfile) {
183
        peer = calloc(sizeof(Peer), 1);
184

    
185
        // Init napa: log facility and libevent
186
        napaInit(event_base_new());
187
        info("NAPA initialized.");
188

    
189
        // Parse config file */
190
        cfg_t *main_config = cfg_init(cfg_main, CFGF_NONE);
191
        if(cfg_parse(main_config, configfile) == CFG_PARSE_ERROR)
192
                fatal("Unable to parse config file %s!", configfile);
193

    
194
        // Initialize logging/monitoring
195
        cfg_t *logging_cfg = cfg_getsec(main_config, "logging");
196
        if (logging_cfg) init_monitoring(logging_cfg);
197

    
198
        // Initialize repository connection
199
        cfg_t *repoclient_cfg = cfg_getsec(main_config, "repository");
200
        if (repoclient_cfg) init_repoclient(repoclient_cfg);
201

    
202
        // Initialize neighborlist
203
        cfg_t *nblist_cfg = cfg_getsec(main_config, "neighborlist");
204
        if (nblist_cfg) init_neighborreqs(nblist_cfg);
205

    
206
        // Initialize messaging layer
207
        cfg_t *network_cfg = cfg_getsec(main_config, "network");
208
        init_messaging(network_cfg);
209

    
210
        // Initialize connection opener loop for requested connections
211
        struct timeval begin = { 10, 0 };
212
        napaSchedulePeriodic(&begin, 1.0/(double)peer->nblist_update_period, check_conn_reqs, NULL);
213

    
214
        // Initialize stream source and destination
215
        if (!dummymode) {
216
                cfg_t *stream_cfg = cfg_getsec(main_config, "stream");
217
                if (stream_cfg) init_source(stream_cfg);
218
        }
219
        
220
        /* cfg_free(main_config); */
221

    
222
        return peer;
223
}
224

    
225
// Initialize logging/monitoring
226
void init_monitoring(cfg_t *mon_config) {
227
        int verbosity = LOG_DEBUG;
228
        verbosity = cfg_getint(mon_config, "level");
229

    
230
        // Initialize logging
231
        napaInitLog(verbosity, NULL, NULL);
232
        info("Logging initialized with verbosity: %d", verbosity);
233

    
234
        // Init monitoring layer
235
        if (monInit(eventbase, NULL)) fatal("Unable to init monitoring layer!");
236
}
237

    
238
// Initialize repository connection
239
void init_repoclient(cfg_t *repo_config) {
240
        // Init repository client
241
        repInit("");
242
        peer->repository = repOpen(cfg_getstr(repo_config, "server"),0);
243
        if (peer->repository == NULL) fatal("Unable to initialize repository client!");
244
    info("Repository connection initialized to: %s", cfg_getstr(repo_config, "server"));
245
}
246

    
247
// Initialize neighborlist and start periodic update
248
void init_neighborreqs(cfg_t *nb_config) {
249
        peer->nblist_desired_size = cfg_getint(nb_config, "desired_size");
250
        peer->nblist_update_period = cfg_getint(nb_config, "update_period");        
251
        peer->channel = cfg_getstr(nb_config, "channel");
252
        peer->neighborlist = neighborlist_init(peer->repository, peer->nblist_desired_size, peer->nblist_update_period, peer->channel, NULL, NULL);
253

    
254
        struct timeval begin = { 5, 0 };
255
        napaSchedulePeriodic(&begin, 1.0/(double)peer->nblist_update_period, periodic_nblst_query, peer->neighborlist);
256
}
257

    
258
// Initialize messaging layer
259
void init_messaging(cfg_t *ml_config) {
260
        if (!ml_config) return;
261
        struct timeval timeout = { 0, 0 };
262
        timeout.tv_sec = cfg_getint(ml_config, "timeout");
263

    
264
        cfg_t *stun = cfg_getsec(ml_config, "stun");
265
        char *stun_server = cfg_getstr(stun, "server");
266
        int stun_port = cfg_getint(stun, "port");
267

    
268
        cfg_t *peerid = cfg_getsec(ml_config, "peerid");
269
        peer->PeerIDvalue = cfg_getint(peerid, "value");
270

    
271
        char *address = cfg_getstr(ml_config, "address");
272
        if (!address) address = strdup(mlAutodetectIPAddress());
273
        int port = cfg_getint(ml_config, "port");
274

    
275
        if (dummymode) mlRegisterRecvDataCb(&rx_data_cb, 3);
276
        else mlRegisterRecvDataCb(&rx_data_stream_cb, 3);
277
        mlRegisterErrorConnectionCb(&conn_fail_cb);
278

    
279
        info("Calling init_messaging_layer with: %d, %u.%u, (%s, %d), stun(%s, %d)",
280
                1, timeout.tv_sec, timeout.tv_usec, address, port,
281
                stun_server, stun_port);
282
        mlInit(1, timeout, port, address, stun_port, stun_server,
283
                receive_local_socketID_cb, (void*)eventbase);
284

    
285
        while (!messaging_ready) {
286
                napaYield();
287
        }
288
        info("ML has been initialized, with local addess: %s", peer->LocalID);
289
}
290

    
291
// Initialize video source
292
void init_source(cfg_t *str_config) {
293
        char *source = cfg_getstr(str_config, "source");
294
        if (!source) return;
295
        char *destination = cfg_getstr(str_config, "destination");
296
        if (!destination) return;
297
        double chunk_duration = cfg_getfloat(str_config, "chunk_duration");
298
        int read_size = cfg_getint(str_config, "read_packet_size");
299
        int packet_size = cfg_getint(str_config, "playout_packet_size");
300
        int frequency = cfg_getint(str_config, "playout_frequency");
301
        int cycle_buffer = cfg_getint(str_config, "cycle_buffer");
302

    
303
        info("Initializing Destination: tuning to output stream %s [packet size: %d kbytes, frequency: %d]",
304
                        destination, packet_size, frequency);
305
        init_dest_ul(destination, packet_size, frequency, read_size, cycle_buffer);
306

    
307
        info("Initializing Source: tuning to input stream %s [sampling rate: %lf Hz]",
308
                        source, (double)1.0/chunk_duration);
309
        init_source_ul(source, chunk_duration);
310
}
311

    
312
// Getting peer list (storing and printing) - choose the remote peer - publish again the localID
313
void periodic_nblst_query(HANDLE h, void *arg) {
314
        peer->neighborlist = (HANDLE)arg;
315
        peer->neighborlist_size = peer->nblist_desired_size;
316
        peer->neighbors = NULL;
317
        peer->neighbors = calloc(sizeof(NeighborListEntry), peer->neighborlist_size);
318

    
319
        //Periodically publish our ID in the repository
320
        publish_peerid();
321

    
322
        int success = neighborlist_query(peer->neighborlist, peer->neighbors, &peer->neighborlist_size);
323
        if (!success && peer->neighborlist_size > 0) {
324
                // Print the neighbor list if it's not empty
325
                info("GetPeers done: %d Peers currently in the repository. List:\n%s", peer->neighborlist_size, print_list_nblist(peer->neighbors, peer->neighborlist_size));
326
                // Choose a random remote peer from the neighbors list to whom requests will be sent to (if messaging layer is ready)
327
                if (messaging_ready) choose_remote_peer();
328
        }
329
        else
330
                info("GetPeers done: No results for the periodic query!");
331
}
332

    
333
// Publish our Id in the repository
334
void publish_peerid() {
335
        MeasurementRecord mr;
336
        memset(&mr, 0, sizeof(mr));
337
        mr.originator = mr.targetA = mr.targetB = peer->LocalID;
338
        mr.string_value = NULL;
339
        mr.published_name = SOCKETID_PUBLISH_NAME;
340
        mr.value = peer->PeerIDvalue;
341
        mr.channel = peer->channel;
342
        gettimeofday(&(mr.timestamp), NULL);
343

    
344
        repPublish(peer->repository, NULL, NULL, &mr);
345
}
346

    
347
// Publish our connection request in the repository (for the remote peer to see)
348
void publish_connection_req() {
349
        MeasurementRecord mr;
350
        memset(&mr, 0, sizeof(mr));
351
        mr.originator = mr.targetA = peer->LocalID;
352
        mr.targetB = remsocketID_string;
353
        mr.string_value = NULL;
354
        mr.published_name = "ConnReq";
355
        mr.value = 0.0;
356
        mr.channel = peer->channel;
357

    
358
        repPublish(peer->repository, NULL, NULL, &mr);
359
}
360

    
361
// Choose a random remote peer from the neighbors list to whom requests will be sent to
362
void choose_remote_peer() {
363
        // At least two peers need to be present in the repository to be able to make a connection
364
        if (peer->neighborlist_size > 1) {
365
                int random_integer = 0;
366
                srand((unsigned)time(0));
367
                random_integer = rand() % peer->neighborlist_size;
368

    
369
                if (strcmp(remsocketID_string, peer->neighbors[random_integer].peer) != 0) {
370

    
371
                        // Check whether or not self is selected
372
                        if (strcmp(peer->neighbors[random_integer].peer, peer->LocalID) != 0) {
373
                                info("SocketID[%d] is chosen as the remote peer: %s", random_integer, peer->neighbors[random_integer].peer);
374
                                if (active == 0) {
375
                                        // Initialize the active connection to the chosen remote peer
376
                                        active = 1;
377
                                        remsocketID = malloc(SOCKETID_SIZE);
378
                                        mlStringToSocketID(peer->neighbors[random_integer].peer, remsocketID);
379
                                        mlSocketIDToString(remsocketID, remsocketID_string, sizeof(remsocketID_string));
380
                                        info("Opening initial connection to %s ...", remsocketID_string);
381
                                        publish_connection_req();
382
                                        mlOpenConnection(remsocketID, &open_conn_cb_active, NULL, sParams);
383
                                }
384
                                else {
385
                                        // Modify the active connection to the newly chosen remote peer
386
                                        info("Holding current connection to %s [ConnectionID: %d]...", remsocketID_string,currentConnectionID);
387
                                        remsocketID = NULL;
388
                                        remsocketID = malloc(SOCKETID_SIZE);
389
                                        mlStringToSocketID(peer->neighbors[random_integer].peer, remsocketID);
390
                                        mlSocketIDToString(remsocketID, remsocketID_string, sizeof(remsocketID_string));
391
                                        publish_connection_req();
392
                                        mlOpenConnection(remsocketID, &open_conn_cb_active, NULL, sParams);
393
                                }
394
                        }
395
                        else info("Skip modification of remote peer for now. (Self invalid)");
396
                }
397
                else info("Skip modification of remote peer for now. (Same selected)");
398
        }
399
}
400

    
401
// Called once the connection has been established
402
void open_conn_cb_active(int connectionID, void *arg) {
403
        struct timeval t = { 0, 0 };
404
        char measurename[4096];
405
        currentConnectionID = connectionID;
406

    
407
        int *con_id = malloc(sizeof(int));
408
        *con_id = connectionID;
409

    
410
        info("Opened connection to %s with ID %d.", remsocketID_string, connectionID);
411
        info("Start sending requests...");
412

    
413
        int ret;
414
        MonHandler mh;
415
        enum stat_types st[] = { AVG };
416
        /*
417
        mh = monCreateMeasure(HOPCOUNT, TXRX | PACKET | IN_BAND);
418
        sprintf(measurename,"HopCount_peerId-%d_connId-%d", peer->PeerIDvalue, connectionID);
419
        monPublishStatisticalType(mh, measurename, st, sizeof(st)/sizeof(enum stat_types), peer->repository);
420
        ret = monActivateMeasure(mh, remsocketID, 3);
421

422
        mh = monCreateMeasure(CLOCKDRIFT, TXRX | DATA | IN_BAND);
423
        sprintf(measurename,"ClockDrift_peerId-%d_connId-%d", peer->PeerIDvalue, connectionID);
424
        monSetParameter (mh, P_CLOCKDRIFT_ALGORITHM, 1);
425
        monPublishStatisticalType(mh, measurename, st, sizeof(st)/sizeof(enum stat_types), peer->repository);
426
        ret = monActivateMeasure(mh, remsocketID, 12);
427

428
        mh = monCreateMeasure(CORRECTED_DELAY, TXRX | DATA | IN_BAND);
429
        sprintf(measurename,"CorrectedDelay_peerId-%d_connId-%d", peer->PeerIDvalue, connectionID);
430
        monPublishStatisticalType(mh, measurename, st, sizeof(st)/sizeof(enum stat_types), peer->repository);
431
        ret = monActivateMeasure(mh, remsocketID, 12);
432

433
        mh = monCreateMeasure(CAPACITY_CAPPROBE, TXRX | DATA | OUT_OF_BAND);
434
        sprintf(measurename,"Capacity_peerId-%d_connId-%d", peer->PeerIDvalue, connectionID);
435
        monSetParameter (mh, P_CAPPROBE_DELAY_TH, 100);
436
        monSetParameter (mh, P_CAPPROBE_PKT_TH, 100);
437
        monSetParameter (mh, P_CAPPROBE_IPD_TH, 60);
438
        monPublishStatisticalType(mh, measurename, st, sizeof(st)/sizeof(enum stat_types), peer->repository);
439
        ret = monActivateMeasure(mh, remsocketID, 12);
440

441
        mh = monCreateMeasure(AVAILABLE_BW_FORECASTER, TXRX | DATA | OUT_OF_BAND);
442
        sprintf(measurename,"AvailBwForecast_peerId-%d_connId-%d", peer->PeerIDvalue, connectionID);
443
        monSetParameter (mh, P_FORCASTER_PKT_TH, 500);
444
        monSetParameter (mh, P_FORCASTER_DELAY_TH, 100);
445
        monPublishStatisticalType(mh, measurename, st, sizeof(st)/sizeof(enum stat_types), peer->repository);
446
        ret = monActivateMeasure(mh, remsocketID, 12);
447

448
        mh = monCreateMeasure(LOSS_BURST, TXRX | DATA | IN_BAND);
449
        sprintf(measurename,"LossBurst_peerId-%d_connId-%d", peer->PeerIDvalue, connectionID);
450
        monPublishStatisticalType(mh, measurename, st, sizeof(st)/sizeof(enum stat_types), peer->repository);
451
        ret = monActivateMeasure(mh, remsocketID, 12);
452

453
        mh = monCreateMeasure(RTT, TXRX | DATA | IN_BAND);
454
        sprintf(measurename,"RoundTripTime_peerId-%d_connId-%d", peer->PeerIDvalue, connectionID);
455
        monPublishStatisticalType(mh, measurename, st, sizeof(st)/sizeof(enum stat_types), peer->repository);
456
        ret = monActivateMeasure(mh, remsocketID, 12);
457
        */
458

    
459
        if (dummymode) event_base_once(eventbase, -1, EV_TIMEOUT, &send_data_cb, con_id , &t);
460
}
461

    
462
// Event to send periodically dummy-traffic after the connection has been established  - DummyMode ONLY
463
void send_data_cb(int fd, short event, void *arg) {
464
        int *con_id = arg;
465
        struct timeval t = { 1, 0 };
466

    
467
        if (*con_id == currentConnectionID) {
468
                debug("Sending request on ConnectionID: %d", *con_id);
469
                mlSendData(*con_id, request, strlen(request) + 1, 3, NULL);
470
                // Reschedule
471
                event_base_once(eventbase, -1, EV_TIMEOUT, &send_data_cb, arg, &t);
472
        }
473
        else {
474
                info("End sending requests to the previous remote peer on ConnectionID: %d.", *con_id);
475
        }
476
}
477

    
478
// Transfer received messages to the given output UDP stream
479
void rx_data_stream_cb(char *buffer, int buflen, MsgType mt, recv_params *rparam) {
480
        chunkcounter++;
481
        char str[SOCKETID_STRING_SIZE];
482

    
483
        mlSocketIDToString(rparam->remote_socketID, str, sizeof(str));
484
        //debug("Received message [%d] from %s on MsgType %d.", chunkcounter, str ,mt);
485
        info("Received message [%d] from %s on MsgType %d.", chunkcounter, str ,mt);
486

    
487
        int pkts = buflen / playout.palyout_packet_size;
488
        if (pkts * playout.palyout_packet_size != buflen) {
489
                warn("Damaged chunk! (%d Packets, %d Buffersize)", pkts, buflen);
490
                //return;
491
        }
492

    
493
        struct recievedchunk *rc = malloc(sizeof(struct recievedchunk));
494
        rc->buffer = buffer;
495
        rc->currentpkt = 0;
496
        rc->pkts = pkts;
497
        rc->chunkid = chunkcounter;
498

    
499
        int diff = chunkcounter-chunktoken;
500
        if (diff <= playout.cycle_buffer) { // still space available for the next chunk
501
                struct timeval t = { 0, 0 };
502
                event_base_once(eventbase, -1, EV_TIMEOUT, &periodic_buffer_playout, rc, &t);
503
        }
504
        else{ // we drop the chunks (buffer overflow)
505
                warn("Buffer overflow (chunk %d dropped)!", chunkcounter);
506
                chunkcounter--;
507
        }
508
}
509

    
510
// Transfer received packets to the given output UDP stream with periodic callbacks
511
void periodic_buffer_playout(int fd, short event, void *arg) {
512
        struct recievedchunk *rc = arg;
513
        if (rc->chunkid > chunktoken) { // waiting for token
514
                struct timeval t = { 0, 0 };
515
                double period = (1.0/(double)playout.playout_frequency)*1000000;// period time in microseconds
516
                t.tv_usec = (int)period;
517
                event_base_once(eventbase, -1, EV_TIMEOUT, &periodic_buffer_playout, rc, &t);
518
        }
519
        else if (rc->chunkid == chunktoken) { // playout the buffer
520
                if (rc->currentpkt != rc->pkts) {
521
                        struct timeval t = { 0, 0 };
522
                        double period = (1.0/(double)playout.playout_frequency)*1000000;// period time in microseconds
523
                        t.tv_usec = (int)period;
524
                        sendto(playout.udpSocket, rc->buffer + rc->currentpkt * playout.palyout_packet_size, playout.palyout_packet_size, 0,
525
                                        (struct sockaddr *)&(playout.dst), sizeof(struct sockaddr_in));
526
                        rc->currentpkt++;
527
                        event_base_once(eventbase, -1, EV_TIMEOUT, &periodic_buffer_playout, rc, &t);
528
                }
529
                else {
530
                        chunktoken++;
531
                        debug("Next chunkID in the buffer: %d.", chunktoken);
532
                }
533
        }
534
}
535

    
536
// Open connection and send replies to active peer on dummy-requests - DummyMode ONLY
537
void rx_data_cb(char *buffer, int buflen, MsgType mt, recv_params *rparam) {
538
        char str[SOCKETID_STRING_SIZE];
539

    
540
        mlSocketIDToString(rparam->remote_socketID, str, sizeof(str));
541
        //debug("Received message from %s with MsgType %d: %s", str, mt, buffer+1);
542
        info("Received message from %s with MsgType %d: %s", str, mt, buffer+1);
543

    
544
        if(buffer[0] == 'A') { //First character defines Request
545
                int c_id = mlConnectionExist(rparam->remote_socketID, false);
546
                if(c_id >= 0) {
547
                        if(mlGetConnectionStatus(c_id))
548
                                send_reply(c_id, rparam->remote_socketID);
549
                }
550
                else {
551
                        int passive_conn_id = -1;
552
                        socketID_handle sock = malloc(SOCKETID_SIZE);
553
                        memcpy(sock, rparam->remote_socketID, SOCKETID_SIZE);
554
                        publish_connection_req();
555
                        passive_conn_id = mlOpenConnection(rparam->remote_socketID, &open_con_cb_passive, sock, sParams);
556
                        debug("Passive connection initialized with connection ID: %d", passive_conn_id);
557
                }
558
        }
559
}
560

    
561
// Open connection for a reply (if connection not open yet) - DummyMode ONLY
562
void open_con_cb_passive(int c_id, void *arg) {
563
        char str[SOCKETID_STRING_SIZE];
564

    
565
        mlSocketIDToString(arg, str, sizeof(str));
566
        debug("Opened passive connection with %s", str);
567
        send_reply(c_id, arg);
568
}
569

    
570
// Reply sending - DummyMode ONLY
571
void send_reply(int c_id, socketID_handle sock) {
572
        char str[SOCKETID_STRING_SIZE];
573
        MsgType mt = 3;
574

    
575
        mlSocketIDToString(sock, str, sizeof(str));
576
        debug("Sending reply to %s msg_type: %d", str, mt);
577
        mlSendData(c_id, reply, strlen(reply) + 1, 3, NULL);
578
}
579

    
580
// Called if the connection opening fails
581
void conn_fail_cb(int connectionID, void *arg) {
582
        error("Connection could not be established!\n");
583
}
584

    
585
// Called after the ML finished initializing. Used to open the initial connection
586
void receive_local_socketID_cb(socketID_handle local_socketID, int status) {
587
        if(status ) {
588
                info("Still trying to do NAT traversal...");
589
                return;
590
        }
591

    
592
        info("NAT traversal completed");
593

    
594
        // Store the local SocketID and it's string counterpart
595
        peer->LocalSocketID = local_socketID;
596
        mlSocketIDToString(local_socketID, peer->LocalID, sizeof(peer->LocalID));
597

    
598
        if (status == 0) messaging_ready = 1;
599

    
600
        info("The local SocketId is: %s", peer->LocalID);
601

    
602
        // Publish our Id in the repository from data in the config file
603
        publish_peerid();
604

    
605
        // Get the initial peer list
606
        Constraint cons;
607
        cons.published_name = SOCKETID_PUBLISH_NAME;
608
        cons.strValue = NULL;
609
        cons.minValue = SOCKETID_PUBLISH_VALUE_MIN;
610
        cons.maxValue = SOCKETID_PUBLISH_VALUE_MAX;
611

    
612
        repGetPeers(peer->repository, get_peers_cb, NULL, peer->nblist_desired_size, &cons, 1, NULL, 0, peer->channel);
613

    
614
        info("Waiting for incoming requests...");
615
}
616

    
617
// Print the list of active peers in the repository
618
void get_peers_cb(HANDLE client, HANDLE id, void *cbarg, char **result, int n) {
619
        info("GetPeers done: %d Peers initially in the repository. List:\n%s", n, print_list(result, n, 1));
620
}
621

    
622
// Setup the localhost destination where chunks are transfered to
623
int init_dest_ul(const char *stream, int packet_size, int frequency, int read_size, int cycle_buffer) {
624
        char addr[128];
625
        int port;
626

    
627
        if (sscanf(stream, "udp://%127[^:]:%i", addr, &port) != 2) {
628
                fatal("Unable to parse destination specification %s, set to localhost.", stream);
629
        }
630

    
631
        playout.udpSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
632
        playout.dst.sin_family = AF_INET;
633
        playout.dst.sin_addr.s_addr = inet_addr(addr);
634
        playout.dst.sin_port = htons(port);
635
        playout.palyout_packet_size = packet_size;
636
        playout.playout_frequency = frequency;
637
        playout.read_packet_size = read_size;
638
        playout.cycle_buffer = cycle_buffer;
639
}
640

    
641
// Setup the localhost source where data is received from
642
int init_source_ul(const char *stream, double chunk_duration) {
643
        char addr[128];
644
        int port;
645

    
646
        if (sscanf(stream, "udp://%127[^:]:%i", addr, &port) != 2) {
647
                fatal("Unable to parse source specification %s, set to localhost.", stream);
648
        }
649

    
650
        struct sockaddr_in udpsrc;
651
        int returnStatus = 0;
652
        evutil_socket_t udpSocket;
653

    
654
        udpSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
655

    
656
        evutil_make_socket_nonblocking(udpSocket);
657

    
658
        udpsrc.sin_family = AF_INET;
659
        udpsrc.sin_addr.s_addr = inet_addr(addr);
660
        udpsrc.sin_port = htons(port);
661

    
662
        int status = bind(udpSocket,(struct sockaddr *)&udpsrc, sizeof(udpsrc));
663
        if (status) {
664
                fatal("Unable to bind socket to udp://%s:%d", addr, port);
665
        }
666

    
667
        struct source_stream *src = malloc(sizeof(struct source_stream));
668
        src->read_size = playout.read_packet_size;
669
        src->read_buffer = malloc(src->read_size);
670

    
671
        src->bev = (struct bufferevent *)bufferevent_socket_new((struct event_base *)eventbase, udpSocket, 0);
672
        bufferevent_enable(src->bev, EV_READ);
673
        src->chunk_duration = chunk_duration;
674
        src->chunkid = 0;
675

    
676
        napaSchedulePeriodic(NULL, 1.0/chunk_duration, read_stream, src);
677
}
678

    
679
// Get the list of requested connections' list from the repository
680
void check_conn_reqs() {
681
        info("Checking connection requests by others...");
682
        repGetMeasurements(peer->repository, check_conn_reqs_cb, NULL, peer->nblist_desired_size, NULL, NULL, peer->LocalID, "ConnReq", peer->channel);
683
}
684

    
685
// Open connection if someone is trying to make a connection to us
686
void check_conn_reqs_cb(HANDLE client, HANDLE id, void *cbarg, MeasurementRecord *result, int nResults) {
687
        info("%d connections waiting for reply...", nResults);
688
        int i;
689
        for (i = 0; i != nResults; i++) {
690
                static char requestee[SOCKETID_STRING_SIZE];
691
                strcpy(requestee,result[i].originator);
692
                socketID_handle requesteeID = malloc(SOCKETID_SIZE);
693

    
694
                mlStringToSocketID(requestee, requesteeID);
695
                mlOpenConnection(requesteeID, &open_conn_cb_reply, NULL, sParams);
696
        }
697
        free(result);
698
}
699

    
700
// Called once the connection has been established for a request
701
void open_conn_cb_reply(int connectionID, void *arg) {
702
        info("Opened connection (reply) for a request with ID %d.", connectionID);
703
}
704

    
705
// Read out the buffer from the stream source and send chunks to the selected remote peer
706
void read_stream(HANDLE h, void *arg) {
707
        struct source_stream *src = (struct source_stream *)arg;
708
        int ptr = 0;
709
        src->read_size = playout.read_packet_size;
710
        src->read_buffer = malloc(src->read_size);
711
        int numofpackets = 0;
712

    
713
        if (currentConnectionID > -1) {
714
                size_t rd = playout.read_packet_size;
715
                while (rd == playout.read_packet_size) {
716
                        numofpackets++;
717
                        rd = bufferevent_read(src->bev, src->read_buffer + ptr, src->read_size - ptr);
718
                        if (rd == (src->read_size - ptr)) { /* Need to grow buffer */
719
                                src->read_size += playout.read_packet_size;
720
                                src->read_buffer = realloc(src->read_buffer, src->read_size);
721
                                ptr += rd;
722
                        }
723
                }
724
                if (ptr+rd != 0) {
725
                        //debug("Socket info: New chunk(%d): %d bytes read (%d packets)", src->chunkid++, ptr+rd, numofpackets);
726
                        info("Socket info: New chunk(%d): %d bytes read (%d packets)", src->chunkid++, ptr+rd, numofpackets);
727
                        mlSendData(currentConnectionID, src->read_buffer, src->read_size, 3, NULL);
728
                }
729
                else debug("Socket info: No new chunk read from input stream.");
730
        }
731
        else debug("Socket info: No active connection yet, reading skipped.");
732
}
733

    
734
// main function
735
int main(int argc, char *argv[]) {
736
        if ((argc < 2) || (argc > 3)) {
737
                fprintf(stderr, "ERROR - Correct Usage: ./nvtest2 peer.cfg [-dummy]\n");
738
                exit(-1);
739
        }
740

    
741
        if (argc > 2) {
742
                fprintf(stdout, "INFO - DummyMode active - no real traffic will be generated.\n");
743
                dummymode = 1;
744
        }
745

    
746
        // Initialize the NAPA infrastructure
747
        peer = peer_init(argv[1]);
748

    
749
        // Start everything
750
        event_base_dispatch(eventbase);
751
        free(remsocketID);
752
}