Statistics
| Branch: | Revision:

napa-baselibs / monl / mon_measure.h @ 507372bb

History | View | Annotate | Download (7.81 KB)

1
/***************************************************************************
2
 *   Copyright (C) 2009 by Robert Birke
3
 *   robert.birke@polito.it
4
 *
5
 * This library is free software; you can redistribute it and/or
6
 * modify it under the terms of the GNU Lesser General Public
7
 * License as published by the Free Software Foundation; either
8
 * version 2.1 of the License, or (at your option) any later version.
9

10
 * This library is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 * Lesser General Public License for more details.
14

15
 * You should have received a copy of the GNU Lesser General Public
16
 * License along with this library; if not, write to the Free Software
17
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA 
18
 ***********************************************************************/
19

    
20
#ifndef _MON_MEASURE_HH_
21
#define _MON_MEASURE_HH_
22

    
23
#include <vector>
24
#include <bitset>
25
#include <map>
26
#include "mon_parameter.h"
27
#include "measure_plugin.h"
28
#include "ids.h"
29
#include "mon.h"
30
#include <math.h>
31
#include "result_buffer.h"
32
#include "ml.h"
33

    
34
#include "napa_log.h"
35

    
36
#include <fstream>
37

    
38
/************************** monMeasure ********************************/
39

    
40
/* Measures can be:
41
 * - IN_BAND, OUT_OF_BAND, BOTH
42
 * - PACKET,DATA, BOTH
43
 * This is stored in a bitset called caps. If the bit is set the
44
 * corresponding type is true. flags contians how a particular instance
45
 * is used */
46

    
47
#pragma pack(push)  /* push current alignment to stack */
48
#pragma pack(1)     /* set alignment to 1 byte boundary */
49
struct res_mh_pair {
50
        result res;
51
        MonHandler mh;
52
};
53
#pragma pack(pop)   /* restore original alignment from stack */
54

    
55
typedef std::map<MeasurementId, class MonMeasure*> ExecutionList;
56

    
57
class MonMeasure {
58
        int rx_cnt;
59
        result meas_sum;
60
        int meas_cnt;
61
protected:
62
        bool auto_loaded;
63

    
64
        /* Bitmask defining the capabilities of this instance of the measure */
65
        MeasurementCapabilities flags;
66

    
67
        /* Pointer to the MeasurePlugin class describing this measure */
68
        class MeasurePlugin *measure_plugin;
69

    
70
        /* Status of the measurement instance */
71
        MeasurementStatus status;
72

    
73
        MonHandler mh_remote;
74

    
75
        uint8_t dst_socketid[SOCKETID_SIZE];
76
        bool dst_socketid_publish;
77
        MsgType  msg_type;
78

    
79
        int used_counter;
80

    
81
        ResultBuffer *rb;
82

    
83
        /* Parameter values */
84
        MonParameterValue *param_values;
85

    
86
        int tx_every;
87

    
88
        result every(result &m);
89

    
90
        void debugInit(const char *);
91
        void debugStop();
92
        std::fstream output_file;
93

    
94
        int paramChangeDefault(MonParameterType ph, MonParameterValue p) {
95
                if(ph == P_WINDOW_SIZE && rb != NULL)
96
                        return rb->resizeBuffer((int)p);
97
                if(ph == P_INIT_NAN_ZERO && rb != NULL)
98
                        return rb->init();
99
                if(ph == P_DEBUG_FILE) {
100
                        if(param_values[P_DEBUG_FILE] == p)
101
                                return EOK;
102
                        if(p == 1)
103
                                debugInit(measure_plugin->name.c_str());
104
                        if(p == 0)
105
                                debugStop();
106
                        return EOK;
107
                }
108
                if(ph < P_LAST_DEFAULT_PARAM)
109
                        return EOK;
110
                return paramChange(ph,p);
111
        }
112

    
113
        result *r_rx_list;
114
        result *r_tx_list;
115

    
116
        /* called when the measure is put on an execution list */
117
        /* should be used to reinitialise the measurement in subsequent uses */
118
        virtual void init() {};
119
        virtual void stop() {};
120

    
121

    
122
        friend class MeasureManager;
123
        friend class MeasureDispatcher;
124
        friend class ResultBuffer;
125
public:
126

    
127
        void debugOutput(char *out);
128

    
129
        class MeasureDispatcher *ptrDispatcher;
130
        MonHandler mh_local;
131

    
132
        /* Functions */
133
        /* Constructor: MUST BE CALLED BY DERIVED CLASSES*/
134
        MonMeasure(class MeasurePlugin *mp, MeasurementCapabilities mc, class MeasureDispatcher *ptrDisp);
135

    
136
        virtual ~MonMeasure() {
137
                delete[] param_values;
138
                if(rb != NULL)
139
                        delete rb;
140
        };
141

    
142
        /* Get vector of MonParameterValues (pointers to the MonParameterValue instances) */
143
        std::vector<class MonParameter* >& listParameters(void) {
144
                return measure_plugin->params;
145
        };
146

    
147
        /* Set MonParameterValue value */
148
        int setParameter(size_t pid, MonParameterValue p) {
149
                if(pid < measure_plugin->params.size() && measure_plugin->params[pid]->validator(p) == EOK) {
150
                        MonParameterValue pt;
151
                        pt = param_values[pid];
152
                        param_values[pid] = p;
153
                        if(paramChangeDefault(pid,p) == EOK) {
154
                                return EOK;
155
                        }
156
                        param_values[pid] = pt;
157
                }
158
                return -EINVAL;
159
        };
160

    
161
        /* Get MonParameterValue value */
162
        MonParameterValue getParameter(size_t pid) {
163
                if(pid >= measure_plugin->params.size())
164
                        return NAN;
165
                return param_values[pid];
166
        };
167

    
168
        MeasurementStatus getStatus() {
169
        return status;
170
        }
171

    
172
        /* Get flags */
173
        MeasurementCapabilities getFlags() {
174
                return flags;
175
        }
176

    
177
        /* Set flags */
178
        int setFlags(MeasurementCapabilities f) {
179
                if(f == measure_plugin->getCaps() && f == 0)
180
                        return EOK;
181
                /*check that f is a subset of Caps */
182
                if(f & ~(measure_plugin->getCaps() | TXRXBI | REMOTE))
183
                        return -ERANGE;
184
                /* Check packet vs chunk */
185
                if(!((f & PACKET)^(f & DATA)))
186
                        return -ERANGE;
187
                /* check inband vs out-of-band */
188
                if(f & TIMER_BASED)
189
                        flags |= TIMER_BASED | IN_BAND | OUT_OF_BAND;
190
                else 
191
                        if(!((f & IN_BAND)^(f & OUT_OF_BAND)))
192
                                return -ERANGE;
193
                return EOK;
194
        }
195

    
196
        int setPublishStatisticalType(const char * publishing_name, const char *channel, enum stat_types st[], int length, void *repo_client) {
197
                if(rb == NULL)
198
                {
199
                        info("rb == NULL");
200
                        return -EINVAL;
201
                }
202

    
203
                int result = rb->setPublishStatisticalType(publishing_name, channel, st, length, repo_client);
204

    
205
                return result;
206
        };
207

    
208
        void defaultInit() {
209
                init();
210
                if(param_values[P_DEBUG_FILE] == 1.0)
211
                        debugInit(measure_plugin->name.c_str());
212
        };
213

    
214
        void defaultStop() {
215
                if(rb)
216
                        rb->publishResults();
217
                stop();
218
                if(param_values[P_DEBUG_FILE] == 1.0)
219
                        debugStop();
220
        };
221

    
222
        /* called when  changes in parameter values happen */
223
        virtual int paramChange(MonParameterType ph, MonParameterValue p) {return EOK;};
224

    
225

    
226
        void RxPktLocal(ExecutionList *el) {
227
                if(r_rx_list == NULL)
228
                        return;
229
                newSample(RxPkt(r_rx_list, el));
230
        };
231
        void RxPktRemote(struct res_mh_pair *rmp, int &i, ExecutionList *el) {
232
                if(r_rx_list == NULL)
233
                        return;
234
                result r = RxPkt(r_rx_list, el);
235
                if(!isnan(r)) {
236
                        rmp[i].res = r;
237
                        rmp[i].mh = mh_remote;
238
                        i++;
239
                }
240
        };
241

    
242
        void RxDataLocal(ExecutionList *el) {
243
                if(r_rx_list == NULL)
244
                        return;
245
                newSample(RxData(r_rx_list,el));
246
        };
247
        void RxDataRemote(struct res_mh_pair *rmp, int &i, ExecutionList *el) {
248
                if(r_rx_list == NULL)
249
                        return;
250
                result r = RxData(r_rx_list, el);
251
                if(!isnan(r)) {
252
                        rmp[i].res = r;
253
                        rmp[i].mh = mh_remote;
254
                        i++;
255
                }
256
        };
257

    
258
        void TxPktLocal(ExecutionList *el) {
259
                if(r_tx_list == NULL)
260
                        return;
261
                newSample(TxPkt(r_tx_list, el));
262
        };
263
        void TxPktRemote(struct res_mh_pair *rmp, int &i, ExecutionList *el) {
264
                if(r_tx_list == NULL)
265
                        return;
266
                result r = TxPkt(r_tx_list, el);
267
                if(!isnan(r)) {
268
                        rmp[i].res = r;
269
                        rmp[i].mh = mh_remote;
270
                        i++;
271
                }
272
        };
273

    
274
        void TxDataLocal(ExecutionList *el)  {
275
                if(r_tx_list == NULL)
276
                        return;
277
                newSample(TxData(r_tx_list, el));
278
        };
279
        void TxDataRemote(struct res_mh_pair *rmp, int &i, ExecutionList *el) {
280
                if(r_tx_list == NULL)
281
                        return;
282
                result r = TxData(r_tx_list, el);
283
                if(!isnan(r)) {
284
                        rmp[i].res = r;
285
                        rmp[i].mh = mh_remote;
286
                        i++;
287
                }
288
        };
289

    
290
        int newSample(result r) {
291
                if(!isnan(r) && rb != NULL) {
292
                        rb->newSample(r);
293
                        return EOK;
294
                }
295
                else
296
                        return -EINVAL;
297
        };
298

    
299
        //If OOB this function is called upon scheduled event
300
        virtual void Run() {return;};
301
        virtual void receiveOobData(char *buf, int buf_len, result *r) {return;};
302
        int scheduleNextIn(struct timeval *tv);
303
        
304

    
305
        int sendOobData(char* buf, int buf_len);
306

    
307
        /* Functions processing a message (packet and data) and extracting the measure to be stored */
308
        virtual result RxPkt(result *r, ExecutionList *) {return NAN;};
309
        virtual result RxData(result *r, ExecutionList *) {return NAN;};
310
        virtual result TxPkt(result *r, ExecutionList *) {return NAN;};
311
        virtual result TxData(result *r, ExecutionList *) {return NAN;};
312
        
313
        //void OutOfBandcb(HANDLE *event_handle, void *arg) {return;};
314
};
315

    
316
#endif  /* _MON_MEASURE_HH_ */