Statistics
| Branch: | Revision:

napa-baselibs / monl / result_buffer.cpp @ 49a081ee

History | View | Annotate | Download (6.71 KB)

1
/***************************************************************************
2
 *   Copyright (C) 2009 by Robert Birke
3
 *   robert.birke@polito.it
4
 *
5
 * This library is free software; you can redistribute it and/or
6
 * modify it under the terms of the GNU Lesser General Public
7
 * License as published by the Free Software Foundation; either
8
 * version 2.1 of the License, or (at your option) any later version.
9

10
 * This library is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 * Lesser General Public License for more details.
14

15
 * You should have received a copy of the GNU Lesser General Public
16
 * License along with this library; if not, write to the Free Software
17
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA 
18
 ***********************************************************************/
19

    
20
#include "result_buffer.h"
21
#include "errors.h"
22
#include        "napa_log.h"
23
#include "repoclient.h"
24

    
25
#include "mon_event.h"
26

    
27
#ifndef _WIN32
28
#include <arpa/inet.h>
29
#endif
30

    
31
#include <sys/time.h>
32
#include <stdlib.h>
33
#include <math.h>
34

    
35
//TODO:
36
//- check if pubblishing was succesfull
37

    
38

    
39
const char* ResultBuffer::stat_suffixes[] = {
40
        "_last",
41
        "_avg",
42
        "_win_avg",
43
        "_var",
44
        "_win_var",
45
        "_min",
46
        "_win_min",
47
        "_max",
48
        "_win_max",
49
        "_sum",
50
        "_win_sum",
51
        "_rate",
52
        "_period_sum"
53
};
54

    
55
int ResultBuffer::publishResults(void){
56
        MeasurementRecord mr;
57
        SocketId sid;
58
        char sidA_string[SOCKETID_STRING_SIZE] = "";
59
        char sidB_string[SOCKETID_STRING_SIZE] = "";
60
        int errorstatus;
61
        result ts;
62

    
63
        if(m->param_values[P_PUBLISHING_PKT_TIME_BASED] == 1.0) {//time based
64
                struct timeval tv;
65
                float delta = m->param_values[P_PUBLISHING_RATE] * m->param_values[P_PUBLISHING_TIME_SPREAD] / 100.0; 
66
                tv.tv_sec = m->param_values[P_PUBLISHING_RATE] - delta +  2.0 * delta * (float) rand() / (float) RAND_MAX;
67
                tv.tv_usec = 0;
68
                schedule_publish(&tv, m);
69
        }
70

    
71
        if(m->status != RUNNING)
72
                return EOK;
73

    
74
        if(!new_data && m->param_values[P_PUBLISHING_NEW_ALL] == 0.0)
75
                return EOK;
76
        new_data = false;
77

    
78
        updateStats();
79

    
80
        info("MONL: publishResults called: %s last value: %f (%s)", publish_name.c_str(), stats[LAST], default_name);
81
        if(publish_length == 0)
82
                return EOK;
83
        /* Get local ID */
84
        sid = mlGetLocalSocketID(&errorstatus);
85
        if(errorstatus != 0)
86
                return -EFAILED;
87
        if(mlSocketIDToString(sid, sidA_string, sizeof(sidA_string)) != 0)
88
                return -EFAILED;
89
        if(m->dst_socketid != NULL && m->dst_socketid_publish) {
90
                if(mlSocketIDToString((SocketId) m->dst_socketid, sidB_string, sizeof(sidB_string)) != 0)
91
                        return -EFAILED;
92
        }
93
        if(*originator_name == NULL)
94
                mr.originator = sidA_string;
95
        else
96
                mr.originator = *originator_name;
97
        mr.targetA = sidA_string;
98
        mr.targetB = sidB_string;
99
        mr.string_value = NULL;
100
        mr.channel = cnl.c_str();
101
        gettimeofday(&mr.timestamp, NULL);
102
        
103
        /* rate */
104
        ts =  mr.timestamp.tv_usec / 1000000.0;
105
        ts += mr.timestamp.tv_sec;
106
        if(last_publish != 0.0) {
107
                stats[RATE] = rate_sum_samples / (ts - last_publish);
108
        }
109
        last_publish = ts;
110
        rate_sum_samples = 0.0;
111
        
112
        for(int i = 0; i < publish_length; i++) {
113
                mr.value = stats[publish[i]];
114
                if(isnan(mr.value))
115
                        continue;
116
                publish_name += stat_suffixes[publish[i]];
117
                mr.published_name = publish_name.c_str();
118
                debug("MONL: publishResults called: %s value: %f", mr.published_name, mr.value);
119
                fprintf(stderr,"abouttopublish,%s,%s,%s,%s,%f,%s,%s,%.3f\n",
120
                mr.originator, mr.targetA, mr.targetB, mr.published_name, mr.value,
121
                mr.string_value, mr.channel, ts);
122
                if (repo_client) repPublish(repo_client, NULL, NULL, &mr);
123
                publish_name.erase(name_size);
124
        }
125

    
126
        stats[PERIOD_SUM] = NAN;
127

    
128
        return EOK;
129
};
130

    
131
int ResultBuffer::init() {
132
        int i;
133
        n_samples = 0; samples = 0;
134
        sum_win_samples = 0; rate_sum_samples = 0;
135
        pos = 0;
136
        sum_win_var_samples = 0;
137
        for(i = 0; i < LAST_STAT_TYPE; i++)
138
                stats[i] = (m->param_values[P_INIT_NAN_ZERO] == 0.0 ? NAN : 0);
139
        for(i = 0; i < size; i++)
140
                circular_buffer[i] = NAN;
141
        return EOK;
142
}
143

    
144
int ResultBuffer::resizeBuffer(int s) {
145
        result *old_buffer = circular_buffer;
146
        result *old_buffer_var = circular_buffer_var;
147
        int i,j, n_sam, old_pos, old_size;
148
        
149
        if(s == size)
150
                return EOK;
151

    
152

    
153
        circular_buffer = new result[s];
154
        circular_buffer_var = new result[s];
155

    
156
        n_sam = fmin(s, fmin(size, n_samples));
157
        old_pos = pos;
158
        old_size = size;
159

    
160
        sum_win_var_samples = 0;
161
        n_samples = 0; pos = 0; size = s;
162
        stats[WIN_SUM] = 0;
163

    
164
        for(i = 0; i < n_sam; i++) {
165
                j = old_pos - n_sam + i;
166
                if(j < 0)
167
                        j = j + old_size;
168
                newSampleWin(old_buffer[j]);
169
        }
170

    
171
        
172
        delete[] old_buffer;
173
        delete[] old_buffer_var;
174
        return EOK;
175
};
176

    
177
/* This function is used to update internal variables with data from a new sample */
178
int ResultBuffer::newSample(result r) {
179
        samples++;
180

    
181
        stats[LAST] = r;
182

    
183
        //TODO add statistical data like avg
184

    
185
        if(isnan(stats[SUM]))
186
                stats[SUM] = r;
187
        else
188
                stats[SUM] += r;
189

    
190
        if(isnan(stats[PERIOD_SUM]))
191
                stats[PERIOD_SUM] = r;
192
        else
193
                stats[PERIOD_SUM] += r;
194

    
195
        rate_sum_samples += r;
196

    
197
        /* Average  */
198
        if(isnan(stats[AVG]))
199
                stats[AVG] = 0;
200
        stats[AVG] = ((samples - 1) * stats[AVG] + r) / samples;
201

    
202
        /* Variance */
203
        if(samples < 2)
204
                stats[VAR] = 0;
205
        else
206
                stats[VAR] = stats[VAR] * (samples - 2)/(samples - 1) + samples / pow(samples - 1, 2) * pow(r - stats[AVG],2);
207

    
208
        /* Minimum maximum */
209
        if(r < stats[MIN] || isnan(stats[MIN]))
210
                stats[MIN] = r;
211

    
212
        if(r > stats[MAX] || isnan(stats[MAX]))
213
                stats[MAX] = r;
214

    
215
        newSampleWin(r);
216

    
217
        new_data = true;
218

    
219
        /* is it time to publish ?*/
220
        if(m->param_values[P_PUBLISHING_PKT_TIME_BASED] == 0.0 && samples % (int)m->param_values[P_PUBLISHING_RATE] == 0) {
221
                return publishResults();
222
        }
223

    
224
        return EOK;
225
};
226

    
227
int ResultBuffer::newSampleWin(result r) {
228

    
229
        /* sum */
230
        if(n_samples < size)        {
231
                n_samples++;
232
        } else {
233
                stats[WIN_SUM] -= circular_buffer[pos];
234
        }
235

    
236
        if(isnan(stats[WIN_SUM]))
237
                stats[WIN_SUM] = r;
238
        else
239
                stats[WIN_SUM] += r;
240

    
241
        circular_buffer[pos] = r;
242

    
243
        pos++;
244
        if(pos >= size)
245
                pos -= size;
246

    
247
        return EOK;
248
};
249

    
250

    
251
/*This function updates the stats vector */
252
void ResultBuffer::updateStats(void) {
253
        int i,j;
254

    
255
        /* Note: some stats are already updated on the fly in newSample() */
256
        /* This function is to update the more time consuming not recursive statitistics */
257

    
258
        stats[WIN_AVG] = stats[WIN_SUM]/n_samples;
259

    
260
        /* Minimum, maximum and variance*/
261
        stats[WIN_MIN] = stats[WIN_MAX] = stats[LAST];
262
        stats[WIN_VAR] = 0.0;
263
        for(i = 1; i <= n_samples; i++) {
264
                j = pos - i; // pos already incremented and last sample is starting one
265
                if(j < 0)
266
                        j += size;
267
                if(circular_buffer[j] < stats[WIN_MIN])
268
                        stats[WIN_MIN] = circular_buffer[j];
269
                if(circular_buffer[j] > stats[WIN_MAX])
270
                        stats[WIN_MAX] = circular_buffer[j];
271
                stats[WIN_VAR] += pow(stats[WIN_AVG] - circular_buffer[j], 2);
272
        }
273
        if(n_samples > 1)
274
                stats[WIN_VAR] = stats[WIN_VAR] / (n_samples - 1);
275
        else
276
                stats[WIN_VAR] = NAN;
277
}
278