Statistics
| Branch: | Revision:

napa-baselibs / tests / MonTest / MonTest.c @ 507372bb

History | View | Annotate | Download (11 KB)

1
/***************************************************************************
2
 *   Copyright (C) 2009 by Robert Birke   *
3
 *   robert.birke@polito.it   *
4
 *                                                                         *
5
 *   This program is free software; you can redistribute it and/or modify  *
6
 *   it under the terms of the GNU General Public License as published by  *
7
 *   the Free Software Foundation; either version 2 of the License, or     *
8
 *   (at your option) any later version.                                   *
9
 *                                                                         *
10
 *   This program is distributed in the hope that it will be useful,       *
11
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
12
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
13
 *   GNU General Public License for more details.                          *
14
 *                                                                         *
15
 *   You should have received a copy of the GNU General Public License     *
16
 *   along with this program; if not, write to the                         *
17
 *   Free Software Foundation, Inc.,                                       *
18
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
19
 ***************************************************************************/
20

    
21
#include <event2/event.h>
22
#include <unistd.h>
23
#include <string.h>
24
#include <math.h>
25

    
26
#include "ml.h"
27
#include "mon.h"
28
#include "napa_log.h"
29
#include "napa.h"
30
#include "repoclient.h"
31

    
32
int active = 0; // active or passive?
33

    
34
#define SOCKETID_PUBLISH_NAME "SocketId"
35
#define SOCKETID_PUBLISH_VALUE 0
36
char request[] = "APingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPingPing";
37

    
38
char reply[] = "BPong";
39
//Note first characters identifies Request form Replies and must be maintaned (it is not printed out)
40

    
41
HANDLE repoclient;
42

    
43
socketID_handle rem_peer = NULL; // the remote SocketId
44

    
45
/** Helper to print a string list */
46
const char *print_list(char **list, int n, bool should_free) {
47
        static char buffer[4096];
48
        int i;
49
        for (i = 0; i < n; i++) {
50
                if (i) strcat(buffer, "\n");
51
                strcat(buffer, list[i]);
52
                if (should_free) free(list[i]);
53
        }
54
        if (should_free) free(list);
55
        return buffer;
56
}
57

    
58
void get_peers_cb(HANDLE client, HANDLE id, void *cbarg, char **result, int n) {
59
        info("GetPeers done: %d Peers. List:\n%s", n, print_list(result, n, 1));
60
}
61

    
62
/* Event to send periodically traffic after the connection has been established */
63
void send_data_cb(int fd, short event,void *arg){
64
        int *con_id = arg;
65
        struct timeval t = {0,10000};
66

    
67
        mlSendData(*con_id, request, strlen(request) + 1, 3, NULL);
68

    
69
        //reschedule
70
        event_base_once(eventbase, -1, EV_TIMEOUT, &send_data_cb, arg, &t);
71
}
72

    
73
/* Called once the connection has been established  */
74
void open_conn_cb_active (int connectionID, void *arg){
75
        struct timeval t = {0,0};
76
        char str[SOCKETID_STRING_SIZE];
77
        
78
        int *con_id = malloc(sizeof(int));
79
        *con_id = connectionID;
80

    
81
        mlSocketIDToString(rem_peer,str, sizeof(str));
82
        info("Opened connection to %s.",str);
83
        info("Start sending requests");
84

    
85
        // peer 1 sends data to peer 2
86
        if(active) {
87
                int ret;
88
                MonHandler mh;
89
                enum stat_types st[] = {AVG};
90
                enum stat_types st2[] = {SUM};
91

    
92
//                 /* HopCount */
93
//                 mh = monCreateMeasure(HOPCOUNT, TXRXUNI | PACKET | IN_BAND);
94
//                 monPublishStatisticalType(mh, NULL, NULL, st, sizeof(st)/sizeof(enum stat_types), repoclient);
95
//                 ret = monActivateMeasure(mh, rem_peer,0);
96
// 
97
//                 /* RX Bytes */
98
//                 mh = monCreateMeasure(BYTE, RXONLY | PACKET | IN_BAND);
99
//                 monPublishStatisticalType(mh, NULL, NULL, st2, sizeof(st2)/sizeof(enum stat_types), repoclient);
100
//                 ret = monActivateMeasure(mh, rem_peer,0);
101
// 
102
//                 /* RX Pkts */
103
//                 mh = monCreateMeasure(COUNTER, RXONLY | PACKET | IN_BAND);
104
//                 monPublishStatisticalType(mh, NULL, NULL, st2, sizeof(st2)/sizeof(enum stat_types), repoclient);
105
//                 ret = monActivateMeasure(mh, NULL, 0);
106
// 
107
//                 /* Round Trip Time */
108
//                 mh = monCreateMeasure(RTT, TXRXBI | DATA | IN_BAND);
109
//                 monPublishStatisticalType(mh, NULL, st , sizeof(st)/sizeof(enum stat_types), repoclient);
110
//                 ret = monActivateMeasure(mh,rem_peer,3);
111
// 
112
//                 /* Clockdrift and capacity (Note: out of band measures: injects test traffic) */
113
                mh = monCreateMeasure(CLOCKDRIFT, TXRXUNI | PACKET | IN_BAND);
114
                monSetParameter (mh, P_CLOCKDRIFT_ALGORITHM, 1);
115
                ret = monActivateMeasure(mh,rem_peer, 3);
116
//
117

    
118
                mh = monCreateMeasure(CORRECTED_DELAY, TXRXUNI | PACKET | IN_BAND);
119
                ret = monActivateMeasure(mh,rem_peer, 3);
120

    
121
                mh = monCreateMeasure(CAPACITY_CAPPROBE, TXRXUNI | PACKET | IN_BAND);
122
                monSetParameter (mh, P_DEBUG_FILE, 1);
123
                monSetParameter (mh, P_CAPPROBE_DELAY_TH, -1);
124
//                 monSetParameter (mh, P_CAPPROBE_PKT_TH, 100);
125
//                 monSetParameter (mh, P_CAPPROBE_IPD_TH, 60);
126
//                 monPublishStatisticalType(mh, NULL, st , sizeof(st)/sizeof(enum stat_types), repoclient);
127
                 ret = monActivateMeasure(mh,rem_peer, 3);
128

    
129
                event_base_once(eventbase, -1, EV_TIMEOUT, &send_data_cb, con_id , &t);
130
        }
131
}
132

    
133
/* Called after the ML finished initialising. Used to open the initial connection*/
134
void receive_local_socketID_cb(socketID_handle local_socketID, int status){
135
        int res, conID;
136
        char str[SOCKETID_STRING_SIZE];
137
        
138
        if(status ) {
139
                info("Still trying to do NAT traversal");
140
                return;
141
        }
142

    
143
        info("Nat traversal completed");
144

    
145
        mlSocketIDToString(local_socketID,str, sizeof(str));
146
        info("My local SocketId is: %s",str);
147

    
148
        //publish our Id in the repository
149
        MeasurementRecord mr;
150
        mr.originator = mr.targetA = mr.targetB = str;
151
        mr.string_value = NULL;
152
        mr.channel = NULL;
153
        mr.published_name = SOCKETID_PUBLISH_NAME;
154
        mr.value = SOCKETID_PUBLISH_VALUE;
155
        gettimeofday(&(mr.timestamp), NULL);
156
        
157
        repPublish(repoclient, NULL, NULL, &mr);
158

    
159
        /* Get a peer list (not used as of now) */
160
        Constraint cons;
161
        cons.published_name = SOCKETID_PUBLISH_NAME;
162
        cons.strValue = NULL;
163
        cons.minValue = SOCKETID_PUBLISH_VALUE;
164
        cons.maxValue = SOCKETID_PUBLISH_VALUE;
165

    
166
        repGetPeers(repoclient, get_peers_cb, NULL, 10, &cons, 1, NULL, 0, NULL);
167

    
168
        send_params sParams;
169
        if(active) {
170
                char str[SOCKETID_STRING_SIZE];
171

    
172
                mlSocketIDToString(rem_peer,str, sizeof(str));
173
                info("We are active!");
174
                info("Open connection to %s ...", str);
175

    
176
                mlOpenConnection(rem_peer, &open_conn_cb_active, NULL, sParams);
177
        } else
178
                info("Waiting for incomig requests");
179
}
180

    
181
/* Called if the connection opening fails */
182
void conn_fail_cb(int connectionID, void *arg){
183
        error("Connection could not be established!\n");
184
}
185

    
186
void send_reply(int c_id, socketID_handle sock) {
187
        char str[SOCKETID_STRING_SIZE];
188
        MsgType mt = 3;
189

    
190
        mlSocketIDToString(sock,str, sizeof(str));
191
        info("Sending reply to %s msg_type: %d", str, mt);
192

    
193
        mlSendData(c_id, reply, strlen(reply) + 1, 3, NULL);
194
}
195

    
196
void open_con_cb_passive(int c_id, void *arg) {
197
        char str[SOCKETID_STRING_SIZE];
198

    
199
        mlSocketIDToString(arg, str, sizeof(str));
200
        info("Opened connection with %s", str);
201

    
202
        send_reply(c_id, arg);
203

    
204
        free(arg);
205
}
206

    
207
/* pasive replies to active peer */
208
void rx_data_cb(char *buffer,int buflen, MsgType mt, recv_params *rparam){
209
        char str[SOCKETID_STRING_SIZE];
210

    
211
        mlSocketIDToString(rparam->remote_socketID,str, sizeof(str));
212
        info("Received message from %s on MsgType %d: %s",str ,mt, buffer+1);
213

    
214
        if(buffer[0] == 'A') { //Request?
215
                int c_id = mlConnectionExist(rparam->remote_socketID, false);
216
                if(c_id >= 0) {
217
                        if(mlGetConnectionStatus(c_id))        
218
                                send_reply(c_id, rparam->remote_socketID);
219
                } else {
220
                        socketID_handle sock = malloc(SOCKETID_SIZE);
221
                        memcpy(sock, rparam->remote_socketID, SOCKETID_SIZE);
222
                        send_params sParams;
223
                        mlOpenConnection(rparam->remote_socketID, &open_con_cb_passive, sock, sParams);
224
                        info("Waiting to open connection");
225
                }
226
        }
227
}
228

    
229

    
230
int main(int argc, char *argv[]) {
231
        char *repository = "repository.napa-wine.eu:9832";
232
        char *stun_server = "stun.ekiga.net";
233
        char *bind_ip = NULL;
234
        int verbosity = 100;
235

    
236
        if((argc % 2 != 1 && argc > 9) || (argc == 2 && (!strcmp(argv[1], "-h") || !strcmp(argv[1], "--help")))) {
237
                printf("Usage:\n\t%s [-b <bindIp>] [-a <SocketId>] [-r <repository:port>] [-s <stunserver>]\n", argv[0]);
238
                exit(1);
239
        }
240

    
241
        int i;
242
        for (i = 1; i < argc; i += 2) {
243
                /* Are we active or passive? */
244
                if (!strcmp("-a", argv[i])) {
245
                        char str[SOCKETID_STRING_SIZE];
246
                        active = 1;
247
                        rem_peer = malloc(SOCKETID_SIZE);
248
                        mlStringToSocketID(argv[i+1], rem_peer);
249

    
250
                        mlSocketIDToString(rem_peer, str, sizeof(str));
251
                        info("Remote SocketId: %s", str);
252
                        printf("\n");
253
                        }
254
                else if (!strcmp("-r", argv[i])) {
255
                        repository = argv[i+1];
256
                }
257
                else if (!strcmp("-s", argv[i])) {
258
                        stun_server = argv[i+1];
259
                }
260
                else if (!strcmp("-b", argv[i])) {
261
                        bind_ip = argv[i+1];
262
                }
263
                else if (!strcmp("-v", argv[i])) {
264
                        verbosity = atoi(argv[i+1]);
265
                }
266
        }
267

    
268
        printf("Running conf:\nIP: %s\nSTUN: %s\nREPO: %s\nACTIVE: %s\nVERBOSITY: %d\n", bind_ip ? bind_ip:"auto", stun_server, repository, active ? "active" : "passive", verbosity);
269

    
270
        //Init napa: log facility and libevent
271
        napaInit(event_base_new());
272

    
273
        // Initialize logging
274
        napaInitLog(verbosity, NULL, NULL);
275

    
276
        //Init monitoring layer
277
        monInit(eventbase, NULL);
278

    
279
        //Init repoclient
280
        repInit("");
281
        repoclient = repOpen(repository,0);
282
        if (repoclient == NULL) fatal("Unable to initialize repoclient");
283

    
284
        //register callback
285
        mlRegisterRecvDataCb(&rx_data_cb,3);
286

    
287
        // Init messaging layer
288
        mlRegisterErrorConnectionCb(&conn_fail_cb);
289
        
290

    
291
        struct timeval timeout = {3,0};
292
        mlInit(true, timeout, 9000+active, bind_ip, 3478, stun_server,
293
                        &receive_local_socketID_cb, (void*)eventbase);
294
        //mlInit(true,timeout,9000+active,NULL,3478,NULL,&receive_local_socketID_cb,(void*)eventbase);
295

    
296

    
297
        //Start everything
298
        event_base_dispatch(eventbase);
299
        free(rem_peer);
300
}