Statistics
| Branch: | Revision:

streamers / measures.c @ 049189f8

History | View | Annotate | Download (8.96 KB)

1
#include <math.h>
2
#ifndef NAN        //NAN is missing in some old math.h versions
3
#define NAN            (0.0/0.0)
4
#endif
5
#ifndef INFINITY
6
#define INFINITY       (1.0/0.0)
7
#endif
8

    
9
#include <mon.h>
10
#include <ml.h>
11

    
12
#include "channel.h"
13
#include "dbg.h"
14
#include <net_helper.h>
15

    
16
typedef struct nodeID {
17
        socketID_handle addr;
18
        int connID;        // connection associated to this node, -1 if myself
19
        int refcnt;
20
        //a quick and dirty static vector for measures TODO: make it dinamic
21
        MonHandler mhs[20];
22
        int n_mhs;
23
} nodeID;
24

    
25
static MonHandler chunk_dup, chunk_playout, neigh_size, chunk_receive, chunk_send, offer_accept, chunk_hops;
26
static MonHandler rx_bytes_chunk_per_sec, tx_bytes_chunk_per_sec, rx_bytes_sig_per_sec, tx_bytes_sig_per_sec;
27
static MonHandler rx_chunks, tx_chunks;
28

    
29
/*
30
 * Initialize one measure
31
*/
32
void add_measure(MonHandler *mh, MeasurementId id, MeasurementCapabilities mc, MonParameterValue rate, const char *pubname, enum stat_types st[], int length, SocketId dst, MsgType mt)
33
{
34
        *mh = monCreateMeasure(id, mc);
35
        monSetParameter (*mh, P_PUBLISHING_RATE, rate);
36
        monPublishStatisticalType(*mh, pubname, channel_get_name(), st , length, NULL);
37
        monActivateMeasure(*mh, dst, mt);
38
}
39

    
40
/*
41
 * Register duplicate arrival
42
*/
43
void reg_chunk_duplicate()
44
{
45
        if (!chunk_dup) {
46
                enum stat_types st[] = {SUM, RATE};
47
                add_measure(&chunk_dup, GENERIC, 0, 120, "ChunkDuplicates", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
48
                monNewSample(chunk_dup, 0);        //force publish even if there are no events
49
        }
50
        monNewSample(chunk_dup, 1);
51
}
52

    
53
/*
54
 * Register playout/loss of a chunk before playout
55
*/
56
void reg_chunk_playout(bool b)
57
{
58
        if (!chunk_playout && b) {        //don't count losses before the first arrived chunk
59
                enum stat_types st[] = {AVG, SUM, RATE};
60
                add_measure(&chunk_playout, GENERIC, 0, 120, "ChunksPlayed", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
61
        }
62
        monNewSample(chunk_playout, b);
63
}
64

    
65
/*
66
 * Register actual neghbourhood size
67
*/
68
void reg_neigh_size(int s)
69
{
70
        if (!neigh_size) {
71
                enum stat_types st[] = {LAST};
72
                add_measure(&neigh_size, GENERIC, 0, 120, "NeighSize", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
73
        }
74
        monNewSample(neigh_size, s);
75
}
76

    
77
/*
78
 * Register chunk receive event
79
*/
80
void reg_chunk_receive(int id, int hopcount)
81
{
82
        if (!chunk_receive) {
83
                enum stat_types st[] = {RATE};
84
                add_measure(&chunk_receive, GENERIC, 0, 120, "RxChunkAll", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
85
                monNewSample(chunk_receive, 0);        //force publish even if there are no events
86
        }
87
        monNewSample(chunk_receive, 1);
88

    
89
        if (!chunk_hops) {
90
                enum stat_types st[] = {WIN_AVG};
91
                add_measure(&chunk_hops, GENERIC, 0, 120, "Hops", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
92
        }
93
        monNewSample(chunk_hops, hopcount);
94
}
95

    
96
/*
97
 * Register chunk send event
98
*/
99
void reg_chunk_send(int id)
100
{
101
        if (!chunk_send) {
102
                enum stat_types st[] = {RATE};
103
                add_measure(&chunk_send, GENERIC, 0, 120, "TxChunkAll", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
104
                monNewSample(chunk_send, 0);        //force publish even if there are no events
105
        }
106
        monNewSample(chunk_send, 1);
107
}
108

    
109
/*
110
 * Register chunk accept evemt
111
*/
112
void reg_offer_accept(bool b)
113
{
114
        if (!offer_accept) {
115
                enum stat_types st[] = {AVG};
116
                add_measure(&offer_accept, GENERIC, 0, 120, "OfferAccept", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
117
        }
118
        monNewSample(offer_accept, b);
119
}
120

    
121
/*
122
 * Initialize peer level measurements
123
*/
124
void init_measures()
125
{
126
        enum stat_types stavg[] = {WIN_AVG};
127
        enum stat_types stsum[] = {SUM};
128

    
129
        // Traffic
130
       add_measure(&rx_bytes_chunk_per_sec, BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[bytes/s]
131
       add_measure(&tx_bytes_chunk_per_sec, BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[bytes/s]
132
       add_measure(&rx_bytes_sig_per_sec, BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_SIGNALLING);        //[bytes/s]
133
       add_measure(&tx_bytes_sig_per_sec, BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_SIGNALLING);        //[bytes/s]
134

    
135
        // Chunks
136
       // replaced by reg_chun_receive add_measure(&rx_chunks, COUNTER, RXONLY | DATA | IN_BAND, 120, "RxChunksAll", stsum, sizeof(stsum)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[chunks]
137
       // replaced by reg_chun_send add_measure(&tx_chunks, COUNTER, TXONLY | DATA | IN_BAND, 120, "TxChunksAll", stsum, sizeof(stsum)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[chunks]
138
}
139

    
140
/*
141
 * Initialize p2p measurements towards a peer
142
*/
143
void add_measures(struct nodeID *id)
144
{
145
        // Add measures
146
        int j = 0;
147
        enum stat_types stavg[] = {AVG};
148
        enum stat_types stwinavg[] = {WIN_AVG};
149
        enum stat_types stsum[] = {SUM};
150
        enum stat_types stsumrate[] = {SUM, RATE};
151

    
152
        dprintf("adding measures to %s\n",node_addr(id));
153

    
154
        /* HopCount */
155
       add_measure(&id->mhs[j++], HOPCOUNT, TXRXUNI | PACKET | IN_BAND, 120, "HopCount", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[IP hops]
156

    
157
        /* Round Trip Time */
158
       add_measure(&id->mhs[j++], RTT, TXRXBI | PACKET | IN_BAND, 120, "RoundTripDelay", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[seconds]
159

    
160
        /* Loss */
161
       add_measure(&id->mhs[j++], LOSS, TXRXUNI | PACKET | IN_BAND, 120, "LossRate", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //LossRate_avg [probability 0..1] LossRate_rate [lost_pkts/sec]
162

    
163
        // Cumulative Traffic
164
       //add_measure(&id->mhs[j++], BYTE, RXONLY | PACKET | IN_BAND, 120, "RxBytes", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_ANY);
165
       //add_measure(&id->mhs[j++], BYTE, TXONLY | PACKET | IN_BAND, 120, "TxBytes", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_ANY);
166
       add_measure(&id->mhs[j++], BYTE, RXONLY | PACKET | IN_BAND, 120, "RxBytesChunk", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes]
167
       add_measure(&id->mhs[j++], BYTE, TXONLY | PACKET | IN_BAND, 120, "TxBytesChunk", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes]
168

    
169
        // Traffic
170
       add_measure(&id->mhs[j++], BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes/s]
171
       add_measure(&id->mhs[j++], BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes/s]
172
       add_measure(&id->mhs[j++], BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[bytes/s]
173
       add_measure(&id->mhs[j++], BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[bytes/s]
174

    
175
        // Chunks
176
       add_measure(&id->mhs[j++], COUNTER, RXONLY | DATA | IN_BAND, 120, "RxChunks", stsumrate, sizeof(stsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //RxChunks_sum [chunks] RxChunks_rate [chunks/sec]
177
       add_measure(&id->mhs[j++], COUNTER, TXONLY | DATA | IN_BAND, 120, "TxChunks", stsumrate, sizeof(stsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //TxChunks_sum [chunks] TxChunks_rate [chunks/sec]
178

    
179
        // for static must not be more then 10 or whatever size is in net_helper-ml.c
180
        id->n_mhs = j;
181
}
182

    
183
/*
184
 * Delete p2p measurements towards a peer
185
*/
186
void delete_measures(struct nodeID *id)
187
{
188
        int j;
189
        dprintf("deleting measures from %s\n",node_addr(id));
190
        while(id->n_mhs) {
191
                monDestroyMeasure(id->mhs[--(id->n_mhs)]);
192
        }
193
}
194

    
195
/*
196
 * Helper to retrieve a measure
197
*/
198
double get_measure(struct nodeID *id, int j, enum stat_types st)
199
{
200
        return (id->n_mhs > j) ? monRetrieveResult(id->mhs[j], st) : NAN;
201
}
202

    
203
/*
204
 * Hopcount to a given peer
205
*/
206
int get_hopcount(struct nodeID *id){
207
        double r = get_measure(id, 0, LAST);
208
        return isnan(r) ? -1 : (int) r;
209
}
210

    
211
/*
212
 * RTT to a given peer in seconds
213
*/
214
double get_rtt(struct nodeID *id){
215
        return get_measure(id, 1, WIN_AVG);
216
}
217

    
218
/*
219
 * average RTT to a set of peers in seconds
220
*/
221
double get_average_rtt(struct nodeID **ids, int len){
222
        int i;
223
        int n = 0;
224
        double sum = 0;
225

    
226
        for (i = 0; i < len; i++) {
227
                double l = get_rtt(ids[i]);
228
                if (!isnan(l)) {
229
                        sum += l;
230
                        n++;
231
                }
232
        }
233
        return (n > 0) ? sum / n : NAN;
234
}
235

    
236
/*
237
 * loss ratio from a given peer as 0..1
238
*/
239
double get_lossrate(struct nodeID *id){
240
        return get_measure(id, 2, WIN_AVG);
241
}
242

    
243
/*
244
 * average loss ratio from a set of peers as 0..1
245
*/
246
double get_average_lossrate(struct nodeID **ids, int len){
247
        int i;
248
        int n = 0;
249
        double sum = 0;
250

    
251
        for (i = 0; i < len; i++) {
252
                double l = get_lossrate(ids[i]);
253
                if (!isnan(l)) {
254
                        sum += l;
255
                        n++;
256
                }
257
        }
258
        return (n > 0) ? sum / n : NAN;
259
}