Statistics
| Branch: | Revision:

napa-baselibs / monl / result_buffer.cpp @ 93e03976

History | View | Annotate | Download (6.48 KB)

1
/***************************************************************************
2
 *   Copyright (C) 2009 by Robert Birke
3
 *   robert.birke@polito.it
4
 *
5
 * This library is free software; you can redistribute it and/or
6
 * modify it under the terms of the GNU Lesser General Public
7
 * License as published by the Free Software Foundation; either
8
 * version 2.1 of the License, or (at your option) any later version.
9

10
 * This library is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 * Lesser General Public License for more details.
14

15
 * You should have received a copy of the GNU Lesser General Public
16
 * License along with this library; if not, write to the Free Software
17
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA 
18
 ***********************************************************************/
19

    
20
#include "result_buffer.h"
21
#include "errors.h"
22
#include        "grapes_log.h"
23
#include "repoclient.h"
24

    
25
#include "mon_event.h"
26

    
27
#include <arpa/inet.h>
28

    
29
#include <sys/time.h>
30
#include <stdlib.h>
31
#include <math.h>
32

    
33
//TODO:
34
//- check if pubblishing was succesfull
35

    
36

    
37
const char* ResultBuffer::stat_suffixes[] = {
38
        "_last",
39
        "_avg",
40
        "_win_avg",
41
        "_var",
42
        "_win_var",
43
        "_min",
44
        "_win_min",
45
        "_max",
46
        "_win_max",
47
        "_sum",
48
        "_win_sum",
49
        "_rate"
50
};
51

    
52
int ResultBuffer::publishResults(void){
53
        MeasurementRecord mr;
54
        SocketId sid;
55
        char sidA_string[SOCKETID_STRING_SIZE] = "";
56
        char sidB_string[SOCKETID_STRING_SIZE] = "";
57
        int errorstatus;
58
        result ts;
59

    
60
        if(m->param_values[P_PUBLISHING_PKT_TIME_BASED] == 1.0) {//time based
61
                struct timeval tv;
62
                float delta = m->param_values[P_PUBLISHING_RATE] * m->param_values[P_PUBLISHING_TIME_SPREAD] / 100.0; 
63
                tv.tv_sec = m->param_values[P_PUBLISHING_RATE] - delta +  2.0 * delta * (float) rand() / (float) RAND_MAX;
64
                tv.tv_usec = 0;
65
                schedule_publish(&tv, m);
66
        }
67

    
68
        if(m->status != RUNNING)
69
                return EOK;
70

    
71
        if(!new_data && m->param_values[P_PUBLISHING_NEW_ALL] == 0.0)
72
                return EOK;
73
        new_data = false;
74

    
75
        info("MONL: publishResults called: %s value: %f (%s)", publish_name.c_str(), stats[LAST], default_name);
76
        if(publish_length == 0)
77
                return EOK;
78
        /* Get local ID */
79
        sid = mlGetLocalSocketID(&errorstatus);
80
        if(errorstatus != 0)
81
                return -EFAILED;
82
        if(mlSocketIDToString(sid, sidA_string, sizeof(sidA_string)) != 0)
83
                return -EFAILED;
84
        if(m->dst_socketid != NULL) {
85
                if(mlSocketIDToString((SocketId) m->dst_socketid, sidB_string, sizeof(sidB_string)) != 0)
86
                        return -EFAILED;
87
        }
88
        if(*originator_name == NULL)
89
                mr.originator = sidA_string;
90
        else
91
                mr.originator = *originator_name;
92
        mr.targetA = sidA_string;
93
        mr.targetB = sidB_string;
94
        mr.string_value = NULL;
95
        mr.channel = cnl.c_str();
96
        gettimeofday(&mr.timestamp, NULL);
97
        
98
        /* rate */
99
        ts =  mr.timestamp.tv_usec / 1000000.0;
100
        ts += mr.timestamp.tv_sec;
101
        if(last_publish != 0.0) {
102
                stats[RATE] = rate_sum_samples / (ts - last_publish);
103
        }
104
        last_publish = ts;
105
        rate_sum_samples = 0.0;
106
        
107
        for(int i = 0; i < publish_length; i++) {
108
                mr.value = stats[publish[i]];
109
                if(isnan(mr.value))
110
                        continue;
111
                publish_name += stat_suffixes[publish[i]];
112
                mr.published_name = publish_name.c_str();
113
                debug("MONL: publishResults called: %s value: %f", mr.published_name, mr.value);
114
                repPublish(repo_client, NULL, NULL, &mr);
115
                publish_name.erase(name_size);
116
        }
117
        return EOK;
118
};
119

    
120
int ResultBuffer::init() {
121
        int i;
122
        n_samples = 0; samples = 0; sum_samples = 0;
123
        sum_win_samples = 0; rate_sum_samples = 0;
124
        pos = 0;
125
        sum_var_samples = sum_win_var_samples = 0;
126
        for(i = 0; i < LAST_STAT_TYPE; i++)
127
                stats[i] = (m->param_values[P_INIT_NAN_ZERO] == 0.0 ? NAN : 0);
128
        for(i = 0; i < size; i++)
129
                circular_buffer[i] = NAN;
130
        return EOK;
131
}
132

    
133
int ResultBuffer::resizeBuffer(int s) {
134
        result *old_buffer = circular_buffer;
135
        result *old_buffer_var = circular_buffer_var;
136
        int i,j, n_sam, old_pos, old_size;
137
        
138
        if(s == size)
139
                return EOK;
140

    
141

    
142
        circular_buffer = new result[s];
143
        circular_buffer_var = new result[s];
144

    
145
        n_sam = fmin(s, fmin(size, n_samples));
146
        old_pos = pos;
147
        old_size = size;
148

    
149
        sum_win_samples = sum_win_var_samples = 0;
150
        n_samples = 0; pos = 0; size = s;
151

    
152
        for(i = 0; i < n_sam; i++) {
153
                j = old_pos - n_sam + i;
154
                if(j < 0)
155
                        j = j + old_size;
156
                newSampleWin(old_buffer[j]);
157
        }
158

    
159
        
160
        delete[] old_buffer;
161
        delete[] old_buffer_var;
162
        return EOK;
163
};
164

    
165
int ResultBuffer::newSample(result r) {
166
        samples++;
167

    
168
        stats[LAST] = r;
169

    
170
        //TODO add statistical data like avg
171

    
172
        sum_samples += r;
173
        rate_sum_samples += r;
174

    
175
        stats[SUM] = sum_samples;
176

    
177
        /* Average  */
178
        if(isnan(stats[AVG]))
179
                stats[AVG] = 0;
180
        stats[AVG] = ((samples - 1) * stats[AVG] + r) / samples;
181

    
182
        /* Variance */
183
        sum_var_samples += pow(r - stats[AVG],2);
184

    
185
        if(samples < 2)
186
                stats[VAR] = 0;
187
        else
188
                stats[VAR] = stats[VAR] * (samples - 2)/(samples - 1) + samples / pow(samples - 1, 2) * pow(r - stats[AVG],2);
189

    
190

    
191
        /* Minimum maximum */
192
        if(r < stats[MIN] || isnan(stats[MIN]))
193
                stats[MIN] = r;
194

    
195
        if(r > stats[MAX] || isnan(stats[MAX]))
196
                stats[MAX] = r;
197

    
198
        // Update window based stats
199
        newSampleWin(r);
200

    
201
        new_data = true;
202

    
203
        /* is it time to publish ?*/
204
        if(m->param_values[P_PUBLISHING_PKT_TIME_BASED] == 0.0 && samples % (int)m->param_values[P_PUBLISHING_RATE] == 0) {
205
                return publishResults();
206
        }
207

    
208
        return EOK;
209
};
210

    
211
int ResultBuffer::newSampleWin(result r) {
212
        int i,j;
213
        result var_s;
214

    
215
        //TODO add statistical data like avg
216

    
217
        /* sum */
218
        if(n_samples < size)        {
219
                n_samples++;
220
        } else {
221
                sum_win_samples -= circular_buffer[pos];
222
                sum_win_var_samples -= circular_buffer_var[pos];
223
        }
224
        sum_win_samples += r;
225

    
226
        stats[WIN_SUM] = sum_win_samples;        
227

    
228
        stats[WIN_AVG] = sum_win_samples/n_samples;
229

    
230
        var_s = pow(r - stats[WIN_AVG],2);
231
        sum_win_var_samples += var_s;
232

    
233
        if(n_samples > 1)
234
                stats[WIN_VAR] = sum_win_var_samples/(n_samples - 1);
235

    
236
        /* Minimum maximum */
237
        if(isnan(stats[WIN_MIN]))
238
                stats[WIN_MIN] = r;
239
        if(isnan(stats[WIN_MAX]))
240
                stats[WIN_MAX] = r;
241

    
242
        if(stats[WIN_MIN] == circular_buffer[pos] || stats[WIN_MAX] == circular_buffer[pos]) {
243
                circular_buffer[pos] = stats[WIN_MAX] = stats[WIN_MIN] = r;
244
                for(i = 0; i <= n_samples - 1; i++) {
245
                        j = pos - i; // pos already incremented and last sample is starting one
246
                        if(j < 0)
247
                                j += size;
248
                        if(circular_buffer[j] < stats[WIN_MIN])
249
                                stats[WIN_MIN] = circular_buffer[j];
250
                        if(circular_buffer[j] > stats[WIN_MAX])
251
                                stats[WIN_MAX] = circular_buffer[j];
252
                }
253
        } else {
254
                if(r < stats[WIN_MIN]) 
255
                        stats[WIN_MIN] = r;
256
                if (r > stats[WIN_MAX])
257
                        stats[WIN_MAX] = r;
258
        }
259

    
260
        circular_buffer[pos] = r;
261
        circular_buffer_var[pos] = var_s;
262
        pos++;
263
        if(pos >= size)
264
                pos -= size;
265

    
266
        return EOK;
267
};