Statistics
| Branch: | Revision:

napa-baselibs / monl / result_buffer.cpp @ 507372bb

History | View | Annotate | Download (6.53 KB)

1
/***************************************************************************
2
 *   Copyright (C) 2009 by Robert Birke
3
 *   robert.birke@polito.it
4
 *
5
 * This library is free software; you can redistribute it and/or
6
 * modify it under the terms of the GNU Lesser General Public
7
 * License as published by the Free Software Foundation; either
8
 * version 2.1 of the License, or (at your option) any later version.
9

10
 * This library is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 * Lesser General Public License for more details.
14

15
 * You should have received a copy of the GNU Lesser General Public
16
 * License along with this library; if not, write to the Free Software
17
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA 
18
 ***********************************************************************/
19

    
20
#include "result_buffer.h"
21
#include "errors.h"
22
#include        "napa_log.h"
23
#include "repoclient.h"
24

    
25
#include "mon_event.h"
26

    
27
#ifndef WIN32
28
#include <arpa/inet.h>
29
#endif
30

    
31
#include <sys/time.h>
32
#include <stdlib.h>
33
#include <math.h>
34

    
35
//TODO:
36
//- check if pubblishing was succesfull
37

    
38

    
39
const char* ResultBuffer::stat_suffixes[] = {
40
        "_last",
41
        "_avg",
42
        "_win_avg",
43
        "_var",
44
        "_win_var",
45
        "_min",
46
        "_win_min",
47
        "_max",
48
        "_win_max",
49
        "_sum",
50
        "_win_sum",
51
        "_rate",
52
        "_period_sum"
53
};
54

    
55
int ResultBuffer::publishResults(void){
56
        MeasurementRecord mr;
57
        SocketId sid;
58
        char sidA_string[SOCKETID_STRING_SIZE] = "";
59
        char sidB_string[SOCKETID_STRING_SIZE] = "";
60
        int errorstatus;
61
        result ts;
62

    
63
        if(m->param_values[P_PUBLISHING_PKT_TIME_BASED] == 1.0) {//time based
64
                struct timeval tv;
65
                float delta = m->param_values[P_PUBLISHING_RATE] * m->param_values[P_PUBLISHING_TIME_SPREAD] / 100.0; 
66
                tv.tv_sec = m->param_values[P_PUBLISHING_RATE] - delta +  2.0 * delta * (float) rand() / (float) RAND_MAX;
67
                tv.tv_usec = 0;
68
                schedule_publish(&tv, m);
69
        }
70

    
71
        if(m->status != RUNNING)
72
                return EOK;
73

    
74
        if(!new_data && m->param_values[P_PUBLISHING_NEW_ALL] == 0.0)
75
                return EOK;
76
        new_data = false;
77

    
78
        updateStats();
79

    
80
        info("MONL: publishResults called: %s value: %f (%s)", publish_name.c_str(), stats[LAST], default_name);
81
        if(publish_length == 0)
82
                return EOK;
83
        /* Get local ID */
84
        sid = mlGetLocalSocketID(&errorstatus);
85
        if(errorstatus != 0)
86
                return -EFAILED;
87
        if(mlSocketIDToString(sid, sidA_string, sizeof(sidA_string)) != 0)
88
                return -EFAILED;
89
        if(m->dst_socketid != NULL && m->dst_socketid_publish) {
90
                if(mlSocketIDToString((SocketId) m->dst_socketid, sidB_string, sizeof(sidB_string)) != 0)
91
                        return -EFAILED;
92
        }
93
        if(*originator_name == NULL)
94
                mr.originator = sidA_string;
95
        else
96
                mr.originator = *originator_name;
97
        mr.targetA = sidA_string;
98
        mr.targetB = sidB_string;
99
        mr.string_value = NULL;
100
        mr.channel = cnl.c_str();
101
        gettimeofday(&mr.timestamp, NULL);
102
        
103
        /* rate */
104
        ts =  mr.timestamp.tv_usec / 1000000.0;
105
        ts += mr.timestamp.tv_sec;
106
        if(last_publish != 0.0) {
107
                stats[RATE] = rate_sum_samples / (ts - last_publish);
108
        }
109
        last_publish = ts;
110
        rate_sum_samples = 0.0;
111
        
112
        for(int i = 0; i < publish_length; i++) {
113
                mr.value = stats[publish[i]];
114
                if(isnan(mr.value))
115
                        continue;
116
                publish_name += stat_suffixes[publish[i]];
117
                mr.published_name = publish_name.c_str();
118
                debug("MONL: publishResults called: %s value: %f", mr.published_name, mr.value);
119
                repPublish(repo_client, NULL, NULL, &mr);
120
                publish_name.erase(name_size);
121
        }
122

    
123
        stats[PERIOD_SUM] = NAN;
124

    
125
        return EOK;
126
};
127

    
128
int ResultBuffer::init() {
129
        int i;
130
        n_samples = 0; samples = 0;
131
        sum_win_samples = 0; rate_sum_samples = 0;
132
        pos = 0;
133
        sum_win_var_samples = 0;
134
        for(i = 0; i < LAST_STAT_TYPE; i++)
135
                stats[i] = (m->param_values[P_INIT_NAN_ZERO] == 0.0 ? NAN : 0);
136
        for(i = 0; i < size; i++)
137
                circular_buffer[i] = NAN;
138
        return EOK;
139
}
140

    
141
int ResultBuffer::resizeBuffer(int s) {
142
        result *old_buffer = circular_buffer;
143
        result *old_buffer_var = circular_buffer_var;
144
        int i,j, n_sam, old_pos, old_size;
145
        
146
        if(s == size)
147
                return EOK;
148

    
149

    
150
        circular_buffer = new result[s];
151
        circular_buffer_var = new result[s];
152

    
153
        n_sam = fmin(s, fmin(size, n_samples));
154
        old_pos = pos;
155
        old_size = size;
156

    
157
        sum_win_var_samples = 0;
158
        n_samples = 0; pos = 0; size = s;
159
        stats[WIN_SUM] = 0;
160

    
161
        for(i = 0; i < n_sam; i++) {
162
                j = old_pos - n_sam + i;
163
                if(j < 0)
164
                        j = j + old_size;
165
                newSampleWin(old_buffer[j]);
166
        }
167

    
168
        
169
        delete[] old_buffer;
170
        delete[] old_buffer_var;
171
        return EOK;
172
};
173

    
174
/* This function is used to update internal variables with data from a new sample */
175
int ResultBuffer::newSample(result r) {
176
        samples++;
177

    
178
        stats[LAST] = r;
179

    
180
        //TODO add statistical data like avg
181

    
182
        if(isnan(stats[SUM]))
183
                stats[SUM] = r;
184
        else
185
                stats[SUM] += r;
186

    
187
        if(isnan(stats[PERIOD_SUM]))
188
                stats[PERIOD_SUM] = r;
189
        else
190
                stats[PERIOD_SUM] += r;
191

    
192
        rate_sum_samples += r;
193

    
194
        /* Average  */
195
        if(isnan(stats[AVG]))
196
                stats[AVG] = 0;
197
        stats[AVG] = ((samples - 1) * stats[AVG] + r) / samples;
198

    
199
        /* Variance */
200
        if(samples < 2)
201
                stats[VAR] = 0;
202
        else
203
                stats[VAR] = stats[VAR] * (samples - 2)/(samples - 1) + samples / pow(samples - 1, 2) * pow(r - stats[AVG],2);
204

    
205
        /* Minimum maximum */
206
        if(r < stats[MIN] || isnan(stats[MIN]))
207
                stats[MIN] = r;
208

    
209
        if(r > stats[MAX] || isnan(stats[MAX]))
210
                stats[MAX] = r;
211

    
212
        newSampleWin(r);
213

    
214
        new_data = true;
215

    
216
        /* is it time to publish ?*/
217
        if(m->param_values[P_PUBLISHING_PKT_TIME_BASED] == 0.0 && samples % (int)m->param_values[P_PUBLISHING_RATE] == 0) {
218
                return publishResults();
219
        }
220

    
221
        return EOK;
222
};
223

    
224
int ResultBuffer::newSampleWin(result r) {
225

    
226
        /* sum */
227
        if(n_samples < size)        {
228
                n_samples++;
229
        } else {
230
                stats[WIN_SUM] -= circular_buffer[pos];
231
        }
232

    
233
        if(isnan(stats[WIN_SUM]))
234
                stats[WIN_SUM] = r;
235
        else
236
                stats[WIN_SUM] += r;
237

    
238
        circular_buffer[pos] = r;
239

    
240
        pos++;
241
        if(pos >= size)
242
                pos -= size;
243

    
244
        return EOK;
245
};
246

    
247

    
248
/*This function updates the stats vector */
249
void ResultBuffer::updateStats(void) {
250
        int i,j;
251

    
252
        /* Note: some stats are already updated on the fly in newSample() */
253
        /* This function is to update the more time consuming not recursive statitistics */
254

    
255
        stats[WIN_AVG] = stats[WIN_SUM]/n_samples;
256

    
257
        /* Minimum, maximum and variance*/
258
        stats[WIN_MIN] = stats[WIN_MAX] = stats[LAST];
259
        stats[WIN_VAR] = 0.0;
260
        for(i = 1; i <= n_samples; i++) {
261
                j = pos - i; // pos already incremented and last sample is starting one
262
                if(j < 0)
263
                        j += size;
264
                if(circular_buffer[j] < stats[WIN_MIN])
265
                        stats[WIN_MIN] = circular_buffer[j];
266
                if(circular_buffer[j] > stats[WIN_MAX])
267
                        stats[WIN_MAX] = circular_buffer[j];
268
                stats[WIN_VAR] += pow(stats[WIN_AVG] - circular_buffer[j], 2);
269
        }
270
        if(n_samples > 1)
271
                stats[WIN_VAR] = stats[WIN_VAR] / (n_samples - 1);
272
        else
273
                stats[WIN_VAR] = NAN;
274
}
275