Statistics
| Branch: | Revision:

streamers / measures-monl.c @ 74a5d4ae

History | View | Annotate | Download (11.2 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <math.h>
7
#ifndef NAN        //NAN is missing in some old math.h versions
8
#define NAN            (0.0/0.0)
9
#endif
10
#ifndef INFINITY
11
#define INFINITY       (1.0/0.0)
12
#endif
13

    
14
#include <mon.h>
15
#include <ml.h>
16
#include <net_helper.h>
17

    
18
#include "channel.h"
19
#include "dbg.h"
20
#include "measures.h"
21

    
22
typedef struct nodeID {
23
        socketID_handle addr;
24
        int connID;        // connection associated to this node, -1 if myself
25
        int refcnt;
26
        //a quick and dirty static vector for measures TODO: make it dinamic
27
        MonHandler mhs[20];
28
        int n_mhs;
29
} nodeID;
30

    
31
static MonHandler chunk_dup, chunk_playout, neigh_size, chunk_receive, chunk_send, offer_accept, chunk_hops, chunk_delay, playout_delay;
32
static MonHandler rx_bytes_chunk_per_sec, tx_bytes_chunk_per_sec, rx_bytes_sig_per_sec, tx_bytes_sig_per_sec;
33
static MonHandler rx_chunks, tx_chunks;
34

    
35
/*
36
 * Initialize one measure
37
*/
38
void add_measure(MonHandler *mh, MeasurementId id, MeasurementCapabilities mc, MonParameterValue rate, const char *pubname, enum stat_types st[], int length, SocketId dst, MsgType mt)
39
{
40
        *mh = monCreateMeasure(id, mc);
41
        if (rate) monSetParameter (*mh, P_PUBLISHING_RATE, rate);
42
        if (length) monPublishStatisticalType(*mh, pubname, channel_get_name(), st , length, NULL);
43
        monActivateMeasure(*mh, dst, mt);
44
}
45

    
46
/*
47
 * Register duplicate arrival
48
*/
49
void reg_chunk_duplicate()
50
{
51
        if (!chunk_dup) {
52
                enum stat_types st[] = {SUM, RATE};
53
                add_measure(&chunk_dup, GENERIC, 0, 120, "ChunkDuplicates", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
54
                monNewSample(chunk_dup, 0);        //force publish even if there are no events
55
        }
56
        monNewSample(chunk_dup, 1);
57
}
58

    
59
/*
60
 * Register playout/loss of a chunk before playout
61
*/
62
void reg_chunk_playout(int id, bool b, uint64_t timestamp)
63
{
64
        static MonHandler chunk_loss_burst_size;
65
        static int last_arrived_chunk = -1;
66

    
67
        struct timeval tnow;
68

    
69
        if (!chunk_playout && b) {        //don't count losses before the first arrived chunk
70
                enum stat_types st[] = {AVG, SUM, RATE};
71
                add_measure(&chunk_playout, GENERIC, 0, 120, "ChunksPlayed", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
72
        }
73
        monNewSample(chunk_playout, b);
74

    
75
        if (!playout_delay) {
76
                enum stat_types st[] = {WIN_AVG, WIN_VAR};
77
                add_measure(&playout_delay, GENERIC, 0, 120, "PlayoutDelay", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
78
        }
79
        if (b) {        //count delay only if chunk has arrived
80
                gettimeofday(&tnow, NULL);
81
                monNewSample(playout_delay, ((int64_t)(tnow.tv_usec + tnow.tv_sec * 1000000ULL) - (int64_t)timestamp) / 1000000.0);
82
        }
83

    
84
        if (!chunk_loss_burst_size) {
85
                enum stat_types st[] = {WIN_AVG, WIN_VAR};
86
                add_measure(&chunk_loss_burst_size, GENERIC, 0, 120, "ChunkLossBurstSize", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
87
        }
88
        if (b) {
89
                if (last_arrived_chunk >= 0) {
90
                        int burst_size = id - 1 - last_arrived_chunk;
91
                        if (burst_size) monNewSample(chunk_loss_burst_size, burst_size);
92
                }
93
                last_arrived_chunk = id;
94
        }
95
}
96

    
97
/*
98
 * Register actual neghbourhood size
99
*/
100
void reg_neigh_size(int s)
101
{
102
        if (!neigh_size) {
103
                enum stat_types st[] = {LAST};
104
                add_measure(&neigh_size, GENERIC, 0, 120, "NeighSize", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
105
        }
106
        monNewSample(neigh_size, s);
107
}
108

    
109
/*
110
 * Register chunk receive event
111
*/
112
void reg_chunk_receive(int id, uint64_t timestamp, int hopcount)
113
{
114
        struct timeval tnow;
115

    
116
        if (!chunk_receive) {
117
                enum stat_types st[] = {RATE};
118
                add_measure(&chunk_receive, GENERIC, 0, 120, "RxChunkAll", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
119
                monNewSample(chunk_receive, 0);        //force publish even if there are no events
120
        }
121
        monNewSample(chunk_receive, 1);
122

    
123
        if (!chunk_hops) {
124
                enum stat_types st[] = {WIN_AVG, WIN_VAR};
125
                add_measure(&chunk_hops, GENERIC, 0, 120, "Hops", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
126
        }
127
        monNewSample(chunk_hops, hopcount);
128

    
129
        if (!chunk_delay) {
130
                enum stat_types st[] = {WIN_AVG, WIN_VAR};
131
                add_measure(&chunk_delay, GENERIC, 0, 120, "ReceiveDelay", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
132
        }
133
        gettimeofday(&tnow, NULL);
134
        monNewSample(chunk_delay, ((int64_t)(tnow.tv_usec + tnow.tv_sec * 1000000ULL) - (int64_t)timestamp) / 1000000.0);
135
}
136

    
137
/*
138
 * Register chunk send event
139
*/
140
void reg_chunk_send(int id)
141
{
142
        if (!chunk_send) {
143
                enum stat_types st[] = {RATE};
144
                add_measure(&chunk_send, GENERIC, 0, 120, "TxChunkAll", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
145
                monNewSample(chunk_send, 0);        //force publish even if there are no events
146
        }
147
        monNewSample(chunk_send, 1);
148
}
149

    
150
/*
151
 * Register chunk accept evemt
152
*/
153
void reg_offer_accept(bool b)
154
{
155
        if (!offer_accept) {
156
                enum stat_types st[] = {AVG};
157
                add_measure(&offer_accept, GENERIC, 0, 120, "OfferAccept", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
158
        }
159
        monNewSample(offer_accept, b);
160
}
161

    
162
/*
163
 * Initialize peer level measurements
164
*/
165
void init_measures()
166
{
167
        enum stat_types stavg[] = {WIN_AVG};
168
        enum stat_types stsum[] = {SUM};
169

    
170
        // Traffic
171
       add_measure(&rx_bytes_chunk_per_sec, BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[bytes/s]
172
       add_measure(&tx_bytes_chunk_per_sec, BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[bytes/s]
173
       add_measure(&rx_bytes_sig_per_sec, BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_SIGNALLING);        //[bytes/s]
174
       add_measure(&tx_bytes_sig_per_sec, BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_SIGNALLING);        //[bytes/s]
175

    
176
        // Chunks
177
       // replaced by reg_chun_receive add_measure(&rx_chunks, COUNTER, RXONLY | DATA | IN_BAND, 120, "RxChunksAll", stsum, sizeof(stsum)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[chunks]
178
       // replaced by reg_chun_send add_measure(&tx_chunks, COUNTER, TXONLY | DATA | IN_BAND, 120, "TxChunksAll", stsum, sizeof(stsum)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[chunks]
179
}
180

    
181
/*
182
 * Initialize p2p measurements towards a peer
183
*/
184
void add_measures(struct nodeID *id)
185
{
186
        // Add measures
187
        int j = 0;
188
        enum stat_types stavg[] = {AVG};
189
        enum stat_types stwinavg[] = {WIN_AVG};
190
        enum stat_types stsum[] = {SUM};
191
        enum stat_types stsumrate[] = {SUM, RATE};
192

    
193
        dprintf("adding measures to %s\n",node_addr(id));
194

    
195
        /* HopCount */
196
       add_measure(&id->mhs[j++], HOPCOUNT, TXRXUNI | PACKET | IN_BAND, 120, "HopCount", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[IP hops]
197

    
198
        /* Round Trip Time */
199
       add_measure(&id->mhs[j++], RTT, TXRXBI | PACKET | IN_BAND, 120, "RoundTripDelay", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[seconds]
200

    
201
        /* Loss */
202
       add_measure(&id->mhs[j++], LOSS, TXRXUNI | PACKET | IN_BAND, 120, "LossRate", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //LossRate_avg [probability 0..1] LossRate_rate [lost_pkts/sec]
203

    
204
        // Cumulative Traffic
205
       //add_measure(&id->mhs[j++], BYTE, RXONLY | PACKET | IN_BAND, 120, "RxBytes", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_ANY);
206
       //add_measure(&id->mhs[j++], BYTE, TXONLY | PACKET | IN_BAND, 120, "TxBytes", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_ANY);
207
       add_measure(&id->mhs[j++], BYTE, RXONLY | PACKET | IN_BAND, 120, "RxBytesChunk", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes]
208
       add_measure(&id->mhs[j++], BYTE, TXONLY | PACKET | IN_BAND, 120, "TxBytesChunk", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes]
209

    
210
        // Traffic
211
       add_measure(&id->mhs[j++], BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes/s]
212
       add_measure(&id->mhs[j++], BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes/s]
213
       add_measure(&id->mhs[j++], BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[bytes/s]
214
       add_measure(&id->mhs[j++], BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[bytes/s]
215

    
216
        // Chunks
217
       add_measure(&id->mhs[j++], COUNTER, RXONLY | DATA | IN_BAND, 120, "RxChunks", stsumrate, sizeof(stsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //RxChunks_sum [chunks] RxChunks_rate [chunks/sec]
218
       add_measure(&id->mhs[j++], COUNTER, TXONLY | DATA | IN_BAND, 120, "TxChunks", stsumrate, sizeof(stsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //TxChunks_sum [chunks] TxChunks_rate [chunks/sec]
219

    
220
//        // Capacity
221
       add_measure(&id->mhs[j++], CLOCKDRIFT, TXRXUNI | PACKET | IN_BAND, 0, NULL, NULL, 0, id->addr, MSG_TYPE_CHUNK);
222
        monSetParameter (id->mhs[j], P_CLOCKDRIFT_ALGORITHM, 1);
223
       add_measure(&id->mhs[j++], CORRECTED_DELAY, TXRXUNI | PACKET | IN_BAND, 0, NULL, NULL, 0, id->addr, MSG_TYPE_CHUNK);
224
       add_measure(&id->mhs[j++], CAPACITY_CAPPROBE, TXRXUNI | PACKET | IN_BAND, 120, "Capacity", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes/s]
225
        monSetParameter (id->mhs[j], P_CAPPROBE_DELAY_TH, -1);
226
//        monSetParameter (mh, P_CAPPROBE_PKT_TH, 100);
227
//        monSetParameter (mh, P_CAPPROBE_IPD_TH, 60);
228
//        monPublishStatisticalType(mh, NULL, st , sizeof(st)/sizeof(enum stat_types), repoclient);
229

    
230
        // for static must not be more then 10 or whatever size is in net_helper-ml.c
231
        id->n_mhs = j;
232
}
233

    
234
/*
235
 * Delete p2p measurements towards a peer
236
*/
237
void delete_measures(struct nodeID *id)
238
{
239
        int j;
240
        dprintf("deleting measures from %s\n",node_addr(id));
241
        while(id->n_mhs) {
242
                monDestroyMeasure(id->mhs[--(id->n_mhs)]);
243
        }
244
}
245

    
246
/*
247
 * Helper to retrieve a measure
248
*/
249
double get_measure(struct nodeID *id, int j, enum stat_types st)
250
{
251
        return (id->n_mhs > j) ? monRetrieveResult(id->mhs[j], st) : NAN;
252
}
253

    
254
/*
255
 * Hopcount to a given peer
256
*/
257
int get_hopcount(struct nodeID *id){
258
        double r = get_measure(id, 0, LAST);
259
        return isnan(r) ? -1 : (int) r;
260
}
261

    
262
/*
263
 * RTT to a given peer in seconds
264
*/
265
double get_rtt(struct nodeID *id){
266
        return get_measure(id, 1, WIN_AVG);
267
}
268

    
269
/*
270
 * average RTT to a set of peers in seconds
271
*/
272
double get_average_rtt(struct nodeID **ids, int len){
273
        int i;
274
        int n = 0;
275
        double sum = 0;
276

    
277
        for (i = 0; i < len; i++) {
278
                double l = get_rtt(ids[i]);
279
                if (!isnan(l)) {
280
                        sum += l;
281
                        n++;
282
                }
283
        }
284
        return (n > 0) ? sum / n : NAN;
285
}
286

    
287
/*
288
 * loss ratio from a given peer as 0..1
289
*/
290
double get_lossrate(struct nodeID *id){
291
        return get_measure(id, 2, WIN_AVG);
292
}
293

    
294
/*
295
 * average loss ratio from a set of peers as 0..1
296
*/
297
double get_average_lossrate(struct nodeID **ids, int len){
298
        int i;
299
        int n = 0;
300
        double sum = 0;
301

    
302
        for (i = 0; i < len; i++) {
303
                double l = get_lossrate(ids[i]);
304
                if (!isnan(l)) {
305
                        sum += l;
306
                        n++;
307
                }
308
        }
309
        return (n > 0) ? sum / n : NAN;
310
}