napabaselibs / monl / result_buffer.cpp @ 49a081ee
History  View  Annotate  Download (6.71 KB)
1 
/***************************************************************************


2 
* Copyright (C) 2009 by Robert Birke

3 
* robert.birke@polito.it

4 
*

5 
* This library is free software; you can redistribute it and/or

6 
* modify it under the terms of the GNU Lesser General Public

7 
* License as published by the Free Software Foundation; either

8 
* version 2.1 of the License, or (at your option) any later version.

9 

10 
* This library is distributed in the hope that it will be useful,

11 
* but WITHOUT ANY WARRANTY; without even the implied warranty of

12 
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU

13 
* Lesser General Public License for more details.

14 

15 
* You should have received a copy of the GNU Lesser General Public

16 
* License along with this library; if not, write to the Free Software

17 
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 021101301 USA

18 
***********************************************************************/

19  
20 
#include "result_buffer.h" 
21 
#include "errors.h" 
22 
#include "napa_log.h" 
23 
#include "repoclient.h" 
24  
25 
#include "mon_event.h" 
26  
27 
#ifndef _WIN32

28 
#include <arpa/inet.h> 
29 
#endif

30  
31 
#include <sys/time.h> 
32 
#include <stdlib.h> 
33 
#include <math.h> 
34  
35 
//TODO:

36 
// check if pubblishing was succesfull

37  
38  
39 
const char* ResultBuffer::stat_suffixes[] = { 
40 
"_last",

41 
"_avg",

42 
"_win_avg",

43 
"_var",

44 
"_win_var",

45 
"_min",

46 
"_win_min",

47 
"_max",

48 
"_win_max",

49 
"_sum",

50 
"_win_sum",

51 
"_rate",

52 
"_period_sum"

53 
}; 
54  
55 
int ResultBuffer::publishResults(void){ 
56 
MeasurementRecord mr; 
57 
SocketId sid; 
58 
char sidA_string[SOCKETID_STRING_SIZE] = ""; 
59 
char sidB_string[SOCKETID_STRING_SIZE] = ""; 
60 
int errorstatus;

61 
result ts; 
62  
63 
if(m>param_values[P_PUBLISHING_PKT_TIME_BASED] == 1.0) {//time based 
64 
struct timeval tv;

65 
float delta = m>param_values[P_PUBLISHING_RATE] * m>param_values[P_PUBLISHING_TIME_SPREAD] / 100.0; 
66 
tv.tv_sec = m>param_values[P_PUBLISHING_RATE]  delta + 2.0 * delta * (float) rand() / (float) RAND_MAX; 
67 
tv.tv_usec = 0;

68 
schedule_publish(&tv, m); 
69 
} 
70  
71 
if(m>status != RUNNING)

72 
return EOK;

73  
74 
if(!new_data && m>param_values[P_PUBLISHING_NEW_ALL] == 0.0) 
75 
return EOK;

76 
new_data = false;

77  
78 
updateStats(); 
79  
80 
info("MONL: publishResults called: %s last value: %f (%s)", publish_name.c_str(), stats[LAST], default_name);

81 
if(publish_length == 0) 
82 
return EOK;

83 
/* Get local ID */

84 
sid = mlGetLocalSocketID(&errorstatus); 
85 
if(errorstatus != 0) 
86 
return EFAILED;

87 
if(mlSocketIDToString(sid, sidA_string, sizeof(sidA_string)) != 0) 
88 
return EFAILED;

89 
if(m>dst_socketid != NULL && m>dst_socketid_publish) { 
90 
if(mlSocketIDToString((SocketId) m>dst_socketid, sidB_string, sizeof(sidB_string)) != 0) 
91 
return EFAILED;

92 
} 
93 
if(*originator_name == NULL) 
94 
mr.originator = sidA_string; 
95 
else

96 
mr.originator = *originator_name; 
97 
mr.targetA = sidA_string; 
98 
mr.targetB = sidB_string; 
99 
mr.string_value = NULL;

100 
mr.channel = cnl.c_str(); 
101 
gettimeofday(&mr.timestamp, NULL);

102 

103 
/* rate */

104 
ts = mr.timestamp.tv_usec / 1000000.0; 
105 
ts += mr.timestamp.tv_sec; 
106 
if(last_publish != 0.0) { 
107 
stats[RATE] = rate_sum_samples / (ts  last_publish); 
108 
} 
109 
last_publish = ts; 
110 
rate_sum_samples = 0.0; 
111 

112 
for(int i = 0; i < publish_length; i++) { 
113 
mr.value = stats[publish[i]]; 
114 
if(isnan(mr.value))

115 
continue;

116 
publish_name += stat_suffixes[publish[i]]; 
117 
mr.published_name = publish_name.c_str(); 
118 
debug("MONL: publishResults called: %s value: %f", mr.published_name, mr.value);

119 
fprintf(stderr,"abouttopublish,%s,%s,%s,%s,%f,%s,%s,%.3f\n",

120 
mr.originator, mr.targetA, mr.targetB, mr.published_name, mr.value, 
121 
mr.string_value, mr.channel, ts); 
122 
if (repo_client) repPublish(repo_client, NULL, NULL, &mr); 
123 
publish_name.erase(name_size); 
124 
} 
125  
126 
stats[PERIOD_SUM] = NAN; 
127  
128 
return EOK;

129 
}; 
130  
131 
int ResultBuffer::init() {

132 
int i;

133 
n_samples = 0; samples = 0; 
134 
sum_win_samples = 0; rate_sum_samples = 0; 
135 
pos = 0;

136 
sum_win_var_samples = 0;

137 
for(i = 0; i < LAST_STAT_TYPE; i++) 
138 
stats[i] = (m>param_values[P_INIT_NAN_ZERO] == 0.0 ? NAN : 0); 
139 
for(i = 0; i < size; i++) 
140 
circular_buffer[i] = NAN; 
141 
return EOK;

142 
} 
143  
144 
int ResultBuffer::resizeBuffer(int s) { 
145 
result *old_buffer = circular_buffer; 
146 
result *old_buffer_var = circular_buffer_var; 
147 
int i,j, n_sam, old_pos, old_size;

148 

149 
if(s == size)

150 
return EOK;

151  
152  
153 
circular_buffer = new result[s];

154 
circular_buffer_var = new result[s];

155  
156 
n_sam = fmin(s, fmin(size, n_samples)); 
157 
old_pos = pos; 
158 
old_size = size; 
159  
160 
sum_win_var_samples = 0;

161 
n_samples = 0; pos = 0; size = s; 
162 
stats[WIN_SUM] = 0;

163  
164 
for(i = 0; i < n_sam; i++) { 
165 
j = old_pos  n_sam + i; 
166 
if(j < 0) 
167 
j = j + old_size; 
168 
newSampleWin(old_buffer[j]); 
169 
} 
170  
171 

172 
delete[] old_buffer;

173 
delete[] old_buffer_var;

174 
return EOK;

175 
}; 
176  
177 
/* This function is used to update internal variables with data from a new sample */

178 
int ResultBuffer::newSample(result r) {

179 
samples++; 
180  
181 
stats[LAST] = r; 
182  
183 
//TODO add statistical data like avg

184  
185 
if(isnan(stats[SUM]))

186 
stats[SUM] = r; 
187 
else

188 
stats[SUM] += r; 
189  
190 
if(isnan(stats[PERIOD_SUM]))

191 
stats[PERIOD_SUM] = r; 
192 
else

193 
stats[PERIOD_SUM] += r; 
194  
195 
rate_sum_samples += r; 
196  
197 
/* Average */

198 
if(isnan(stats[AVG]))

199 
stats[AVG] = 0;

200 
stats[AVG] = ((samples  1) * stats[AVG] + r) / samples;

201  
202 
/* Variance */

203 
if(samples < 2) 
204 
stats[VAR] = 0;

205 
else

206 
stats[VAR] = stats[VAR] * (samples  2)/(samples  1) + samples / pow(samples  1, 2) * pow(r  stats[AVG],2); 
207  
208 
/* Minimum maximum */

209 
if(r < stats[MIN]  isnan(stats[MIN]))

210 
stats[MIN] = r; 
211  
212 
if(r > stats[MAX]  isnan(stats[MAX]))

213 
stats[MAX] = r; 
214  
215 
newSampleWin(r); 
216  
217 
new_data = true;

218  
219 
/* is it time to publish ?*/

220 
if(m>param_values[P_PUBLISHING_PKT_TIME_BASED] == 0.0 && samples % (int)m>param_values[P_PUBLISHING_RATE] == 0) { 
221 
return publishResults();

222 
} 
223  
224 
return EOK;

225 
}; 
226  
227 
int ResultBuffer::newSampleWin(result r) {

228  
229 
/* sum */

230 
if(n_samples < size) {

231 
n_samples++; 
232 
} else {

233 
stats[WIN_SUM] = circular_buffer[pos]; 
234 
} 
235  
236 
if(isnan(stats[WIN_SUM]))

237 
stats[WIN_SUM] = r; 
238 
else

239 
stats[WIN_SUM] += r; 
240  
241 
circular_buffer[pos] = r; 
242  
243 
pos++; 
244 
if(pos >= size)

245 
pos = size; 
246  
247 
return EOK;

248 
}; 
249  
250  
251 
/*This function updates the stats vector */

252 
void ResultBuffer::updateStats(void) { 
253 
int i,j;

254  
255 
/* Note: some stats are already updated on the fly in newSample() */

256 
/* This function is to update the more time consuming not recursive statitistics */

257  
258 
stats[WIN_AVG] = stats[WIN_SUM]/n_samples; 
259  
260 
/* Minimum, maximum and variance*/

261 
stats[WIN_MIN] = stats[WIN_MAX] = stats[LAST]; 
262 
stats[WIN_VAR] = 0.0; 
263 
for(i = 1; i <= n_samples; i++) { 
264 
j = pos  i; // pos already incremented and last sample is starting one

265 
if(j < 0) 
266 
j += size; 
267 
if(circular_buffer[j] < stats[WIN_MIN])

268 
stats[WIN_MIN] = circular_buffer[j]; 
269 
if(circular_buffer[j] > stats[WIN_MAX])

270 
stats[WIN_MAX] = circular_buffer[j]; 
271 
stats[WIN_VAR] += pow(stats[WIN_AVG]  circular_buffer[j], 2);

272 
} 
273 
if(n_samples > 1) 
274 
stats[WIN_VAR] = stats[WIN_VAR] / (n_samples  1);

275 
else

276 
stats[WIN_VAR] = NAN; 
277 
} 
278 