Statistics
| Branch: | Revision:

streamers / measures-monl.c @ c24f3970

History | View | Annotate | Download (13.3 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <math.h>
7
#ifndef NAN        //NAN is missing in some old math.h versions
8
#define NAN            (0.0/0.0)
9
#endif
10
#ifndef INFINITY
11
#define INFINITY       (1.0/0.0)
12
#endif
13

    
14
#include <mon.h>
15
#include <ml.h>
16
#include <net_helper.h>
17
#include <grapes_msg_types.h>
18

    
19
#include "channel.h"
20
#include "dbg.h"
21
#include "measures.h"
22

    
23
#define PEER_PUBLISH_INTERVAL 10 //in seconds
24
#define P2P_PUBLISH_INTERVAL 60 //in seconds
25

    
26
extern const char *peername;
27

    
28
typedef struct nodeID {
29
        socketID_handle addr;
30
        int connID;        // connection associated to this node, -1 if myself
31
        int refcnt;
32
        //a quick and dirty static vector for measures TODO: make it dinamic
33
        MonHandler mhs[20];
34
        int n_mhs;
35
} nodeID;
36

    
37
static MonHandler chunk_dup = -1, chunk_playout = -1 , neigh_size = -1, chunk_receive = -1, chunk_send = -1, offer_accept_in = -1, offer_accept_out = -1, chunk_hops = -1, chunk_delay = -1, playout_delay = -1;
38
static MonHandler queue_delay = -1 , offers_in_flight = -1;
39

    
40
//static MonHandler rx_bytes_chunk_per_sec, tx_bytes_chunk_per_sec, rx_bytes_sig_per_sec, tx_bytes_sig_per_sec;
41
//static MonHandler rx_chunks, tx_chunks;
42

    
43
/*
44
 * Start one measure
45
*/
46
void start_measure(MonHandler mh, MonParameterValue rate, const char *pubname, enum stat_types st[], int length, SocketId dst, MsgType mt)
47
{
48
        if (rate) monSetParameter (mh, P_PUBLISHING_RATE, rate);
49
        if (length) monPublishStatisticalType(mh, pubname, channel_get_name(), st , length, NULL);
50
        monActivateMeasure(mh, dst, mt);
51
}
52

    
53
/*
54
 * Initialize and start one measure
55
 */
56
void add_measure(MonHandler *mhp, MeasurementId id, MeasurementCapabilities mc, MonParameterValue rate, const char *pubname, enum stat_types st[], int length, SocketId dst, MsgType mt)
57
{
58
        *mhp = monCreateMeasure(id, mc);
59
        start_measure(*mhp, rate, pubname, st, length, dst, mt);
60
}
61

    
62
/*
63
 * Register duplicate arrival
64
*/
65
void reg_chunk_duplicate()
66
{
67
        if (chunk_dup < 0) {
68
                enum stat_types st[] = {SUM, RATE};
69
                // number of chunks which have been received more then once
70
                add_measure(&chunk_dup, GENERIC, 0, PEER_PUBLISH_INTERVAL, "ChunkDuplicates", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
71
                monNewSample(chunk_dup, 0);        //force publish even if there are no events
72
        }
73
        monNewSample(chunk_dup, 1);
74
}
75

    
76
/*
77
 * Register playout/loss of a chunk before playout
78
*/
79
void reg_chunk_playout(int id, bool b, uint64_t timestamp)
80
{
81
        static MonHandler chunk_loss_burst_size;
82
        static int last_arrived_chunk = -1;
83

    
84
        struct timeval tnow;
85
        if (chunk_playout < 0 && b) {        //don't count losses before the first arrived chunk
86
                enum stat_types st[] = {WIN_AVG, AVG, SUM, RATE};
87
                //number of chunks played
88
                add_measure(&chunk_playout, GENERIC, 0, PEER_PUBLISH_INTERVAL, "ChunksPlayed", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
89
        }
90
        monNewSample(chunk_playout, b);
91

    
92
        if (playout_delay < 0) {
93
                enum stat_types st[] = {WIN_AVG, WIN_VAR};
94
                //delay after reorder buffer, however chunkstream module does not use reorder buffer
95
                add_measure(&playout_delay, GENERIC, 0, PEER_PUBLISH_INTERVAL, "ReorderDelay", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[seconds]
96
        }
97
        if (b) {        //count delay only if chunk has arrived
98
                gettimeofday(&tnow, NULL);
99
                monNewSample(playout_delay, ((int64_t)(tnow.tv_usec + tnow.tv_sec * 1000000ULL) - (int64_t)timestamp) / 1000000.0);
100
        }
101

    
102
        //if (!chunk_loss_burst_size) {
103
        //        enum stat_types st[] = {WIN_AVG, WIN_VAR};
104
        //        // number of consecutive lost chunks
105
        //        add_measure(&chunk_loss_burst_size, GENERIC, 0, PEER_PUBLISH_INTERVAL, "ChunkLossBurstSize", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
106
        //}
107
        if (b) {
108
                if (last_arrived_chunk >= 0) {
109
                        int burst_size = id - 1 - last_arrived_chunk;
110
                        if (burst_size) monNewSample(chunk_loss_burst_size, burst_size);
111
                }
112
                last_arrived_chunk = id;
113
        }
114
}
115

    
116
/*
117
 * Register actual neghbourhood size
118
*/
119
void reg_neigh_size(int s)
120
{
121
        if (neigh_size < 0) {
122
                enum stat_types st[] = {LAST, WIN_AVG};
123
                // number of peers in the neighboorhood
124
                add_measure(&neigh_size, GENERIC, 0, PEER_PUBLISH_INTERVAL, "NeighSize", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
125
        }
126
        monNewSample(neigh_size, s);
127
}
128

    
129
/*
130
 * Register chunk receive event
131
*/
132
void reg_chunk_receive(int id, uint64_t timestamp, int hopcount, bool old, bool dup)
133
{
134
        struct timeval tnow;
135

    
136
        if (chunk_receive < 0) {
137
                enum stat_types st[] = {RATE};
138
                // total number of received chunks per second
139
                add_measure(&chunk_receive, GENERIC, 0, PEER_PUBLISH_INTERVAL, "TotalRxChunk", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks/s]
140
                monNewSample(chunk_receive, 0);        //force publish even if there are no events
141
        }
142
        monNewSample(chunk_receive, 1);
143

    
144
        if (chunk_hops < 0) {
145
                enum stat_types st[] = {WIN_AVG};
146
                // number of hops from source on the p2p network
147
                add_measure(&chunk_hops, GENERIC, 0, PEER_PUBLISH_INTERVAL, "OverlayHops", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
148
        }
149
        monNewSample(chunk_hops, hopcount);
150

    
151
        if (chunk_delay < 0) {
152
                enum stat_types st[] = {WIN_AVG, WIN_VAR};
153
                // time elapsed since the source emitted the chunk
154
                add_measure(&chunk_delay, GENERIC, 0, PEER_PUBLISH_INTERVAL, "ReceiveDelay", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[seconds]
155
        }
156
        gettimeofday(&tnow, NULL);
157
        monNewSample(chunk_delay, ((int64_t)(tnow.tv_usec + tnow.tv_sec * 1000000ULL) - (int64_t)timestamp) / 1000000.0);
158
}
159

    
160
/*
161
 * Register chunk send event
162
*/
163
void reg_chunk_send(int id)
164
{
165
        if (chunk_send < 0) {
166
                enum stat_types st[] = {RATE};
167
                add_measure(&chunk_send, GENERIC, 0, PEER_PUBLISH_INTERVAL, "TotalTxChunk", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks/s]
168
                monNewSample(chunk_send, 0);        //force publish even if there are no events
169
        }
170
        monNewSample(chunk_send, 1);
171
}
172

    
173
/*
174
 * Register chunk accept event
175
*/
176
void reg_offer_accept_in(bool b)
177
{
178
        if (offer_accept_in < 0) {
179
                enum stat_types st[] = {WIN_AVG};
180
                // ratio between number of offers and number of accepts
181
                add_measure(&offer_accept_in, GENERIC, 0, PEER_PUBLISH_INTERVAL, "OfferAcceptIn", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[no unit -> ratio]
182
        }
183
        monNewSample(offer_accept_in, b);
184
}
185

    
186
/*
187
 * Register chunk accept event
188
*/
189
void reg_offer_accept_out(bool b)
190
{
191
        if (offer_accept_out < 0) {
192
                enum stat_types st[] = {WIN_AVG};
193
                // ratio between number of offers and number of accepts
194
                add_measure(&offer_accept_out, GENERIC, 0, PEER_PUBLISH_INTERVAL, "OfferAcceptOut", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[no unit -> ratio]
195
        }
196
        monNewSample(offer_accept_out, b);
197
}
198

    
199

    
200
/*
201
 * Register the number of offers in flight at each offer sent event
202
*/
203
void reg_offers_in_flight(int running_offer_threads)
204
{
205
        if (offers_in_flight < 0) {
206
                enum stat_types st[] =  {AVG, WIN_AVG, LAST};
207
                add_measure(&offers_in_flight, GENERIC, 0, PEER_PUBLISH_INTERVAL, "OffersInFlight", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
208
                monNewSample(offers_in_flight, 0);        //force publish even if there are no events
209
        }
210
        else {
211
                monNewSample(offers_in_flight, running_offer_threads);
212
        }
213
}
214

    
215
/*
216
 * Register queue delay at each ack received event
217
*/
218
void reg_queue_delay(double last_queue_delay)
219
{
220
        if (queue_delay < 0) {
221
                enum stat_types st[] =  {AVG, WIN_AVG, LAST};
222
                add_measure(&queue_delay, GENERIC, 0, PEER_PUBLISH_INTERVAL, "QueueDelay", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
223
                monNewSample(queue_delay, 0);        //force publish even if there are no events
224
        }
225
        monNewSample(queue_delay, last_queue_delay);
226
}
227

    
228
/*
229
 * Initialize peer level measurements
230
*/
231
void init_measures()
232
{
233
        if (peername) monSetPeerName(peername);
234
}
235

    
236
/*
237
 * End peer level measurements
238
*/
239
void end_measures()
240
{
241
}
242

    
243
/*
244
 * Initialize p2p measurements towards a peer
245
*/
246
void add_measures(struct nodeID *id)
247
{
248
        // Add measures
249
        int j = 0;
250
        enum stat_types stwinavgwinvar[] = {WIN_AVG, WIN_VAR};
251
        enum stat_types stwinavg[] = {WIN_AVG};
252
        enum stat_types stwinavgrate[] = {WIN_AVG, RATE};
253
//        enum stat_types stsum[] = {SUM};
254
        enum stat_types stsumwinsumrate[] = {SUM, PERIOD_SUM, WIN_SUM, RATE};
255

    
256
        dprintf("adding measures to %s\n",node_addr(id));
257

    
258
        /* HopCount */
259
        // number of hops at IP level
260
       id->mhs[j] = monCreateMeasure(HOPCOUNT, PACKET | IN_BAND);
261
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "HopCount", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[IP hops]
262

    
263
        /* Round Trip Time */
264
       id->mhs[j] = monCreateMeasure(RTT, PACKET | IN_BAND);
265
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "RoundTripDelay", stwinavgwinvar, sizeof(stwinavgwinvar)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[seconds]
266

    
267
        /* Loss */
268
       id->mhs[j] = monCreateMeasure(SEQWIN, PACKET | IN_BAND);
269
       start_measure(id->mhs[j++], 0, NULL, NULL, 0, id->addr, MSG_TYPE_CHUNK);
270
       id->mhs[j] = monCreateMeasure(LOSS, PACKET | IN_BAND);
271
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "LossRate", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //LossRate_avg [probability 0..1] LossRate_rate [lost_pkts/sec]
272

    
273
       /* RX,TX volume in bytes (only chunks) */
274
       id->mhs[j] = monCreateMeasure(RX_BYTE, PACKET | IN_BAND);
275
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "RxBytesChunk", stsumwinsumrate, sizeof(stsumwinsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes]
276
       id->mhs[j] = monCreateMeasure(TX_BYTE, PACKET | IN_BAND);
277
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "TxBytesChunk", stsumwinsumrate, sizeof(stsumwinsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes]
278

    
279
       /* RX,TX volume in bytes (only signaling) */
280
       id->mhs[j] = monCreateMeasure(RX_BYTE, PACKET | IN_BAND);
281
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "RxBytesSig", stsumwinsumrate, sizeof(stsumwinsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[bytes]
282
       id->mhs[j] = monCreateMeasure(TX_BYTE, PACKET | IN_BAND);
283
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "TxBytesSig", stsumwinsumrate, sizeof(stsumwinsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[bytes]
284

    
285
        // Chunks
286
       id->mhs[j] = monCreateMeasure(RX_PACKET, DATA | IN_BAND);
287
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "RxChunks", stsumwinsumrate, sizeof(stsumwinsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //RxChunks_sum [chunks] RxChunks_rate [chunks/sec]
288
       id->mhs[j] = monCreateMeasure(TX_PACKET, DATA | IN_BAND);
289
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "TxChunks", stsumwinsumrate, sizeof(stsumwinsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //TxChunks_sum [chunks] TxChunks_rate [chunks/sec]
290

    
291
/*
292
//        // Capacity
293
        id->mhs[j] = monCreateMeasure(CLOCKDRIFT, PACKET | IN_BAND);
294
        monSetParameter (id->mhs[j], P_CLOCKDRIFT_ALGORITHM, 1);
295
        start_measure(id->mhs[j++], 0, NULL, NULL, 0, id->addr, MSG_TYPE_CHUNK);
296
        id->mhs[j] = monCreateMeasure(CORRECTED_DELAY, PACKET | IN_BAND);
297
        start_measure(id->mhs[j++], 0, NULL, NULL, 0, id->addr, MSG_TYPE_CHUNK);
298
        id->mhs[j] = monCreateMeasure(CAPACITY_CAPPROBE, PACKET | IN_BAND);
299
        monSetParameter (id->mhs[j], P_CAPPROBE_DELAY_TH, -1);
300
//        monSetParameter (id->mhs[j], P_CAPPROBE_PKT_TH, 5);
301
        start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "Capacity", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes/s]
302

303
        //Available capacity
304
       id->mhs[j] = monCreateMeasure(BULK_TRANSFER, PACKET | DATA | IN_BAND);
305
       start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "BulkTransfer", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK); //Bulktransfer [bit/s]
306
        id->mhs[j] = monCreateMeasure(AVAILABLE_BW_FORECASTER, PACKET | IN_BAND);
307
        start_measure(id->mhs[j++], P2P_PUBLISH_INTERVAL, "AvailableBW", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes/s]
308
*/
309

    
310
        // for static must not be more then 10 or whatever size is in net_helper-ml.c
311
        id->n_mhs = j;
312
}
313

    
314
/*
315
 * Delete p2p measurements towards a peer
316
*/
317
void delete_measures(struct nodeID *id)
318
{
319
        dprintf("deleting measures from %s\n",node_addr(id));
320
        while(id->n_mhs) {
321
                monDestroyMeasure(id->mhs[--(id->n_mhs)]);
322
        }
323
}
324

    
325
/*
326
 * Helper to retrieve a measure
327
*/
328
double get_measure(struct nodeID *id, int j, enum stat_types st)
329
{
330
        return (id->n_mhs > j) ? monRetrieveResult(id->mhs[j], st) : NAN;
331
}
332

    
333
/*
334
 * Hopcount to a given peer
335
*/
336
int get_hopcount(struct nodeID *id){
337
        double r = get_measure(id, 0, LAST);
338
        return isnan(r) ? -1 : (int) r;
339
}
340

    
341
/*
342
 * RTT to a given peer in seconds
343
*/
344
double get_rtt(struct nodeID *id){
345
        return get_measure(id, 1, WIN_AVG);
346
}
347

    
348
/*
349
 * average RTT to a set of peers in seconds
350
*/
351
double get_average_rtt(struct nodeID **ids, int len){
352
        int i;
353
        int n = 0;
354
        double sum = 0;
355

    
356
        for (i = 0; i < len; i++) {
357
                double l = get_rtt(ids[i]);
358
                if (!isnan(l)) {
359
                        sum += l;
360
                        n++;
361
                }
362
        }
363
        return (n > 0) ? sum / n : NAN;
364
}
365

    
366
/*
367
 * loss ratio from a given peer as 0..1
368
*/
369
double get_lossrate(struct nodeID *id){
370
        return get_measure(id, 3, WIN_AVG);
371
}
372

    
373
/*
374
 * average loss ratio from a set of peers as 0..1
375
*/
376
double get_average_lossrate(struct nodeID **ids, int len){
377
        int i;
378
        int n = 0;
379
        double sum = 0;
380

    
381
        for (i = 0; i < len; i++) {
382
                double l = get_lossrate(ids[i]);
383
                if (!isnan(l)) {
384
                        sum += l;
385
                        n++;
386
                }
387
        }
388
        return (n > 0) ? sum / n : NAN;
389
}
390

    
391
double get_receive_delay(void) {
392
        return chunk_delay >= 0 ? monRetrieveResult(chunk_delay, WIN_AVG) : NAN;
393
}