Statistics
| Branch: | Revision:

napa-baselibs / monl / mon_measure.h @ 17af7b70

History | View | Annotate | Download (7.88 KB)

1
/***************************************************************************
2
 *   Copyright (C) 2009 by Robert Birke
3
 *   robert.birke@polito.it
4
 *
5
 * This library is free software; you can redistribute it and/or
6
 * modify it under the terms of the GNU Lesser General Public
7
 * License as published by the Free Software Foundation; either
8
 * version 2.1 of the License, or (at your option) any later version.
9

10
 * This library is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 * Lesser General Public License for more details.
14

15
 * You should have received a copy of the GNU Lesser General Public
16
 * License along with this library; if not, write to the Free Software
17
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA 
18
 ***********************************************************************/
19

    
20
#ifndef _MON_MEASURE_HH_
21
#define _MON_MEASURE_HH_
22

    
23
#include <vector>
24
#include <bitset>
25
#include <map>
26
#include "mon_parameter.h"
27
#include "measure_plugin.h"
28
#include "ids.h"
29
#include "mon.h"
30
#include <math.h>
31
#include "result_buffer.h"
32
#include "ml.h"
33

    
34
#include "napa_log.h"
35

    
36
#include <fstream>
37

    
38
/************************** monMeasure ********************************/
39

    
40
/* Measures can be:
41
 * - IN_BAND, OUT_OF_BAND, BOTH
42
 * - PACKET,DATA, BOTH
43
 * This is stored in a bitset called caps. If the bit is set the
44
 * corresponding type is true. flags contians how a particular instance
45
 * is used */
46

    
47
#pragma pack(push)  /* push current alignment to stack */
48
#pragma pack(1)     /* set alignment to 1 byte boundary */
49
struct res_mh_pair {
50
        result res;
51
        MonHandler mh;
52
};
53
#pragma pack(pop)   /* restore original alignment from stack */
54

    
55
typedef std::map<MeasurementId, class MonMeasure*> ExecutionList;
56

    
57
class MonMeasure {
58
        int rx_cnt;
59
        result meas_sum;
60
        int meas_cnt;
61
protected:
62
        bool auto_loaded;
63

    
64
        /* Bitmask defining the capabilities of this instance of the measure */
65
        MeasurementCapabilities flags;
66

    
67
        /* Pointer to the MeasurePlugin class describing this measure */
68
        class MeasurePlugin *measure_plugin;
69

    
70
        /* Status of the measurement instance */
71
        MeasurementStatus status;
72

    
73
        MonHandler mh_remote;
74

    
75
        uint8_t dst_socketid[SOCKETID_SIZE];
76
        bool dst_socketid_publish;
77
        MsgType  msg_type;
78

    
79
        int used_counter;
80

    
81
        ResultBuffer *rb;
82

    
83
        /* Parameter values */
84
        MonParameterValue *param_values;
85

    
86
        int tx_every;
87

    
88
        result every(result &m);
89

    
90
        void debugInit(const char *);
91
        void debugStop();
92
        std::fstream output_file;
93

    
94
        int paramChangeDefault(MonParameterType ph, MonParameterValue p) {
95
                if(ph == P_WINDOW_SIZE && rb != NULL)
96
                        return rb->resizeBuffer((int)ceil(p/tx_every));
97
                if(ph == P_INIT_NAN_ZERO && rb != NULL)
98
                        return rb->init();
99
                if(ph == P_DEBUG_FILE) {
100
                        if(p == 1.0 || p == 3.0)
101
                                debugInit(measure_plugin->name.c_str());
102
                        if(p == 0.0 || p == 2.0)
103
                                debugStop();
104
                        return EOK;
105
                }
106
                if(ph < P_LAST_DEFAULT_PARAM)
107
                        return EOK;
108
                return paramChange(ph,p);
109
        }
110

    
111
        result *r_rx_list;
112
        result *r_tx_list;
113

    
114
        /* called when the measure is put on an execution list */
115
        /* should be used to reinitialise the measurement in subsequent uses */
116
        virtual void init() {};
117
        virtual void stop() {};
118

    
119

    
120
        friend class MeasureManager;
121
        friend class MeasureDispatcher;
122
        friend class ResultBuffer;
123
public:
124

    
125
        void debugOutput(char *out);
126

    
127
        class MeasureDispatcher *ptrDispatcher;
128
        MonHandler mh_local;
129

    
130
        /* Functions */
131
        /* Constructor: MUST BE CALLED BY DERIVED CLASSES*/
132
        MonMeasure(class MeasurePlugin *mp, MeasurementCapabilities mc, class MeasureDispatcher *ptrDisp, int tx_every=1);
133

    
134
        virtual ~MonMeasure() {
135
                delete[] param_values;
136
                if(rb != NULL)
137
                        delete rb;
138
        };
139

    
140
        /* Get vector of MonParameterValues (pointers to the MonParameterValue instances) */
141
        std::vector<class MonParameter* >& listParameters(void) {
142
                return measure_plugin->params;
143
        };
144

    
145
        /* Set MonParameterValue value */
146
        int setParameter(size_t pid, MonParameterValue p) {
147
                if(pid < measure_plugin->params.size() && measure_plugin->params[pid]->validator(p) == EOK) {
148
                        MonParameterValue pt;
149
                        pt = param_values[pid];
150
                        param_values[pid] = p;
151
                        if(paramChangeDefault(pid,p) == EOK) {
152
                                return EOK;
153
                        }
154
                        param_values[pid] = pt;
155
                }
156
                return -EINVAL;
157
        };
158

    
159
        /* Get MonParameterValue value */
160
        MonParameterValue getParameter(size_t pid) {
161
                if(pid >= measure_plugin->params.size())
162
                        return NAN;
163
                return param_values[pid];
164
        };
165

    
166
        MeasurementStatus getStatus() {
167
        return status;
168
        }
169

    
170
        /* Get flags */
171
        MeasurementCapabilities getFlags() {
172
                return flags;
173
        }
174

    
175
        /* Set flags */
176
        int setFlags(MeasurementCapabilities f) {
177
                if(f == measure_plugin->getCaps() && f == 0)
178
                        return EOK;
179
                /*check that f is a subset of Caps */
180
                if(f & ~(measure_plugin->getCaps() | TXRXBI | REMOTE))
181
                        return -ERANGE;
182
                /* Check packet vs chunk */
183
                if(!((f & PACKET)^(f & DATA)))
184
                        return -ERANGE;
185
                /* check inband vs out-of-band */
186
                if(f & TIMER_BASED)
187
                        flags |= TIMER_BASED | IN_BAND | OUT_OF_BAND;
188
                else 
189
                        if(!((f & IN_BAND)^(f & OUT_OF_BAND)))
190
                                return -ERANGE;
191
                return EOK;
192
        }
193

    
194
        int setPublishStatisticalType(const char * publishing_name, const char *channel, enum stat_types st[], int length, void *repo_client) {
195
                if(rb == NULL)
196
                {
197
                        info("rb == NULL");
198
                        return -EINVAL;
199
                }
200

    
201
                int result = rb->setPublishStatisticalType(publishing_name, channel, st, length, repo_client);
202

    
203
                return result;
204
        };
205

    
206
        void defaultInit() {
207
                init();
208
                if(param_values[P_DEBUG_FILE] == 1.0 || param_values[P_DEBUG_FILE] == 3.0)
209
                        debugInit(measure_plugin->name.c_str());
210
        };
211

    
212
        void defaultStop() {
213
                if(rb)
214
                        rb->publishResults();
215
                stop();
216
                if(param_values[P_DEBUG_FILE] == 1.0 || param_values[P_DEBUG_FILE] == 3.0)
217
                        debugStop();
218
        };
219

    
220
        /* called when  changes in parameter values happen */
221
        virtual int paramChange(MonParameterType ph, MonParameterValue p) {return EOK;};
222

    
223

    
224
        void RxPktLocal(ExecutionList *el) {
225
                if(r_rx_list == NULL)
226
                        return;
227
                newSample(RxPkt(r_rx_list, el));
228
        };
229
        void RxPktRemote(struct res_mh_pair *rmp, int &i, ExecutionList *el) {
230
                if(r_rx_list == NULL)
231
                        return;
232
                result r = RxPkt(r_rx_list, el);
233
                if(!isnan(r)) {
234
                        rmp[i].res = r;
235
                        rmp[i].mh = mh_remote;
236
                        i++;
237
                }
238
        };
239

    
240
        void RxDataLocal(ExecutionList *el) {
241
                if(r_rx_list == NULL)
242
                        return;
243
                newSample(RxData(r_rx_list,el));
244
        };
245
        void RxDataRemote(struct res_mh_pair *rmp, int &i, ExecutionList *el) {
246
                if(r_rx_list == NULL)
247
                        return;
248
                result r = RxData(r_rx_list, el);
249
                if(!isnan(r)) {
250
                        rmp[i].res = r;
251
                        rmp[i].mh = mh_remote;
252
                        i++;
253
                }
254
        };
255

    
256
        void TxPktLocal(ExecutionList *el) {
257
                if(r_tx_list == NULL)
258
                        return;
259
                newSample(TxPkt(r_tx_list, el));
260
        };
261
        void TxPktRemote(struct res_mh_pair *rmp, int &i, ExecutionList *el) {
262
                if(r_tx_list == NULL)
263
                        return;
264
                result r = TxPkt(r_tx_list, el);
265
                if(!isnan(r)) {
266
                        rmp[i].res = r;
267
                        rmp[i].mh = mh_remote;
268
                        i++;
269
                }
270
        };
271

    
272
        void TxDataLocal(ExecutionList *el)  {
273
                if(r_tx_list == NULL)
274
                        return;
275
                newSample(TxData(r_tx_list, el));
276
        };
277
        void TxDataRemote(struct res_mh_pair *rmp, int &i, ExecutionList *el) {
278
                if(r_tx_list == NULL)
279
                        return;
280
                result r = TxData(r_tx_list, el);
281
                if(!isnan(r)) {
282
                        rmp[i].res = r;
283
                        rmp[i].mh = mh_remote;
284
                        i++;
285
                }
286
        };
287

    
288
        int newSample(result r) {
289
                if(!isnan(r) && rb != NULL) {
290
                        rb->newSample(r);
291
                        return EOK;
292
                }
293
                else
294
                        return -EINVAL;
295
        };
296

    
297
        //If OOB this function is called upon scheduled event
298
        virtual void Run() {return;};
299
        virtual void receiveOobData(char *buf, int buf_len, result *r) {return;};
300
        int scheduleNextIn(struct timeval *tv);
301
        
302

    
303
        int sendOobData(char* buf, int buf_len);
304

    
305
        /* Functions processing a message (packet and data) and extracting the measure to be stored */
306
        virtual result RxPkt(result *r, ExecutionList *) {return NAN;};
307
        virtual result RxData(result *r, ExecutionList *) {return NAN;};
308
        virtual result TxPkt(result *r, ExecutionList *) {return NAN;};
309
        virtual result TxData(result *r, ExecutionList *) {return NAN;};
310
        
311
        //void OutOfBandcb(HANDLE *event_handle, void *arg) {return;};
312
};
313

    
314
#endif  /* _MON_MEASURE_HH_ */