Statistics
| Branch: | Revision:

napa-baselibs / monl / plugins / capprobe_measure.cpp @ 507372bb

History | View | Annotate | Download (8.31 KB)

1
/***************************************************************************
2
 *   Copyright (C) 2009 by Robert Birke
3
 *   robert.birke@polito.it
4
 *
5
 * This library is free software; you can redistribute it and/or
6
 * modify it under the terms of the GNU Lesser General Public
7
 * License as published by the Free Software Foundation; either
8
 * version 2.1 of the License, or (at your option) any later version.
9

10
 * This library is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 * Lesser General Public License for more details.
14

15
 * You should have received a copy of the GNU Lesser General Public
16
 * License along with this library; if not, write to the Free Software
17
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA 
18
 ***************************************************************************/
19

    
20
#include "capprobe_measure.h"
21
#include "napa_log.h"
22
#include <sys/time.h>
23
#include <math.h>
24
#include <string>
25

    
26
#ifdef WIN32
27
#define random(x) rand(x)
28
#endif
29

    
30
//defualt values only! can be changed using the changeParam() function
31
#define LOWER_LAYER_OVERHEAD 8+20+18+8+12
32
#define CAPPROBE_MTU 1300
33
#define CAPPROBE_SAMPLE_WINDOW 100
34

    
35
bool double_sort_function (double i,double j) { return (i<j); }
36

    
37

    
38
CapprobeMeasure::CapprobeMeasure(class MeasurePlugin *m, MeasurementCapabilities mc, class MeasureDispatcher *md): MonMeasure(m,mc,md) {
39
        pkt = NULL;
40
        max_size = 0;
41
}
42

    
43
CapprobeMeasure::~CapprobeMeasure() {
44
        if(pkt != NULL)
45
                delete[] pkt;
46
}
47

    
48
void CapprobeMeasure::init() {
49
        samples.clear();
50
        samples.reserve((int) param_values[P_CAPPROBE_PKT_TH]);
51

    
52
        if(pkt != NULL)
53
                delete[] pkt;
54
        pkt = new char[(int) param_values[P_CAPPROBE_PAYLOAD_SIZE]];
55

    
56
        avg_pause = param_values[P_CAPPROBE_PAYLOAD_SIZE] / param_values[P_CAPPROBE_RATE] * 8000.0;// [us]
57
        pair_num = 0;
58
        min_delay = NAN;
59
        min_ipd = NAN;
60
        last_rx = NAN;
61
        last_rx_pn = NAN;
62
        max_ipd = NAN;
63
        last_size = NAN;
64
}
65

    
66
void CapprobeMeasure::stop() {
67
        r_rx_list[R_CAPACITY_CAPPROBE] = r_tx_list[R_CAPACITY_CAPPROBE] = NAN;
68
        if(pkt != NULL)
69
                delete[] pkt;
70
        pkt = NULL;
71
}
72

    
73

    
74
int CapprobeMeasure::paramChange(MonParameterType ph, MonParameterValue p){
75
        switch(ph) {
76
                case P_CAPPROBE_RATE:
77
                        param_values[P_CAPPROBE_RATE] = p;
78
                        avg_pause = param_values[P_CAPPROBE_PAYLOAD_SIZE] / param_values[P_CAPPROBE_RATE] * 8000.0;
79
                        break;
80
                case P_CAPPROBE_PAYLOAD_SIZE:
81
                        param_values[P_CAPPROBE_PAYLOAD_SIZE] = p;
82
                        if(pkt != NULL)
83
                                delete[] pkt;
84
                        pkt = new char[(int) param_values[P_CAPPROBE_PAYLOAD_SIZE]];
85
                        avg_pause = param_values[P_CAPPROBE_PAYLOAD_SIZE] / param_values[P_CAPPROBE_RATE] * 8000.0;
86
                        break;
87
                case P_CAPPROBE_PKT_TH:
88
                        param_values[P_CAPPROBE_PKT_TH] = p;
89
                        samples.clear();
90
                        samples.reserve((int) param_values[P_CAPPROBE_PKT_TH]);
91
                        break;
92
                case P_CAPPROBE_DELAY_TH:
93
                        param_values[P_CAPPROBE_DELAY_TH] = p;
94
                        break;
95
                case P_CAPPROBE_IPD_TH:
96
                        param_values[P_CAPPROBE_IPD_TH] = p;
97
                        break;
98
                case P_CAPPROBE_HEADER_SIZE:
99
                        param_values[P_CAPPROBE_HEADER_SIZE] = p;
100
                        break;
101
        }
102
        return EOK;
103
}
104

    
105
double CapprobeMeasure::gen_pause_exp() {
106
        double p = random();
107
        /*turn this into a random number between 0 and 1...*/
108
        double f = p / (double) INT_MAX;
109
        return -1 * avg_pause * log(1-f);
110
}
111

    
112
void CapprobeMeasure::Run() {
113
        double pause;
114
        uint32_t *pn = (uint32_t*) pkt;
115

    
116
        if(flags & REMOTE)
117
                return;
118

    
119
        *pn = pair_num++;
120
        sendOobData(pkt, CAPPROBE_MTU);
121
        sendOobData(pkt, CAPPROBE_MTU);
122

    
123
        struct timeval next;
124

    
125
        pause = gen_pause_exp();
126
        next.tv_sec = (unsigned long)pause / 1000000;
127
        next.tv_usec = (unsigned long)fmod(pause, 100000.0);
128

    
129
        scheduleNextIn(&next);
130
}
131

    
132
void CapprobeMeasure::receiveOobData(char *buf, int buf_len, result *r) {
133
        uint32_t *pn = (uint32_t*) pkt;
134

    
135
        if(flags & REMOTE) { //remote
136
                result cap;
137
                //look for min delay
138
                if(min_delay > r[R_CORRECTED_DELAY] || isnan(min_delay))
139
                        min_delay = r[P_CAPPROBE_FILTER];
140
        
141
                //take only consecutive packets
142
                if(*pn == last_rx_pn) {
143
                        cap = computeCapacity(last_rx_delay, r[R_CORRECTED_DELAY], r[R_RECEIVE_TIME] - last_rx, r[R_SIZE], r);
144
                        if(!isnan(cap))
145
                                sendOobData((char *) &cap, sizeof(cap));
146
                }
147

    
148
                last_rx = r[R_RECEIVE_TIME];
149
                last_rx_delay = r[R_CORRECTED_DELAY];
150
                last_rx_pn = *pn;
151

    
152
        } else  {
153
                result *cap = (result *) pkt;
154
                newSample(*cap);
155
        }
156
}
157

    
158
result CapprobeMeasure::RxPkt(result *r,ExecutionList *el) {
159
        result cap = NAN;
160

    
161
        if(!(flags & REMOTE)) //local: nothing to do
162
                return NAN;
163

    
164
        //look for min delay
165
        if(min_delay > r[R_CORRECTED_DELAY] || isnan(min_delay))
166
                min_delay = r[P_CAPPROBE_FILTER];
167

    
168
        //take only consecutive packets
169
        if(r[R_DATA_ID] == last_rx_pn)
170
        // &&  next_offset == r[R_DATA_OFFSET]) TODO: should also check in order
171
                cap = computeCapacity(last_rx_delay, r[R_CORRECTED_DELAY], r[R_RECEIVE_TIME] - last_rx, last_size, r);
172

    
173
        last_rx = r[R_RECEIVE_TIME];
174
        last_rx_delay = r[R_CORRECTED_DELAY];
175
        last_rx_pn = r[R_DATA_ID];
176
        last_size = r[R_SIZE];
177
        
178
        return cap;
179
}
180

    
181

    
182
result CapprobeMeasure::computeCapacity(result delay1, result delay2, result ipg, result size, result *r) {
183

    
184
        //filter on min_delay
185
        if((delay1 < min_delay + param_values[P_CAPPROBE_DELAY_TH] / 1000000.0 &&
186
                delay2 < min_delay + param_values[P_CAPPROBE_DELAY_TH] / 1000000.0) ||
187
                param_values[P_CAPPROBE_DELAY_TH] < 0)
188
        {
189
                if(!isnan(ipg)) {
190
                                samples.push_back(ipg);
191
                                if(ipg < min_ipd || isnan(min_ipd))
192
                                        min_ipd = ipg;
193
                                if(ipg > max_ipd || isnan(max_ipd))
194
                                        max_ipd = ipg;
195
                }
196
        }
197

    
198
        if(samples.size() >= param_values[P_CAPPROBE_PKT_TH]) {// when collected enough data try to make an estimate
199
                int j,i,c, max_c, i_max;
200
                result threshold, interval;
201

    
202
                if(max_size < size || isnan(max_size))
203
                        max_size = size;
204

    
205
                std::sort(samples.begin(), samples.end(), double_sort_function);
206
                
207
                switch((int) param_values[P_CAPPROBE_FILTER]) {
208
                case 0: //pdf based
209
                                c = 0; max_c = -1; i_max = 0;
210
                                interval = (max_ipd - min_ipd) / param_values[P_CAPPROBE_NUM_INTERVALS];
211
                                threshold = min_ipd + interval;
212
                                for(i = 0; i < samples.size(); i++) {
213
                                        if(samples[i] >= threshold) {
214
                                                if(c > max_c) {
215
                                                        max_c = c;
216
                                                        i_max = i - c;
217
                                                }
218
                                                while(samples[i] >= threshold)
219
                                                        threshold += interval;
220
                                                c = 1;
221
                                        } else
222
                                                c++;
223
                                }
224
                                j = (i_max + max_c / 2);
225
                                break;
226
                case 1: //median based
227
                                for(j=0; j < samples.size(); j++) {
228
                                                if(samples[j] > min_ipd * (1 + param_values[P_CAPPROBE_IPD_TH] / 100.0))
229
                                                        break;
230
                                        }
231
                                j = (int)(samples.size() * 0.1) / 2;
232
                                break;
233
                }
234

    
235
                r_rx_list[R_CAPACITY_CAPPROBE] = r_tx_list[R_CAPACITY_CAPPROBE] =  (max_size + param_values[P_CAPPROBE_HEADER_SIZE]) * 8.0 / samples[j];
236

    
237
                char dbg[512];
238
                snprintf(dbg, sizeof(dbg), "Ts: %f Size: %f Cap: %f", r[R_SEND_TIME], max_size, r_rx_list[R_CAPACITY_CAPPROBE]);
239
                debugOutput(dbg);
240
                for(j=0; j < samples.size(); j++) {
241
                        snprintf(dbg, sizeof(dbg), "Ipg: %f", samples[j]);
242
                        debugOutput(dbg);
243
                }
244

    
245
                samples.clear();
246
                min_delay = NAN;
247
                min_ipd = NAN;
248
                max_ipd = NAN;
249
                max_size = NAN;
250
                return r_rx_list[R_CAPACITY_CAPPROBE];
251
        }
252
        return NAN;
253
}
254

    
255
CapprobeMeasurePlugin::CapprobeMeasurePlugin() {
256
        /* Initialise properties: MANDATORY! */
257
        name = "Capacity (capprobe)";
258
        desc = "The capacity in kbit/s computed using capprobe";
259
        id = CAPACITY_CAPPROBE;
260
        /* end of mandatory properties */
261
        addParameter(new MinParameter("Rate","The rate at which we probe the path in kbit/s",0,100), P_CAPPROBE_RATE);
262
        addParameter(new MinParameter("Packet Threshold","The minimum number of packets to process before considering the value somehow attendible [#]",0,CAPPROBE_SAMPLE_WINDOW), P_CAPPROBE_PKT_TH);
263
        addParameter(new MinParameter("Delay Threshold","The tolerance which is added to the minimum delay [us]",-1,100), P_CAPPROBE_DELAY_TH);
264
        addParameter(new MinParameter("Inter-pair delay threshold","The tolerance to the inter-pair delay [%]",0,60), P_CAPPROBE_IPD_TH);
265
        addParameter(new MinParameter("Header size","The size of the lower layer headers [bytes]",0, LOWER_LAYER_OVERHEAD), P_CAPPROBE_HEADER_SIZE);
266
        addParameter(new MinParameter("Payload","The size of the payload for injected packets [bytes]",sizeof(uint32_t), CAPPROBE_MTU), P_CAPPROBE_PAYLOAD_SIZE);
267
        addParameter(new MinMaxParameter("Filter","The filter algorithm used: 0 - PDF 1 - Median",0,1,0), P_CAPPROBE_FILTER);
268
        addParameter(new MinParameter("Intervals","The number of intervals for the PDF [#]",0, 1000), P_CAPPROBE_NUM_INTERVALS);
269
        //TODO addDependency(CORRECTED_DELAY);
270
}