Statistics
| Branch: | Revision:

streamers / measures-monl.c @ 2deba9d9

History | View | Annotate | Download (14.2 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <math.h>
7
#ifndef NAN        //NAN is missing in some old math.h versions
8
#define NAN            (0.0/0.0)
9
#endif
10
#ifndef INFINITY
11
#define INFINITY       (1.0/0.0)
12
#endif
13

    
14
#include <mon.h>
15
#include <ml.h>
16
#include <net_helper.h>
17
#include <grapes_msg_types.h>
18

    
19
#include "channel.h"
20
#include "dbg.h"
21
#include "measures.h"
22

    
23
#define PEER_PUBLISH_INTERVAL 10 //in seconds
24
#define P2P_PUBLISH_INTERVAL 60 //in seconds
25

    
26
extern const char *peername;
27

    
28
typedef struct nodeID {
29
        socketID_handle addr;
30
        int connID;        // connection associated to this node, -1 if myself
31
        int refcnt;
32
        //a quick and dirty static vector for measures TODO: make it dinamic
33
        MonHandler mhs[20];
34
        int n_mhs;
35
} nodeID;
36

    
37
static MonHandler chunk_dup = -1, chunk_playout = -1 , in_neigh_size = -1, out_neigh_size = -1,  chunk_receive = -1, chunk_send = -1, offer_accept_in = -1, offer_accept_out = -1, chunk_hops = -1, chunk_delay = -1, playout_delay = -1;
38
static MonHandler queue_delay = -1 , offers_in_flight = -1;
39

    
40
//static MonHandler rx_bytes_chunk_per_sec, tx_bytes_chunk_per_sec, rx_bytes_sig_per_sec, tx_bytes_sig_per_sec;
41
//static MonHandler rx_chunks, tx_chunks;
42

    
43
/*
44
 * Start one measure
45
*/
46
void start_measure(MonHandler mh, MonParameterValue rate, const char *pubname, enum stat_types st[], int length, SocketId dst, MsgType mt)
47
{
48
        if (rate) monSetParameter (mh, P_PUBLISHING_RATE, rate);
49
        if (length) monPublishStatisticalType(mh, pubname, channel_get_name(), st , length, NULL);
50
        monActivateMeasure(mh, dst, mt);
51
}
52

    
53
/*
54
 * Initialize and start one measure
55
 */
56
void add_measure(MonHandler *mhp, MeasurementId id, MeasurementCapabilities mc, MonParameterValue rate, const char *pubname, enum stat_types st[], int length, SocketId dst, MsgType mt)
57
{
58
        *mhp = monCreateMeasure(id, mc);
59
        start_measure(*mhp, rate, pubname, st, length, dst, mt);
60
}
61

    
62
/*
63
 * Register duplicate arrival
64
*/
65
void reg_chunk_duplicate()
66
{
67
        if (chunk_dup < 0) {
68
                enum stat_types st[] = {SUM, RATE};
69
                // number of chunks which have been received more then once
70
                add_measure(&chunk_dup, GENERIC, 0, PEER_PUBLISH_INTERVAL, "ChunkDuplicates", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
71
                monNewSample(chunk_dup, 0);        //force publish even if there are no events
72
        }
73
        monNewSample(chunk_dup, 1);
74
}
75

    
76
/*
77
 * Register playout/loss of a chunk before playout
78
*/
79
void reg_chunk_playout(int id, bool b, uint64_t timestamp)
80
{
81
        static MonHandler chunk_loss_burst_size;
82
        static int last_arrived_chunk = -1;
83

    
84
        struct timeval tnow;
85
        if (chunk_playout < 0 && b) {        //don't count losses before the first arrived chunk
86
                enum stat_types st[] = {WIN_AVG, AVG, SUM, RATE};
87
                //number of chunks played
88
                add_measure(&chunk_playout, GENERIC, 0, PEER_PUBLISH_INTERVAL, "ChunksPlayed", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
89
        }
90
        monNewSample(chunk_playout, b);
91

    
92
        if (playout_delay < 0) {
93
                enum stat_types st[] = {WIN_AVG, WIN_VAR};
94
                //delay after reorder buffer, however chunkstream module does not use reorder buffer
95
                add_measure(&playout_delay, GENERIC, 0, PEER_PUBLISH_INTERVAL, "ReorderDelay", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[seconds]
96
        }
97
        if (b) {        //count delay only if chunk has arrived
98
                gettimeofday(&tnow, NULL);
99
                monNewSample(playout_delay, ((int64_t)(tnow.tv_usec + tnow.tv_sec * 1000000ULL) - (int64_t)timestamp) / 1000000.0);
100
        }
101

    
102
        //if (!chunk_loss_burst_size) {
103
        //        enum stat_types st[] = {WIN_AVG, WIN_VAR};
104
        //        // number of consecutive lost chunks
105
        //        add_measure(&chunk_loss_burst_size, GENERIC, 0, PEER_PUBLISH_INTERVAL, "ChunkLossBurstSize", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
106
        //}
107
        if (b) {
108
                if (last_arrived_chunk >= 0) {
109
                        int burst_size = id - 1 - last_arrived_chunk;
110
                        if (burst_size) monNewSample(chunk_loss_burst_size, burst_size);
111
                }
112
                last_arrived_chunk = id;
113
        }
114
}
115

    
116
/*
117
 * Register actual neghbourhood size
118
*/
119
void reg_incoming_neigh_size(int s)
120
{
121
        if (in_neigh_size < 0) {
122
                enum stat_types st[] = {LAST, WIN_AVG};
123
                // number of peers in the neighboorhood
124
                add_measure(&in_neigh_size, GENERIC, 0, PEER_PUBLISH_INTERVAL, "InNeighSize", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
125
        }
126
        monNewSample(in_neigh_size, s);
127
}
128

    
129
/*
130
 * Register actual neghbourhood size
131
*/
132
void reg_outgoing_neigh_size(int s)
133
{
134
        if (out_neigh_size < 0) {
135
                enum stat_types st[] = {LAST, WIN_AVG};
136
                // number of peers in the neighboorhood
137
                add_measure(&out_neigh_size, GENERIC, 0, PEER_PUBLISH_INTERVAL, "OutNeighSize", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
138
        }
139
        monNewSample(out_neigh_size, s);
140
}
141

    
142
/*
143
 * Register chunk receive event
144
*/
145
void reg_chunk_receive(int id, uint64_t timestamp, int hopcount, bool old, bool dup)
146
{
147
        struct timeval tnow;
148

    
149
        if (chunk_receive < 0) {
150
                enum stat_types st[] = {RATE};
151
                // total number of received chunks per second
152
                add_measure(&chunk_receive, GENERIC, 0, PEER_PUBLISH_INTERVAL, "TotalRxChunk", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks/s]
153
                monNewSample(chunk_receive, 0);        //force publish even if there are no events
154
        }
155
        monNewSample(chunk_receive, 1);
156

    
157
        if (chunk_hops < 0) {
158
                enum stat_types st[] = {WIN_AVG};
159
                // number of hops from source on the p2p network
160
                add_measure(&chunk_hops, GENERIC, 0, PEER_PUBLISH_INTERVAL, "OverlayHops", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
161
        }
162
        monNewSample(chunk_hops, hopcount);
163

    
164
        if (chunk_delay < 0) {
165
                enum stat_types st[] = {WIN_AVG, WIN_VAR};
166
                // time elapsed since the source emitted the chunk
167
                add_measure(&chunk_delay, GENERIC, 0, PEER_PUBLISH_INTERVAL, "ReceiveDelay", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[seconds]
168
        }
169
        gettimeofday(&tnow, NULL);
170
        monNewSample(chunk_delay, ((int64_t)(tnow.tv_usec + tnow.tv_sec * 1000000ULL) - (int64_t)timestamp) / 1000000.0);
171
}
172

    
173
/*
174
 * Register chunk send event
175
*/
176
void reg_chunk_send(int id)
177
{
178
        if (chunk_send < 0) {
179
                enum stat_types st[] = {RATE};
180
                add_measure(&chunk_send, GENERIC, 0, PEER_PUBLISH_INTERVAL, "TotalTxChunk", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks/s]
181
                monNewSample(chunk_send, 0);        //force publish even if there are no events
182
        }
183
        monNewSample(chunk_send, 1);
184
}
185

    
186
/*
187
 * Register chunk accept event
188
*/
189
void reg_offer_accept_in(bool b)
190
{
191
        if (offer_accept_in < 0) {
192
                enum stat_types st[] = {WIN_AVG};
193
                // ratio between number of offers and number of accepts
194
                add_measure(&offer_accept_in, GENERIC, 0, PEER_PUBLISH_INTERVAL, "OfferAcceptIn", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[no unit -> ratio]
195
        }
196
        monNewSample(offer_accept_in, b);
197
}
198

    
199
/*
200
 * Register chunk accept event
201
*/
202
void reg_offer_accept_out(bool b)
203
{
204
        if (offer_accept_out < 0) {
205
                enum stat_types st[] = {WIN_AVG};
206
                // ratio between number of offers and number of accepts
207
                add_measure(&offer_accept_out, GENERIC, 0, PEER_PUBLISH_INTERVAL, "OfferAcceptOut", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[no unit -> ratio]
208
        }
209
        monNewSample(offer_accept_out, b);
210
}
211

    
212

    
213
/*
214
 * Register the number of offers in flight at each offer sent event
215
*/
216
void reg_offers_in_flight(int running_offer_threads)
217
{
218
        if (offers_in_flight < 0) {
219
                enum stat_types st[] =  {AVG, WIN_AVG, LAST};
220
                add_measure(&offers_in_flight, GENERIC, 0, PEER_PUBLISH_INTERVAL, "OffersInFlight", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
221
                monNewSample(offers_in_flight, 0);        //force publish even if there are no events
222
        }
223
        else {
224
                monNewSample(offers_in_flight, running_offer_threads);
225
        }
226
}
227

    
228
/*
229
 * Register queue delay at each ack received event
230
*/
231
void reg_queue_delay(double last_queue_delay)
232
{
233
        if (queue_delay < 0) {
234
                enum stat_types st[] =  {AVG, WIN_AVG, LAST};
235
                add_measure(&queue_delay, GENERIC, 0, PEER_PUBLISH_INTERVAL, "QueueDelay", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
236
                monNewSample(queue_delay, 0);        //force publish even if there are no events
237
        }
238
        monNewSample(queue_delay, last_queue_delay);
239
}
240

    
241
/*
242
 * Initialize peer level measurements
243
*/
244
void init_measures()
245
{
246
        if (peername) monSetPeerName(peername);
247
}
248

    
249
/*
250
 * End peer level measurements
251
*/
252
void end_measures()
253
{
254
}
255

    
256
/*
257
 * Initialize p2p measurements towards a peer
258
*/
259
void add_measures(struct nodeID *id)
260
{
261
        // Add measures
262
        int j = 0;
263
        enum stat_types stwinavgwinvar[] = {WIN_AVG, WIN_VAR};
264
        enum stat_types stwinavg[] = {WIN_AVG};
265
        enum stat_types stwinavgrate[] = {WIN_AVG, RATE};
266
//        enum stat_types stsum[] = {SUM};
267
        enum stat_types stsumwinsumrate[] = {SUM, PERIOD_SUM, WIN_SUM, RATE};
268

    
269
        dprintf("adding measures to %s\n",node_addr(id));
270

    
271
        /* HopCount */
272
        // number of hops at IP level
273
       id->mhs[j] = monCreateMeasure(HOPCOUNT, PACKET | IN_BAND);
274
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "HopCount", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[IP hops]
275

    
276
        /* Round Trip Time */
277
       id->mhs[j] = monCreateMeasure(RTT, PACKET | IN_BAND);
278
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "RoundTripDelay", stwinavgwinvar, sizeof(stwinavgwinvar)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[seconds]
279

    
280
        /* Loss */
281
       id->mhs[j] = monCreateMeasure(SEQWIN, PACKET | IN_BAND);
282
       start_measure(id->mhs[j++], 0, NULL, NULL, 0, id->addr, MSG_TYPE_CHUNK);
283
       id->mhs[j] = monCreateMeasure(LOSS, PACKET | IN_BAND | REMOTE_RESULTS);
284
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "LossRate", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //LossRate_avg [probability 0..1] LossRate_rate [lost_pkts/sec]
285

    
286
       /* RX,TX volume in bytes (only chunks) */
287
       id->mhs[j] = monCreateMeasure(RX_BYTE, PACKET | IN_BAND);
288
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "RxBytesChunk", stsumwinsumrate, sizeof(stsumwinsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes]
289
       id->mhs[j] = monCreateMeasure(TX_BYTE, PACKET | IN_BAND);
290
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "TxBytesChunk", stsumwinsumrate, sizeof(stsumwinsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes]
291

    
292
       /* RX,TX volume in bytes (only signaling) */
293
       id->mhs[j] = monCreateMeasure(RX_BYTE, PACKET | IN_BAND);
294
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "RxBytesSig", stsumwinsumrate, sizeof(stsumwinsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[bytes]
295
       id->mhs[j] = monCreateMeasure(TX_BYTE, PACKET | IN_BAND);
296
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "TxBytesSig", stsumwinsumrate, sizeof(stsumwinsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[bytes]
297

    
298
        // Chunks
299
       id->mhs[j] = monCreateMeasure(RX_PACKET, DATA | IN_BAND);
300
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "RxChunks", stsumwinsumrate, sizeof(stsumwinsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //RxChunks_sum [chunks] RxChunks_rate [chunks/sec]
301
       id->mhs[j] = monCreateMeasure(TX_PACKET, DATA | IN_BAND);
302
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "TxChunks", stsumwinsumrate, sizeof(stsumwinsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //TxChunks_sum [chunks] TxChunks_rate [chunks/sec]
303

    
304
/*
305
//        // Capacity
306
        id->mhs[j] = monCreateMeasure(CLOCKDRIFT, PACKET | IN_BAND);
307
        monSetParameter (id->mhs[j], P_CLOCKDRIFT_ALGORITHM, 1);
308
        start_measure(id->mhs[j++], 0, NULL, NULL, 0, id->addr, MSG_TYPE_CHUNK);
309
        id->mhs[j] = monCreateMeasure(CORRECTED_DELAY, PACKET | IN_BAND);
310
        start_measure(id->mhs[j++], 0, NULL, NULL, 0, id->addr, MSG_TYPE_CHUNK);
311
        id->mhs[j] = monCreateMeasure(CAPACITY_CAPPROBE, PACKET | IN_BAND);
312
        monSetParameter (id->mhs[j], P_CAPPROBE_DELAY_TH, -1);
313
//        monSetParameter (id->mhs[j], P_CAPPROBE_PKT_TH, 5);
314
        start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "Capacity", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes/s]
315

316
        //Available capacity
317
       id->mhs[j] = monCreateMeasure(BULK_TRANSFER, PACKET | DATA | IN_BAND);
318
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "BulkTransfer", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK); //Bulktransfer [bit/s]
319
        id->mhs[j] = monCreateMeasure(AVAILABLE_BW_FORECASTER, PACKET | IN_BAND);
320
        start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "AvailableBW", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes/s]
321
*/
322

    
323
        // for static must not be more then 10 or whatever size is in net_helper-ml.c
324
        id->n_mhs = j;
325
}
326

    
327
/*
328
 * Delete p2p measurements towards a peer
329
*/
330
void delete_measures(struct nodeID *id)
331
{
332
        dprintf("deleting measures from %s\n",node_addr(id));
333
        while(id->n_mhs) {
334
                monDestroyMeasure(id->mhs[--(id->n_mhs)]);
335
        }
336
}
337

    
338
/*
339
 * Helper to retrieve a measure
340
*/
341
double get_measure(struct nodeID *id, int j, enum stat_types st)
342
{
343
        return (id->n_mhs > j) ? monRetrieveResult(id->mhs[j], st) : NAN;
344
}
345

    
346
/*
347
 * Hopcount to a given peer
348
*/
349
int get_hopcount(struct nodeID *id){
350
        double r = get_measure(id, 0, LAST);
351
        return isnan(r) ? -1 : (int) r;
352
}
353

    
354
/*
355
 * RTT to a given peer in seconds
356
*/
357
double get_rtt(struct nodeID *id){
358
        return get_measure(id, 1, WIN_AVG);
359
}
360

    
361
/*
362
 * average RTT to a set of peers in seconds
363
*/
364
double get_average_rtt(struct nodeID **ids, int len){
365
        int i;
366
        int n = 0;
367
        double sum = 0;
368

    
369
        for (i = 0; i < len; i++) {
370
                double l = get_rtt(ids[i]);
371
                if (!isnan(l)) {
372
                        sum += l;
373
                        n++;
374
                }
375
        }
376
        return (n > 0) ? sum / n : NAN;
377
}
378

    
379
/*
380
 * loss ratio from a given peer as 0..1
381
*/
382
double get_lossrate(struct nodeID *id){
383
        return get_measure(id, 3, WIN_AVG);
384
}
385

    
386
/*
387
 * loss ratio from a given peer as 0..1
388
*/
389
double get_transmitter_lossrate(struct nodeID *id){
390
        return (monRetrieveResultById(id->addr, MSG_TYPE_CHUNK, PACKET | IN_BAND | REMOTE, LOSS, WIN_AVG));
391
}
392

    
393
/*
394
 * average loss ratio from a set of peers as 0..1
395
*/
396
double get_average_lossrate(struct nodeID **ids, int len){
397
        int i;
398
        int n = 0;
399
        double sum = 0;
400

    
401
        for (i = 0; i < len; i++) {
402
                double l = get_lossrate(ids[i]);
403
                if (!isnan(l)) {
404
                        sum += l;
405
                        n++;
406
                }
407
        }
408
        return (n > 0) ? sum / n : NAN;
409
}
410

    
411
double get_receive_delay(void) {
412
        return chunk_delay >= 0 ? monRetrieveResult(chunk_delay, WIN_AVG) : NAN;
413
}
414

    
415
double get_chunk_playout(void) {
416
        return chunk_playout>= 0 ? monRetrieveResult(chunk_playout, WIN_AVG) : NAN;
417
}
418

    
419
double get_rx_bytes_chunks(struct nodeID *id) {
420
        return get_measure(id, 4, PERIOD_SUM);
421
}
422

    
423