Statistics
| Branch: | Revision:

streamers / measures.c @ 19d6b4ca

History | View | Annotate | Download (9.7 KB)

1
#include <math.h>
2
#ifndef NAN        //NAN is missing in some old math.h versions
3
#define NAN            (0.0/0.0)
4
#endif
5
#ifndef INFINITY
6
#define INFINITY       (1.0/0.0)
7
#endif
8

    
9
#include <mon.h>
10
#include <ml.h>
11

    
12
#include "channel.h"
13
#include "dbg.h"
14
#include <net_helper.h>
15

    
16
typedef struct nodeID {
17
        socketID_handle addr;
18
        int connID;        // connection associated to this node, -1 if myself
19
        int refcnt;
20
        //a quick and dirty static vector for measures TODO: make it dinamic
21
        MonHandler mhs[20];
22
        int n_mhs;
23
} nodeID;
24

    
25
static MonHandler chunk_dup, chunk_playout, neigh_size, chunk_receive, chunk_send, offer_accept, chunk_hops, chunk_delay, playout_delay;
26
static MonHandler rx_bytes_chunk_per_sec, tx_bytes_chunk_per_sec, rx_bytes_sig_per_sec, tx_bytes_sig_per_sec;
27
static MonHandler rx_chunks, tx_chunks;
28

    
29
/*
30
 * Initialize one measure
31
*/
32
void add_measure(MonHandler *mh, MeasurementId id, MeasurementCapabilities mc, MonParameterValue rate, const char *pubname, enum stat_types st[], int length, SocketId dst, MsgType mt)
33
{
34
        *mh = monCreateMeasure(id, mc);
35
        monSetParameter (*mh, P_PUBLISHING_RATE, rate);
36
        monPublishStatisticalType(*mh, pubname, channel_get_name(), st , length, NULL);
37
        monActivateMeasure(*mh, dst, mt);
38
}
39

    
40
/*
41
 * Register duplicate arrival
42
*/
43
void reg_chunk_duplicate()
44
{
45
        if (!chunk_dup) {
46
                enum stat_types st[] = {SUM, RATE};
47
                add_measure(&chunk_dup, GENERIC, 0, 120, "ChunkDuplicates", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
48
                monNewSample(chunk_dup, 0);        //force publish even if there are no events
49
        }
50
        monNewSample(chunk_dup, 1);
51
}
52

    
53
/*
54
 * Register playout/loss of a chunk before playout
55
*/
56
void reg_chunk_playout(int id, bool b, uint64_t timestamp)
57
{
58
        struct timeval tnow;
59

    
60
        if (!chunk_playout && b) {        //don't count losses before the first arrived chunk
61
                enum stat_types st[] = {AVG, SUM, RATE};
62
                add_measure(&chunk_playout, GENERIC, 0, 120, "ChunksPlayed", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
63
        }
64
        monNewSample(chunk_playout, b);
65

    
66
        if (!playout_delay) {
67
                enum stat_types st[] = {WIN_AVG, WIN_VAR};
68
                add_measure(&playout_delay, GENERIC, 0, 120, "PlayoutDelay", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
69
        }
70
        gettimeofday(&tnow, NULL);
71
        monNewSample(playout_delay, tnow.tv_usec + tnow.tv_sec * 1000000ULL - timestamp);
72
}
73

    
74
/*
75
 * Register actual neghbourhood size
76
*/
77
void reg_neigh_size(int s)
78
{
79
        if (!neigh_size) {
80
                enum stat_types st[] = {LAST};
81
                add_measure(&neigh_size, GENERIC, 0, 120, "NeighSize", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
82
        }
83
        monNewSample(neigh_size, s);
84
}
85

    
86
/*
87
 * Register chunk receive event
88
*/
89
void reg_chunk_receive(int id, uint64_t timestamp, int hopcount)
90
{
91
        struct timeval tnow;
92

    
93
        if (!chunk_receive) {
94
                enum stat_types st[] = {RATE};
95
                add_measure(&chunk_receive, GENERIC, 0, 120, "RxChunkAll", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
96
                monNewSample(chunk_receive, 0);        //force publish even if there are no events
97
        }
98
        monNewSample(chunk_receive, 1);
99

    
100
        if (!chunk_hops) {
101
                enum stat_types st[] = {WIN_AVG, WIN_VAR};
102
                add_measure(&chunk_hops, GENERIC, 0, 120, "Hops", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
103
        }
104
        monNewSample(chunk_hops, hopcount);
105

    
106
        if (!chunk_delay) {
107
                enum stat_types st[] = {WIN_AVG, WIN_VAR};
108
                add_measure(&chunk_delay, GENERIC, 0, 120, "ReceiveDelay", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
109
        }
110
        gettimeofday(&tnow, NULL);
111
        monNewSample(chunk_delay, tnow.tv_usec + tnow.tv_sec * 1000000ULL - timestamp);
112
}
113

    
114
/*
115
 * Register chunk send event
116
*/
117
void reg_chunk_send(int id)
118
{
119
        if (!chunk_send) {
120
                enum stat_types st[] = {RATE};
121
                add_measure(&chunk_send, GENERIC, 0, 120, "TxChunkAll", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
122
                monNewSample(chunk_send, 0);        //force publish even if there are no events
123
        }
124
        monNewSample(chunk_send, 1);
125
}
126

    
127
/*
128
 * Register chunk accept evemt
129
*/
130
void reg_offer_accept(bool b)
131
{
132
        if (!offer_accept) {
133
                enum stat_types st[] = {AVG};
134
                add_measure(&offer_accept, GENERIC, 0, 120, "OfferAccept", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
135
        }
136
        monNewSample(offer_accept, b);
137
}
138

    
139
/*
140
 * Initialize peer level measurements
141
*/
142
void init_measures()
143
{
144
        enum stat_types stavg[] = {WIN_AVG};
145
        enum stat_types stsum[] = {SUM};
146

    
147
        // Traffic
148
       add_measure(&rx_bytes_chunk_per_sec, BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[bytes/s]
149
       add_measure(&tx_bytes_chunk_per_sec, BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[bytes/s]
150
       add_measure(&rx_bytes_sig_per_sec, BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_SIGNALLING);        //[bytes/s]
151
       add_measure(&tx_bytes_sig_per_sec, BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_SIGNALLING);        //[bytes/s]
152

    
153
        // Chunks
154
       // replaced by reg_chun_receive add_measure(&rx_chunks, COUNTER, RXONLY | DATA | IN_BAND, 120, "RxChunksAll", stsum, sizeof(stsum)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[chunks]
155
       // replaced by reg_chun_send add_measure(&tx_chunks, COUNTER, TXONLY | DATA | IN_BAND, 120, "TxChunksAll", stsum, sizeof(stsum)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[chunks]
156
}
157

    
158
/*
159
 * Initialize p2p measurements towards a peer
160
*/
161
void add_measures(struct nodeID *id)
162
{
163
        // Add measures
164
        int j = 0;
165
        enum stat_types stavg[] = {AVG};
166
        enum stat_types stwinavg[] = {WIN_AVG};
167
        enum stat_types stsum[] = {SUM};
168
        enum stat_types stsumrate[] = {SUM, RATE};
169

    
170
        dprintf("adding measures to %s\n",node_addr(id));
171

    
172
        /* HopCount */
173
       add_measure(&id->mhs[j++], HOPCOUNT, TXRXUNI | PACKET | IN_BAND, 120, "HopCount", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[IP hops]
174

    
175
        /* Round Trip Time */
176
       add_measure(&id->mhs[j++], RTT, TXRXBI | PACKET | IN_BAND, 120, "RoundTripDelay", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[seconds]
177

    
178
        /* Loss */
179
       add_measure(&id->mhs[j++], LOSS, TXRXUNI | PACKET | IN_BAND, 120, "LossRate", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //LossRate_avg [probability 0..1] LossRate_rate [lost_pkts/sec]
180

    
181
        // Cumulative Traffic
182
       //add_measure(&id->mhs[j++], BYTE, RXONLY | PACKET | IN_BAND, 120, "RxBytes", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_ANY);
183
       //add_measure(&id->mhs[j++], BYTE, TXONLY | PACKET | IN_BAND, 120, "TxBytes", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_ANY);
184
       add_measure(&id->mhs[j++], BYTE, RXONLY | PACKET | IN_BAND, 120, "RxBytesChunk", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes]
185
       add_measure(&id->mhs[j++], BYTE, TXONLY | PACKET | IN_BAND, 120, "TxBytesChunk", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes]
186

    
187
        // Traffic
188
       add_measure(&id->mhs[j++], BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes/s]
189
       add_measure(&id->mhs[j++], BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes/s]
190
       add_measure(&id->mhs[j++], BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[bytes/s]
191
       add_measure(&id->mhs[j++], BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[bytes/s]
192

    
193
        // Chunks
194
       add_measure(&id->mhs[j++], COUNTER, RXONLY | DATA | IN_BAND, 120, "RxChunks", stsumrate, sizeof(stsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //RxChunks_sum [chunks] RxChunks_rate [chunks/sec]
195
       add_measure(&id->mhs[j++], COUNTER, TXONLY | DATA | IN_BAND, 120, "TxChunks", stsumrate, sizeof(stsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //TxChunks_sum [chunks] TxChunks_rate [chunks/sec]
196

    
197
        // for static must not be more then 10 or whatever size is in net_helper-ml.c
198
        id->n_mhs = j;
199
}
200

    
201
/*
202
 * Delete p2p measurements towards a peer
203
*/
204
void delete_measures(struct nodeID *id)
205
{
206
        int j;
207
        dprintf("deleting measures from %s\n",node_addr(id));
208
        while(id->n_mhs) {
209
                monDestroyMeasure(id->mhs[--(id->n_mhs)]);
210
        }
211
}
212

    
213
/*
214
 * Helper to retrieve a measure
215
*/
216
double get_measure(struct nodeID *id, int j, enum stat_types st)
217
{
218
        return (id->n_mhs > j) ? monRetrieveResult(id->mhs[j], st) : NAN;
219
}
220

    
221
/*
222
 * Hopcount to a given peer
223
*/
224
int get_hopcount(struct nodeID *id){
225
        double r = get_measure(id, 0, LAST);
226
        return isnan(r) ? -1 : (int) r;
227
}
228

    
229
/*
230
 * RTT to a given peer in seconds
231
*/
232
double get_rtt(struct nodeID *id){
233
        return get_measure(id, 1, WIN_AVG);
234
}
235

    
236
/*
237
 * average RTT to a set of peers in seconds
238
*/
239
double get_average_rtt(struct nodeID **ids, int len){
240
        int i;
241
        int n = 0;
242
        double sum = 0;
243

    
244
        for (i = 0; i < len; i++) {
245
                double l = get_rtt(ids[i]);
246
                if (!isnan(l)) {
247
                        sum += l;
248
                        n++;
249
                }
250
        }
251
        return (n > 0) ? sum / n : NAN;
252
}
253

    
254
/*
255
 * loss ratio from a given peer as 0..1
256
*/
257
double get_lossrate(struct nodeID *id){
258
        return get_measure(id, 2, WIN_AVG);
259
}
260

    
261
/*
262
 * average loss ratio from a set of peers as 0..1
263
*/
264
double get_average_lossrate(struct nodeID **ids, int len){
265
        int i;
266
        int n = 0;
267
        double sum = 0;
268

    
269
        for (i = 0; i < len; i++) {
270
                double l = get_lossrate(ids[i]);
271
                if (!isnan(l)) {
272
                        sum += l;
273
                        n++;
274
                }
275
        }
276
        return (n > 0) ? sum / n : NAN;
277
}