napabaselibs / monl / plugins / capprobe_measure.cpp @ 5f3adef4
History  View  Annotate  Download (8.26 KB)
1 
/***************************************************************************


2 
* Copyright (C) 2009 by Robert Birke

3 
* robert.birke@polito.it

4 
*

5 
* This library is free software; you can redistribute it and/or

6 
* modify it under the terms of the GNU Lesser General Public

7 
* License as published by the Free Software Foundation; either

8 
* version 2.1 of the License, or (at your option) any later version.

9 

10 
* This library is distributed in the hope that it will be useful,

11 
* but WITHOUT ANY WARRANTY; without even the implied warranty of

12 
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU

13 
* Lesser General Public License for more details.

14 

15 
* You should have received a copy of the GNU Lesser General Public

16 
* License along with this library; if not, write to the Free Software

17 
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 021101301 USA

18 
***************************************************************************/

19  
20 
#include "capprobe_measure.h" 
21 
#include "grapes_log.h" 
22 
#include <sys/time.h> 
23 
#include <math.h> 
24 
#include <string> 
25  
26 
//defualt values only! can be changed using the changeParam() function

27 
#define LOWER_LAYER_OVERHEAD 8+20+18+8+12 
28 
#define CAPPROBE_MTU 1300 
29 
#define CAPPROBE_SAMPLE_WINDOW 100 
30  
31 
bool double_sort_function (double i,double j) { return (i<j); } 
32  
33  
34 
CapprobeMeasure::CapprobeMeasure(class MeasurePlugin *m, MeasurementCapabilities mc, class MeasureDispatcher *md): MonMeasure(m,mc,md) { 
35 
pkt = NULL;

36 
max_size = 0;

37 
} 
38  
39 
CapprobeMeasure::~CapprobeMeasure() { 
40 
if(pkt != NULL) 
41 
delete[] pkt;

42 
} 
43  
44 
void CapprobeMeasure::init() {

45 
samples.clear(); 
46 
samples.reserve((int) param_values[P_CAPPROBE_PKT_TH]);

47  
48 
if(pkt != NULL) 
49 
delete[] pkt;

50 
pkt = new char[(int) param_values[P_CAPPROBE_PAYLOAD_SIZE]]; 
51  
52 
avg_pause = param_values[P_CAPPROBE_PAYLOAD_SIZE] / param_values[P_CAPPROBE_RATE] * 8000.0;// [us] 
53 
pair_num = 0;

54 
min_delay = NAN; 
55 
min_ipd = NAN; 
56 
last_rx = NAN; 
57 
last_rx_pn = NAN; 
58 
max_ipd = NAN; 
59 
last_size = NAN; 
60 
} 
61  
62 
void CapprobeMeasure::stop() {

63 
r_rx_list[R_CAPACITY_CAPPROBE] = r_tx_list[R_CAPACITY_CAPPROBE] = NAN; 
64 
if(pkt != NULL) 
65 
delete[] pkt;

66 
pkt = NULL;

67 
} 
68  
69  
70 
int CapprobeMeasure::paramChange(MonParameterType ph, MonParameterValue p){

71 
switch(ph) {

72 
case P_CAPPROBE_RATE:

73 
param_values[P_CAPPROBE_RATE] = p; 
74 
avg_pause = param_values[P_CAPPROBE_PAYLOAD_SIZE] / param_values[P_CAPPROBE_RATE] * 8000.0; 
75 
break;

76 
case P_CAPPROBE_PAYLOAD_SIZE:

77 
param_values[P_CAPPROBE_PAYLOAD_SIZE] = p; 
78 
if(pkt != NULL) 
79 
delete[] pkt;

80 
pkt = new char[(int) param_values[P_CAPPROBE_PAYLOAD_SIZE]]; 
81 
avg_pause = param_values[P_CAPPROBE_PAYLOAD_SIZE] / param_values[P_CAPPROBE_RATE] * 8000.0; 
82 
break;

83 
case P_CAPPROBE_PKT_TH:

84 
param_values[P_CAPPROBE_PKT_TH] = p; 
85 
samples.clear(); 
86 
samples.reserve((int) param_values[P_CAPPROBE_PKT_TH]);

87 
break;

88 
case P_CAPPROBE_DELAY_TH:

89 
param_values[P_CAPPROBE_DELAY_TH] = p; 
90 
break;

91 
case P_CAPPROBE_IPD_TH:

92 
param_values[P_CAPPROBE_IPD_TH] = p; 
93 
break;

94 
case P_CAPPROBE_HEADER_SIZE:

95 
param_values[P_CAPPROBE_HEADER_SIZE] = p; 
96 
break;

97 
} 
98 
return EOK;

99 
} 
100  
101 
double CapprobeMeasure::gen_pause_exp() {

102 
double p = random();

103 
/*turn this into a random number between 0 and 1...*/

104 
double f = p / (double) INT_MAX; 
105 
return 1 * avg_pause * log(1f); 
106 
} 
107  
108 
void CapprobeMeasure::Run() {

109 
double pause;

110 
uint32_t *pn = (uint32_t*) pkt; 
111  
112 
if(flags & REMOTE)

113 
return;

114  
115 
*pn = pair_num++; 
116 
sendOobData(pkt, CAPPROBE_MTU); 
117 
sendOobData(pkt, CAPPROBE_MTU); 
118  
119 
struct timeval next;

120  
121 
pause = gen_pause_exp(); 
122 
next.tv_sec = (unsigned long)pause / 1000000; 
123 
next.tv_usec = (unsigned long)fmod(pause, 100000.0); 
124  
125 
scheduleNextIn(&next); 
126 
} 
127  
128 
void CapprobeMeasure::receiveOobData(char *buf, int buf_len, result *r) { 
129 
uint32_t *pn = (uint32_t*) pkt; 
130  
131 
if(flags & REMOTE) { //remote 
132 
result cap; 
133 
//look for min delay

134 
if(min_delay > r[R_CORRECTED_DELAY]  isnan(min_delay))

135 
min_delay = r[P_CAPPROBE_FILTER]; 
136 

137 
//take only consecutive packets

138 
if(*pn == last_rx_pn) {

139 
cap = computeCapacity(last_rx_delay, r[R_CORRECTED_DELAY], r[R_RECEIVE_TIME]  last_rx, r[R_SIZE], r); 
140 
if(!isnan(cap))

141 
sendOobData((char *) &cap, sizeof(cap)); 
142 
} 
143  
144 
last_rx = r[R_RECEIVE_TIME]; 
145 
last_rx_delay = r[R_CORRECTED_DELAY]; 
146 
last_rx_pn = *pn; 
147  
148 
} else {

149 
result *cap = (result *) pkt; 
150 
newSample(*cap); 
151 
} 
152 
} 
153  
154 
result CapprobeMeasure::RxPkt(result *r,ExecutionList *el) { 
155 
result cap = NAN; 
156  
157 
if(!(flags & REMOTE)) //local: nothing to do 
158 
return NAN;

159  
160 
//look for min delay

161 
if(min_delay > r[R_CORRECTED_DELAY]  isnan(min_delay))

162 
min_delay = r[P_CAPPROBE_FILTER]; 
163  
164 
//take only consecutive packets

165 
if(r[R_DATA_ID] == last_rx_pn)

166 
// && next_offset == r[R_DATA_OFFSET]) TODO: should also check in order

167 
cap = computeCapacity(last_rx_delay, r[R_CORRECTED_DELAY], r[R_RECEIVE_TIME]  last_rx, last_size, r); 
168  
169 
last_rx = r[R_RECEIVE_TIME]; 
170 
last_rx_delay = r[R_CORRECTED_DELAY]; 
171 
last_rx_pn = r[R_DATA_ID]; 
172 
last_size = r[R_SIZE]; 
173 

174 
return cap;

175 
} 
176  
177  
178 
result CapprobeMeasure::computeCapacity(result delay1, result delay2, result ipg, result size, result *r) { 
179  
180 
//filter on min_delay

181 
if((delay1 < min_delay + param_values[P_CAPPROBE_DELAY_TH] / 1000000.0 && 
182 
delay2 < min_delay + param_values[P_CAPPROBE_DELAY_TH] / 1000000.0)  
183 
param_values[P_CAPPROBE_DELAY_TH] < 0)

184 
{ 
185 
if(!isnan(ipg)) {

186 
samples.push_back(ipg); 
187 
if(ipg < min_ipd  isnan(min_ipd))

188 
min_ipd = ipg; 
189 
if(ipg > max_ipd  isnan(max_ipd))

190 
max_ipd = ipg; 
191 
} 
192 
} 
193  
194 
if(samples.size() >= param_values[P_CAPPROBE_PKT_TH]) {// when collected enough data try to make an estimate 
195 
int j,i,c, max_c, i_max;

196 
result threshold, interval; 
197  
198 
if(max_size < size  isnan(max_size))

199 
max_size = size; 
200  
201 
std::sort(samples.begin(), samples.end(), double_sort_function); 
202 

203 
switch((int) param_values[P_CAPPROBE_FILTER]) { 
204 
case 0: //pdf based 
205 
c = 0; max_c = 1; i_max = 0; 
206 
interval = (max_ipd  min_ipd) / param_values[P_CAPPROBE_NUM_INTERVALS]; 
207 
threshold = min_ipd + interval; 
208 
for(i = 0; i < samples.size(); i++) { 
209 
if(samples[i] >= threshold) {

210 
if(c > max_c) {

211 
max_c = c; 
212 
i_max = i  c; 
213 
} 
214 
while(samples[i] >= threshold)

215 
threshold += interval; 
216 
c = 1;

217 
} else

218 
c++; 
219 
} 
220 
j = (i_max + max_c / 2);

221 
break;

222 
case 1: //median based 
223 
for(j=0; j < samples.size(); j++) { 
224 
if(samples[j] > min_ipd * (1 + param_values[P_CAPPROBE_IPD_TH] / 100.0)) 
225 
break;

226 
} 
227 
j = (int)(samples.size() * 0.1) / 2; 
228 
break;

229 
} 
230  
231 
r_rx_list[R_CAPACITY_CAPPROBE] = r_tx_list[R_CAPACITY_CAPPROBE] = (max_size + param_values[P_CAPPROBE_HEADER_SIZE]) * 8.0 / samples[j]; 
232  
233 
char dbg[512]; 
234 
snprintf(dbg, sizeof(dbg), "Ts: %f Size: %f Cap: %f", r[R_SEND_TIME], max_size, r_rx_list[R_CAPACITY_CAPPROBE]); 
235 
debugOutput(dbg); 
236 
for(j=0; j < samples.size(); j++) { 
237 
snprintf(dbg, sizeof(dbg), "Ipg: %f", samples[j]); 
238 
debugOutput(dbg); 
239 
} 
240  
241 
samples.clear(); 
242 
min_delay = NAN; 
243 
min_ipd = NAN; 
244 
max_ipd = NAN; 
245 
max_size = NAN; 
246 
return r_rx_list[R_CAPACITY_CAPPROBE];

247 
} 
248 
return NAN;

249 
} 
250  
251 
CapprobeMeasurePlugin::CapprobeMeasurePlugin() { 
252 
/* Initialise properties: MANDATORY! */

253 
name = "Capacity (capprobe)";

254 
desc = "The capacity in kbit/s computed using capprobe";

255 
id = CAPACITY_CAPPROBE; 
256 
/* end of mandatory properties */

257 
addParameter(new MinParameter("Rate","The rate at which we probe the path in kbit/s",0,100), P_CAPPROBE_RATE); 
258 
addParameter(new MinParameter("Packet Threshold","The minimum number of packets to process before considering the value somehow attendible [#]",0,CAPPROBE_SAMPLE_WINDOW), P_CAPPROBE_PKT_TH); 
259 
addParameter(new MinParameter("Delay Threshold","The tolerance which is added to the minimum delay [us]",1,100), P_CAPPROBE_DELAY_TH); 
260 
addParameter(new MinParameter("Interpair delay threshold","The tolerance to the interpair delay [%]",0,60), P_CAPPROBE_IPD_TH); 
261 
addParameter(new MinParameter("Header size","The size of the lower layer headers [bytes]",0, LOWER_LAYER_OVERHEAD), P_CAPPROBE_HEADER_SIZE); 
262 
addParameter(new MinParameter("Payload","The size of the payload for injected packets [bytes]",sizeof(uint32_t), CAPPROBE_MTU), P_CAPPROBE_PAYLOAD_SIZE); 
263 
addParameter(new MinMaxParameter("Filter","The filter algorithm used: 0  PDF 1  Median",0,1,0), P_CAPPROBE_FILTER); 
264 
addParameter(new MinParameter("Intervals","The number of intervals for the PDF [#]",0, 1000), P_CAPPROBE_NUM_INTERVALS); 
265 
//TODO addDependency(CORRECTED_DELAY);

266 
} 