Statistics
| Branch: | Revision:

napa-baselibs / monl / plugins / capprobe_measure.cpp @ 5f3adef4

History | View | Annotate | Download (8.26 KB)

1
/***************************************************************************
2
 *   Copyright (C) 2009 by Robert Birke
3
 *   robert.birke@polito.it
4
 *
5
 * This library is free software; you can redistribute it and/or
6
 * modify it under the terms of the GNU Lesser General Public
7
 * License as published by the Free Software Foundation; either
8
 * version 2.1 of the License, or (at your option) any later version.
9

10
 * This library is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 * Lesser General Public License for more details.
14

15
 * You should have received a copy of the GNU Lesser General Public
16
 * License along with this library; if not, write to the Free Software
17
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA 
18
 ***************************************************************************/
19

    
20
#include "capprobe_measure.h"
21
#include "grapes_log.h"
22
#include <sys/time.h>
23
#include <math.h>
24
#include <string>
25

    
26
//defualt values only! can be changed using the changeParam() function
27
#define LOWER_LAYER_OVERHEAD 8+20+18+8+12
28
#define CAPPROBE_MTU 1300
29
#define CAPPROBE_SAMPLE_WINDOW 100
30

    
31
bool double_sort_function (double i,double j) { return (i<j); }
32

    
33

    
34
CapprobeMeasure::CapprobeMeasure(class MeasurePlugin *m, MeasurementCapabilities mc, class MeasureDispatcher *md): MonMeasure(m,mc,md) {
35
        pkt = NULL;
36
        max_size = 0;
37
}
38

    
39
CapprobeMeasure::~CapprobeMeasure() {
40
        if(pkt != NULL)
41
                delete[] pkt;
42
}
43

    
44
void CapprobeMeasure::init() {
45
        samples.clear();
46
        samples.reserve((int) param_values[P_CAPPROBE_PKT_TH]);
47

    
48
        if(pkt != NULL)
49
                delete[] pkt;
50
        pkt = new char[(int) param_values[P_CAPPROBE_PAYLOAD_SIZE]];
51

    
52
        avg_pause = param_values[P_CAPPROBE_PAYLOAD_SIZE] / param_values[P_CAPPROBE_RATE] * 8000.0;// [us]
53
        pair_num = 0;
54
        min_delay = NAN;
55
        min_ipd = NAN;
56
        last_rx = NAN;
57
        last_rx_pn = NAN;
58
        max_ipd = NAN;
59
        last_size = NAN;
60
}
61

    
62
void CapprobeMeasure::stop() {
63
        r_rx_list[R_CAPACITY_CAPPROBE] = r_tx_list[R_CAPACITY_CAPPROBE] = NAN;
64
        if(pkt != NULL)
65
                delete[] pkt;
66
        pkt = NULL;
67
}
68

    
69

    
70
int CapprobeMeasure::paramChange(MonParameterType ph, MonParameterValue p){
71
        switch(ph) {
72
                case P_CAPPROBE_RATE:
73
                        param_values[P_CAPPROBE_RATE] = p;
74
                        avg_pause = param_values[P_CAPPROBE_PAYLOAD_SIZE] / param_values[P_CAPPROBE_RATE] * 8000.0;
75
                        break;
76
                case P_CAPPROBE_PAYLOAD_SIZE:
77
                        param_values[P_CAPPROBE_PAYLOAD_SIZE] = p;
78
                        if(pkt != NULL)
79
                                delete[] pkt;
80
                        pkt = new char[(int) param_values[P_CAPPROBE_PAYLOAD_SIZE]];
81
                        avg_pause = param_values[P_CAPPROBE_PAYLOAD_SIZE] / param_values[P_CAPPROBE_RATE] * 8000.0;
82
                        break;
83
                case P_CAPPROBE_PKT_TH:
84
                        param_values[P_CAPPROBE_PKT_TH] = p;
85
                        samples.clear();
86
                        samples.reserve((int) param_values[P_CAPPROBE_PKT_TH]);
87
                        break;
88
                case P_CAPPROBE_DELAY_TH:
89
                        param_values[P_CAPPROBE_DELAY_TH] = p;
90
                        break;
91
                case P_CAPPROBE_IPD_TH:
92
                        param_values[P_CAPPROBE_IPD_TH] = p;
93
                        break;
94
                case P_CAPPROBE_HEADER_SIZE:
95
                        param_values[P_CAPPROBE_HEADER_SIZE] = p;
96
                        break;
97
        }
98
        return EOK;
99
}
100

    
101
double CapprobeMeasure::gen_pause_exp() {
102
        double p = random();
103
        /*turn this into a random number between 0 and 1...*/
104
        double f = p / (double) INT_MAX;
105
        return -1 * avg_pause * log(1-f);
106
}
107

    
108
void CapprobeMeasure::Run() {
109
        double pause;
110
        uint32_t *pn = (uint32_t*) pkt;
111

    
112
        if(flags & REMOTE)
113
                return;
114

    
115
        *pn = pair_num++;
116
        sendOobData(pkt, CAPPROBE_MTU);
117
        sendOobData(pkt, CAPPROBE_MTU);
118

    
119
        struct timeval next;
120

    
121
        pause = gen_pause_exp();
122
        next.tv_sec = (unsigned long)pause / 1000000;
123
        next.tv_usec = (unsigned long)fmod(pause, 100000.0);
124

    
125
        scheduleNextIn(&next);
126
}
127

    
128
void CapprobeMeasure::receiveOobData(char *buf, int buf_len, result *r) {
129
        uint32_t *pn = (uint32_t*) pkt;
130

    
131
        if(flags & REMOTE) { //remote
132
                result cap;
133
                //look for min delay
134
                if(min_delay > r[R_CORRECTED_DELAY] || isnan(min_delay))
135
                        min_delay = r[P_CAPPROBE_FILTER];
136
        
137
                //take only consecutive packets
138
                if(*pn == last_rx_pn) {
139
                        cap = computeCapacity(last_rx_delay, r[R_CORRECTED_DELAY], r[R_RECEIVE_TIME] - last_rx, r[R_SIZE], r);
140
                        if(!isnan(cap))
141
                                sendOobData((char *) &cap, sizeof(cap));
142
                }
143

    
144
                last_rx = r[R_RECEIVE_TIME];
145
                last_rx_delay = r[R_CORRECTED_DELAY];
146
                last_rx_pn = *pn;
147

    
148
        } else  {
149
                result *cap = (result *) pkt;
150
                newSample(*cap);
151
        }
152
}
153

    
154
result CapprobeMeasure::RxPkt(result *r,ExecutionList *el) {
155
        result cap = NAN;
156

    
157
        if(!(flags & REMOTE)) //local: nothing to do
158
                return NAN;
159

    
160
        //look for min delay
161
        if(min_delay > r[R_CORRECTED_DELAY] || isnan(min_delay))
162
                min_delay = r[P_CAPPROBE_FILTER];
163

    
164
        //take only consecutive packets
165
        if(r[R_DATA_ID] == last_rx_pn)
166
        // &&  next_offset == r[R_DATA_OFFSET]) TODO: should also check in order
167
                cap = computeCapacity(last_rx_delay, r[R_CORRECTED_DELAY], r[R_RECEIVE_TIME] - last_rx, last_size, r);
168

    
169
        last_rx = r[R_RECEIVE_TIME];
170
        last_rx_delay = r[R_CORRECTED_DELAY];
171
        last_rx_pn = r[R_DATA_ID];
172
        last_size = r[R_SIZE];
173
        
174
        return cap;
175
}
176

    
177

    
178
result CapprobeMeasure::computeCapacity(result delay1, result delay2, result ipg, result size, result *r) {
179

    
180
        //filter on min_delay
181
        if((delay1 < min_delay + param_values[P_CAPPROBE_DELAY_TH] / 1000000.0 &&
182
                delay2 < min_delay + param_values[P_CAPPROBE_DELAY_TH] / 1000000.0) ||
183
                param_values[P_CAPPROBE_DELAY_TH] < 0)
184
        {
185
                if(!isnan(ipg)) {
186
                                samples.push_back(ipg);
187
                                if(ipg < min_ipd || isnan(min_ipd))
188
                                        min_ipd = ipg;
189
                                if(ipg > max_ipd || isnan(max_ipd))
190
                                        max_ipd = ipg;
191
                }
192
        }
193

    
194
        if(samples.size() >= param_values[P_CAPPROBE_PKT_TH]) {// when collected enough data try to make an estimate
195
                int j,i,c, max_c, i_max;
196
                result threshold, interval;
197

    
198
                if(max_size < size || isnan(max_size))
199
                        max_size = size;
200

    
201
                std::sort(samples.begin(), samples.end(), double_sort_function);
202
                
203
                switch((int) param_values[P_CAPPROBE_FILTER]) {
204
                case 0: //pdf based
205
                                c = 0; max_c = -1; i_max = 0;
206
                                interval = (max_ipd - min_ipd) / param_values[P_CAPPROBE_NUM_INTERVALS];
207
                                threshold = min_ipd + interval;
208
                                for(i = 0; i < samples.size(); i++) {
209
                                        if(samples[i] >= threshold) {
210
                                                if(c > max_c) {
211
                                                        max_c = c;
212
                                                        i_max = i - c;
213
                                                }
214
                                                while(samples[i] >= threshold)
215
                                                        threshold += interval;
216
                                                c = 1;
217
                                        } else
218
                                                c++;
219
                                }
220
                                j = (i_max + max_c / 2);
221
                                break;
222
                case 1: //median based
223
                                for(j=0; j < samples.size(); j++) {
224
                                                if(samples[j] > min_ipd * (1 + param_values[P_CAPPROBE_IPD_TH] / 100.0))
225
                                                        break;
226
                                        }
227
                                j = (int)(samples.size() * 0.1) / 2;
228
                                break;
229
                }
230

    
231
                r_rx_list[R_CAPACITY_CAPPROBE] = r_tx_list[R_CAPACITY_CAPPROBE] =  (max_size + param_values[P_CAPPROBE_HEADER_SIZE]) * 8.0 / samples[j];
232

    
233
                char dbg[512];
234
                snprintf(dbg, sizeof(dbg), "Ts: %f Size: %f Cap: %f", r[R_SEND_TIME], max_size, r_rx_list[R_CAPACITY_CAPPROBE]);
235
                debugOutput(dbg);
236
                for(j=0; j < samples.size(); j++) {
237
                        snprintf(dbg, sizeof(dbg), "Ipg: %f", samples[j]);
238
                        debugOutput(dbg);
239
                }
240

    
241
                samples.clear();
242
                min_delay = NAN;
243
                min_ipd = NAN;
244
                max_ipd = NAN;
245
                max_size = NAN;
246
                return r_rx_list[R_CAPACITY_CAPPROBE];
247
        }
248
        return NAN;
249
}
250

    
251
CapprobeMeasurePlugin::CapprobeMeasurePlugin() {
252
        /* Initialise properties: MANDATORY! */
253
        name = "Capacity (capprobe)";
254
        desc = "The capacity in kbit/s computed using capprobe";
255
        id = CAPACITY_CAPPROBE;
256
        /* end of mandatory properties */
257
        addParameter(new MinParameter("Rate","The rate at which we probe the path in kbit/s",0,100), P_CAPPROBE_RATE);
258
        addParameter(new MinParameter("Packet Threshold","The minimum number of packets to process before considering the value somehow attendible [#]",0,CAPPROBE_SAMPLE_WINDOW), P_CAPPROBE_PKT_TH);
259
        addParameter(new MinParameter("Delay Threshold","The tolerance which is added to the minimum delay [us]",-1,100), P_CAPPROBE_DELAY_TH);
260
        addParameter(new MinParameter("Inter-pair delay threshold","The tolerance to the inter-pair delay [%]",0,60), P_CAPPROBE_IPD_TH);
261
        addParameter(new MinParameter("Header size","The size of the lower layer headers [bytes]",0, LOWER_LAYER_OVERHEAD), P_CAPPROBE_HEADER_SIZE);
262
        addParameter(new MinParameter("Payload","The size of the payload for injected packets [bytes]",sizeof(uint32_t), CAPPROBE_MTU), P_CAPPROBE_PAYLOAD_SIZE);
263
        addParameter(new MinMaxParameter("Filter","The filter algorithm used: 0 - PDF 1 - Median",0,1,0), P_CAPPROBE_FILTER);
264
        addParameter(new MinParameter("Intervals","The number of intervals for the PDF [#]",0, 1000), P_CAPPROBE_NUM_INTERVALS);
265
        //TODO addDependency(CORRECTED_DELAY);
266
}