Statistics
| Branch: | Revision:

streamers / measures.c @ 3ae5a46f

History | View | Annotate | Download (8.07 KB)

1
#include <math.h>
2
#ifndef NAN        //NAN is missing in some old math.h versions
3
#define NAN            (0.0/0.0)
4
#endif
5
#ifndef INFINITY
6
#define INFINITY       (1.0/0.0)
7
#endif
8

    
9
#include <mon.h>
10
#include <ml.h>
11

    
12
#include "channel.h"
13
#include "dbg.h"
14
#include <net_helper.h>
15

    
16
typedef struct nodeID {
17
        socketID_handle addr;
18
        int connID;        // connection associated to this node, -1 if myself
19
        int refcnt;
20
        //a quick and dirty static vector for measures TODO: make it dinamic
21
        MonHandler mhs[20];
22
        int n_mhs;
23
} nodeID;
24

    
25
static MonHandler chunk_dup, chunk_playout, neigh_size, chunk_receive, chunk_send;
26
static MonHandler rx_bytes_chunk_per_sec, tx_bytes_chunk_per_sec, rx_bytes_sig_per_sec, tx_bytes_sig_per_sec;
27
static MonHandler rx_chunks, tx_chunks;
28

    
29
/*
30
 * Initialize one measure
31
*/
32
void add_measure(MonHandler *mh, MeasurementId id, MeasurementCapabilities mc, MonParameterValue rate, const char *pubname, enum stat_types st[], int length, SocketId dst, MsgType mt)
33
{
34
        *mh = monCreateMeasure(id, mc);
35
        monSetParameter (*mh, P_PUBLISHING_RATE, rate);
36
        monPublishStatisticalType(*mh, pubname, channel_get_name(), st , length, NULL);
37
        monActivateMeasure(*mh, dst, mt);
38
}
39

    
40
/*
41
 * Register duplicate arrival
42
*/
43
void reg_chunk_duplicate()
44
{
45
        if (!chunk_dup) {
46
                enum stat_types st[] = {SUM, RATE};
47
                add_measure(&chunk_dup, GENERIC, 0, 120, "ChunkDuplicates", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
48
                monNewSample(chunk_dup, 0);        //force publish even if there are no events
49
        }
50
        monNewSample(chunk_dup, 1);
51
}
52

    
53
/*
54
 * Register playout/loss of a chunk before playout
55
*/
56
void reg_chunk_playout(bool b)
57
{
58
        if (!chunk_playout && b) {        //don't count losses before the first arrived chunk
59
                enum stat_types st[] = {AVG, SUM, RATE};
60
                add_measure(&chunk_playout, GENERIC, 0, 120, "ChunksPlayed", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[chunks]
61
        }
62
        monNewSample(chunk_playout, b);
63
}
64

    
65
/*
66
 * Register actual neghbourhood size
67
*/
68
void reg_neigh_size(int s)
69
{
70
        if (!neigh_size) {
71
                enum stat_types st[] = {LAST};
72
                add_measure(&neigh_size, GENERIC, 0, 120, "NeighSize", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
73
        }
74
        monNewSample(neigh_size, s);
75
}
76

    
77
/*
78
 * Register chunk receive event
79
*/
80
void reg_chunk_receive(int id)
81
{
82
        if (!chunk_receive) {
83
                enum stat_types st[] = {RATE};
84
                add_measure(&chunk_receive, GENERIC, 0, 120, "RxChunkAll", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
85
                monNewSample(chunk_receive, 0);        //force publish even if there are no events
86
        }
87
        monNewSample(chunk_receive, 1);
88
}
89

    
90
/*
91
 * Register chunk send event
92
*/
93
void reg_chunk_send(int id)
94
{
95
        if (!chunk_send) {
96
                enum stat_types st[] = {RATE};
97
                add_measure(&chunk_send, GENERIC, 0, 120, "TxChunkAll", st, sizeof(st)/sizeof(enum stat_types), NULL, MSG_TYPE_ANY);        //[peers]
98
                monNewSample(chunk_send, 0);        //force publish even if there are no events
99
        }
100
        monNewSample(chunk_send, 1);
101
}
102

    
103
/*
104
 * Initialize peer level measurements
105
*/
106
void init_measures()
107
{
108
        enum stat_types stavg[] = {WIN_AVG};
109
        enum stat_types stsum[] = {SUM};
110

    
111
        // Traffic
112
       add_measure(&rx_bytes_chunk_per_sec, BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[bytes/s]
113
       add_measure(&tx_bytes_chunk_per_sec, BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[bytes/s]
114
       add_measure(&rx_bytes_sig_per_sec, BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 120, "RxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_SIGNALLING);        //[bytes/s]
115
       add_measure(&tx_bytes_sig_per_sec, BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 120, "TxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), NULL, MSG_TYPE_SIGNALLING);        //[bytes/s]
116

    
117
        // Chunks
118
       // replaced by reg_chun_receive add_measure(&rx_chunks, COUNTER, RXONLY | DATA | IN_BAND, 120, "RxChunksAll", stsum, sizeof(stsum)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[chunks]
119
       // replaced by reg_chun_send add_measure(&tx_chunks, COUNTER, TXONLY | DATA | IN_BAND, 120, "TxChunksAll", stsum, sizeof(stsum)/sizeof(enum stat_types), NULL, MSG_TYPE_CHUNK);        //[chunks]
120
}
121

    
122
/*
123
 * Initialize p2p measurements towards a peer
124
*/
125
void add_measures(struct nodeID *id)
126
{
127
        // Add measures
128
        int j = 0;
129
        enum stat_types stavg[] = {AVG};
130
        enum stat_types stwinavg[] = {WIN_AVG};
131
        enum stat_types stsum[] = {SUM};
132
        enum stat_types stsumrate[] = {SUM, RATE};
133

    
134
        dprintf("adding measures to %s\n",node_addr(id));
135

    
136
        /* HopCount */
137
       add_measure(&id->mhs[j++], HOPCOUNT, TXRXUNI | PACKET | IN_BAND, 600, "HopCount", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[IP hops]
138

    
139
        /* Round Trip Time */
140
       add_measure(&id->mhs[j++], RTT, TXRXBI | PACKET | IN_BAND, 120, "RoundTripDelay", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[seconds]
141

    
142
        /* Loss */
143
       add_measure(&id->mhs[j++], LOSS, TXRXUNI | PACKET | IN_BAND, 120, "LossRate", stwinavg, sizeof(stwinavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //LossRate_avg [probability 0..1] LossRate_rate [lost_pkts/sec]
144

    
145
        // Cumulative Traffic
146
       //add_measure(&id->mhs[j++], BYTE, RXONLY | PACKET | IN_BAND, 120, "RxBytes", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_ANY);
147
       //add_measure(&id->mhs[j++], BYTE, TXONLY | PACKET | IN_BAND, 120, "TxBytes", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_ANY);
148
       add_measure(&id->mhs[j++], BYTE, RXONLY | PACKET | IN_BAND, 300, "RxBytesChunk", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes]
149
       add_measure(&id->mhs[j++], BYTE, TXONLY | PACKET | IN_BAND, 300, "TxBytesChunk", stsum, sizeof(stsum)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes]
150

    
151
        // Traffic
152
       add_measure(&id->mhs[j++], BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 300, "RxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes/s]
153
       add_measure(&id->mhs[j++], BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 300, "TxBytesChunkPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //[bytes/s]
154
       add_measure(&id->mhs[j++], BULK_TRANSFER, RXONLY | PACKET | TIMER_BASED, 300, "RxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[bytes/s]
155
       add_measure(&id->mhs[j++], BULK_TRANSFER, TXONLY | PACKET | TIMER_BASED, 300, "TxBytesSigPSec", stavg, sizeof(stavg)/sizeof(enum stat_types), id->addr, MSG_TYPE_SIGNALLING);        //[bytes/s]
156

    
157
        // Chunks
158
       add_measure(&id->mhs[j++], COUNTER, RXONLY | DATA | IN_BAND, 300, "RxChunks", stsumrate, sizeof(stsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //RxChunks_sum [chunks] RxChunks_rate [chunks/sec]
159
       add_measure(&id->mhs[j++], COUNTER, TXONLY | DATA | IN_BAND, 300, "TxChunks", stsumrate, sizeof(stsumrate)/sizeof(enum stat_types), id->addr, MSG_TYPE_CHUNK);        //TxChunks_sum [chunks] TxChunks_rate [chunks/sec]
160

    
161
        // for static must not be more then 10 or whatever size is in net_helper-ml.c
162
        id->n_mhs = j;
163
}
164

    
165
/*
166
 * Delete p2p measurements towards a peer
167
*/
168
void delete_measures(struct nodeID *id)
169
{
170
        int j;
171
        dprintf("deleting measures from %s\n",node_addr(id));
172
        for(j = 0; j < id->n_mhs; j++) {
173
                monDestroyMeasure(id->mhs[j]);
174
        }
175
}
176

    
177
/*
178
 * Helper to retrieve a measure
179
*/
180
double get_measure(struct nodeID *id, int j, enum stat_types st)
181
{
182
        return monRetrieveResult(id->mhs[j], st);
183
}
184

    
185
/*
186
 * Hopcount to a given peer
187
*/
188
int get_hopcount(struct nodeID *id){
189
        double r = get_measure(id, 0, LAST);
190
        return isnan(r) ? -1 : (int) r;
191
}
192

    
193
/*
194
 * RTT to a given peer in seconds
195
*/
196
double get_rtt(struct nodeID *id){
197
        return get_measure(id, 1, WIN_AVG);
198
}
199

    
200
/*
201
 * loss ratio from a given peer as 0..1
202
*/
203
double get_lossrate(struct nodeID *id){
204
        return get_measure(id, 2, WIN_AVG);
205
}
206

    
207
double get_average_lossrate(struct nodeID **ids, int len){
208
        int i;
209
        int n = 0;
210
        double sum = 0;
211

    
212
        for (i = 0; i < len; i++) {
213
                double l = get_lossrate(ids[i]);
214
                if (!isnan(l)) {
215
                        sum += l;
216
                        n++;
217
                }
218
        }
219
        return (n > 0) ? sum / n : NAN;
220
}