napabaselibs / monl / plugins / capprobe_measure.cpp @ 507372bb
History  View  Annotate  Download (8.31 KB)
1 
/***************************************************************************


2 
* Copyright (C) 2009 by Robert Birke

3 
* robert.birke@polito.it

4 
*

5 
* This library is free software; you can redistribute it and/or

6 
* modify it under the terms of the GNU Lesser General Public

7 
* License as published by the Free Software Foundation; either

8 
* version 2.1 of the License, or (at your option) any later version.

9 

10 
* This library is distributed in the hope that it will be useful,

11 
* but WITHOUT ANY WARRANTY; without even the implied warranty of

12 
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU

13 
* Lesser General Public License for more details.

14 

15 
* You should have received a copy of the GNU Lesser General Public

16 
* License along with this library; if not, write to the Free Software

17 
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 021101301 USA

18 
***************************************************************************/

19  
20 
#include "capprobe_measure.h" 
21 
#include "napa_log.h" 
22 
#include <sys/time.h> 
23 
#include <math.h> 
24 
#include <string> 
25  
26 
#ifdef WIN32

27 
#define random(x) rand(x)

28 
#endif

29  
30 
//defualt values only! can be changed using the changeParam() function

31 
#define LOWER_LAYER_OVERHEAD 8+20+18+8+12 
32 
#define CAPPROBE_MTU 1300 
33 
#define CAPPROBE_SAMPLE_WINDOW 100 
34  
35 
bool double_sort_function (double i,double j) { return (i<j); } 
36  
37  
38 
CapprobeMeasure::CapprobeMeasure(class MeasurePlugin *m, MeasurementCapabilities mc, class MeasureDispatcher *md): MonMeasure(m,mc,md) { 
39 
pkt = NULL;

40 
max_size = 0;

41 
} 
42  
43 
CapprobeMeasure::~CapprobeMeasure() { 
44 
if(pkt != NULL) 
45 
delete[] pkt;

46 
} 
47  
48 
void CapprobeMeasure::init() {

49 
samples.clear(); 
50 
samples.reserve((int) param_values[P_CAPPROBE_PKT_TH]);

51  
52 
if(pkt != NULL) 
53 
delete[] pkt;

54 
pkt = new char[(int) param_values[P_CAPPROBE_PAYLOAD_SIZE]]; 
55  
56 
avg_pause = param_values[P_CAPPROBE_PAYLOAD_SIZE] / param_values[P_CAPPROBE_RATE] * 8000.0;// [us] 
57 
pair_num = 0;

58 
min_delay = NAN; 
59 
min_ipd = NAN; 
60 
last_rx = NAN; 
61 
last_rx_pn = NAN; 
62 
max_ipd = NAN; 
63 
last_size = NAN; 
64 
} 
65  
66 
void CapprobeMeasure::stop() {

67 
r_rx_list[R_CAPACITY_CAPPROBE] = r_tx_list[R_CAPACITY_CAPPROBE] = NAN; 
68 
if(pkt != NULL) 
69 
delete[] pkt;

70 
pkt = NULL;

71 
} 
72  
73  
74 
int CapprobeMeasure::paramChange(MonParameterType ph, MonParameterValue p){

75 
switch(ph) {

76 
case P_CAPPROBE_RATE:

77 
param_values[P_CAPPROBE_RATE] = p; 
78 
avg_pause = param_values[P_CAPPROBE_PAYLOAD_SIZE] / param_values[P_CAPPROBE_RATE] * 8000.0; 
79 
break;

80 
case P_CAPPROBE_PAYLOAD_SIZE:

81 
param_values[P_CAPPROBE_PAYLOAD_SIZE] = p; 
82 
if(pkt != NULL) 
83 
delete[] pkt;

84 
pkt = new char[(int) param_values[P_CAPPROBE_PAYLOAD_SIZE]]; 
85 
avg_pause = param_values[P_CAPPROBE_PAYLOAD_SIZE] / param_values[P_CAPPROBE_RATE] * 8000.0; 
86 
break;

87 
case P_CAPPROBE_PKT_TH:

88 
param_values[P_CAPPROBE_PKT_TH] = p; 
89 
samples.clear(); 
90 
samples.reserve((int) param_values[P_CAPPROBE_PKT_TH]);

91 
break;

92 
case P_CAPPROBE_DELAY_TH:

93 
param_values[P_CAPPROBE_DELAY_TH] = p; 
94 
break;

95 
case P_CAPPROBE_IPD_TH:

96 
param_values[P_CAPPROBE_IPD_TH] = p; 
97 
break;

98 
case P_CAPPROBE_HEADER_SIZE:

99 
param_values[P_CAPPROBE_HEADER_SIZE] = p; 
100 
break;

101 
} 
102 
return EOK;

103 
} 
104  
105 
double CapprobeMeasure::gen_pause_exp() {

106 
double p = random();

107 
/*turn this into a random number between 0 and 1...*/

108 
double f = p / (double) INT_MAX; 
109 
return 1 * avg_pause * log(1f); 
110 
} 
111  
112 
void CapprobeMeasure::Run() {

113 
double pause;

114 
uint32_t *pn = (uint32_t*) pkt; 
115  
116 
if(flags & REMOTE)

117 
return;

118  
119 
*pn = pair_num++; 
120 
sendOobData(pkt, CAPPROBE_MTU); 
121 
sendOobData(pkt, CAPPROBE_MTU); 
122  
123 
struct timeval next;

124  
125 
pause = gen_pause_exp(); 
126 
next.tv_sec = (unsigned long)pause / 1000000; 
127 
next.tv_usec = (unsigned long)fmod(pause, 100000.0); 
128  
129 
scheduleNextIn(&next); 
130 
} 
131  
132 
void CapprobeMeasure::receiveOobData(char *buf, int buf_len, result *r) { 
133 
uint32_t *pn = (uint32_t*) pkt; 
134  
135 
if(flags & REMOTE) { //remote 
136 
result cap; 
137 
//look for min delay

138 
if(min_delay > r[R_CORRECTED_DELAY]  isnan(min_delay))

139 
min_delay = r[P_CAPPROBE_FILTER]; 
140 

141 
//take only consecutive packets

142 
if(*pn == last_rx_pn) {

143 
cap = computeCapacity(last_rx_delay, r[R_CORRECTED_DELAY], r[R_RECEIVE_TIME]  last_rx, r[R_SIZE], r); 
144 
if(!isnan(cap))

145 
sendOobData((char *) &cap, sizeof(cap)); 
146 
} 
147  
148 
last_rx = r[R_RECEIVE_TIME]; 
149 
last_rx_delay = r[R_CORRECTED_DELAY]; 
150 
last_rx_pn = *pn; 
151  
152 
} else {

153 
result *cap = (result *) pkt; 
154 
newSample(*cap); 
155 
} 
156 
} 
157  
158 
result CapprobeMeasure::RxPkt(result *r,ExecutionList *el) { 
159 
result cap = NAN; 
160  
161 
if(!(flags & REMOTE)) //local: nothing to do 
162 
return NAN;

163  
164 
//look for min delay

165 
if(min_delay > r[R_CORRECTED_DELAY]  isnan(min_delay))

166 
min_delay = r[P_CAPPROBE_FILTER]; 
167  
168 
//take only consecutive packets

169 
if(r[R_DATA_ID] == last_rx_pn)

170 
// && next_offset == r[R_DATA_OFFSET]) TODO: should also check in order

171 
cap = computeCapacity(last_rx_delay, r[R_CORRECTED_DELAY], r[R_RECEIVE_TIME]  last_rx, last_size, r); 
172  
173 
last_rx = r[R_RECEIVE_TIME]; 
174 
last_rx_delay = r[R_CORRECTED_DELAY]; 
175 
last_rx_pn = r[R_DATA_ID]; 
176 
last_size = r[R_SIZE]; 
177 

178 
return cap;

179 
} 
180  
181  
182 
result CapprobeMeasure::computeCapacity(result delay1, result delay2, result ipg, result size, result *r) { 
183  
184 
//filter on min_delay

185 
if((delay1 < min_delay + param_values[P_CAPPROBE_DELAY_TH] / 1000000.0 && 
186 
delay2 < min_delay + param_values[P_CAPPROBE_DELAY_TH] / 1000000.0)  
187 
param_values[P_CAPPROBE_DELAY_TH] < 0)

188 
{ 
189 
if(!isnan(ipg)) {

190 
samples.push_back(ipg); 
191 
if(ipg < min_ipd  isnan(min_ipd))

192 
min_ipd = ipg; 
193 
if(ipg > max_ipd  isnan(max_ipd))

194 
max_ipd = ipg; 
195 
} 
196 
} 
197  
198 
if(samples.size() >= param_values[P_CAPPROBE_PKT_TH]) {// when collected enough data try to make an estimate 
199 
int j,i,c, max_c, i_max;

200 
result threshold, interval; 
201  
202 
if(max_size < size  isnan(max_size))

203 
max_size = size; 
204  
205 
std::sort(samples.begin(), samples.end(), double_sort_function); 
206 

207 
switch((int) param_values[P_CAPPROBE_FILTER]) { 
208 
case 0: //pdf based 
209 
c = 0; max_c = 1; i_max = 0; 
210 
interval = (max_ipd  min_ipd) / param_values[P_CAPPROBE_NUM_INTERVALS]; 
211 
threshold = min_ipd + interval; 
212 
for(i = 0; i < samples.size(); i++) { 
213 
if(samples[i] >= threshold) {

214 
if(c > max_c) {

215 
max_c = c; 
216 
i_max = i  c; 
217 
} 
218 
while(samples[i] >= threshold)

219 
threshold += interval; 
220 
c = 1;

221 
} else

222 
c++; 
223 
} 
224 
j = (i_max + max_c / 2);

225 
break;

226 
case 1: //median based 
227 
for(j=0; j < samples.size(); j++) { 
228 
if(samples[j] > min_ipd * (1 + param_values[P_CAPPROBE_IPD_TH] / 100.0)) 
229 
break;

230 
} 
231 
j = (int)(samples.size() * 0.1) / 2; 
232 
break;

233 
} 
234  
235 
r_rx_list[R_CAPACITY_CAPPROBE] = r_tx_list[R_CAPACITY_CAPPROBE] = (max_size + param_values[P_CAPPROBE_HEADER_SIZE]) * 8.0 / samples[j]; 
236  
237 
char dbg[512]; 
238 
snprintf(dbg, sizeof(dbg), "Ts: %f Size: %f Cap: %f", r[R_SEND_TIME], max_size, r_rx_list[R_CAPACITY_CAPPROBE]); 
239 
debugOutput(dbg); 
240 
for(j=0; j < samples.size(); j++) { 
241 
snprintf(dbg, sizeof(dbg), "Ipg: %f", samples[j]); 
242 
debugOutput(dbg); 
243 
} 
244  
245 
samples.clear(); 
246 
min_delay = NAN; 
247 
min_ipd = NAN; 
248 
max_ipd = NAN; 
249 
max_size = NAN; 
250 
return r_rx_list[R_CAPACITY_CAPPROBE];

251 
} 
252 
return NAN;

253 
} 
254  
255 
CapprobeMeasurePlugin::CapprobeMeasurePlugin() { 
256 
/* Initialise properties: MANDATORY! */

257 
name = "Capacity (capprobe)";

258 
desc = "The capacity in kbit/s computed using capprobe";

259 
id = CAPACITY_CAPPROBE; 
260 
/* end of mandatory properties */

261 
addParameter(new MinParameter("Rate","The rate at which we probe the path in kbit/s",0,100), P_CAPPROBE_RATE); 
262 
addParameter(new MinParameter("Packet Threshold","The minimum number of packets to process before considering the value somehow attendible [#]",0,CAPPROBE_SAMPLE_WINDOW), P_CAPPROBE_PKT_TH); 
263 
addParameter(new MinParameter("Delay Threshold","The tolerance which is added to the minimum delay [us]",1,100), P_CAPPROBE_DELAY_TH); 
264 
addParameter(new MinParameter("Interpair delay threshold","The tolerance to the interpair delay [%]",0,60), P_CAPPROBE_IPD_TH); 
265 
addParameter(new MinParameter("Header size","The size of the lower layer headers [bytes]",0, LOWER_LAYER_OVERHEAD), P_CAPPROBE_HEADER_SIZE); 
266 
addParameter(new MinParameter("Payload","The size of the payload for injected packets [bytes]",sizeof(uint32_t), CAPPROBE_MTU), P_CAPPROBE_PAYLOAD_SIZE); 
267 
addParameter(new MinMaxParameter("Filter","The filter algorithm used: 0  PDF 1  Median",0,1,0), P_CAPPROBE_FILTER); 
268 
addParameter(new MinParameter("Intervals","The number of intervals for the PDF [#]",0, 1000), P_CAPPROBE_NUM_INTERVALS); 
269 
//TODO addDependency(CORRECTED_DELAY);

270 
} 