Statistics
| Branch: | Revision:

napa-baselibs / monl / measure_dispatcher.cpp @ 71056291

History | View | Annotate | Download (30.6 KB)

1
/***************************************************************************
2
 *   Copyright (C) 2009 by Robert Birke
3
 *   robert.birke@polito.it
4
 *
5
 * This library is free software; you can redistribute it and/or
6
 * modify it under the terms of the GNU Lesser General Public
7
 * License as published by the Free Software Foundation; either
8
 * version 2.1 of the License, or (at your option) any later version.
9

10
 * This library is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 * Lesser General Public License for more details.
14

15
 * You should have received a copy of the GNU Lesser General Public
16
 * License along with this library; if not, write to the Free Software
17
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA 
18
 ***********************************************************************/
19

    
20
#include "measure_dispatcher.h"
21
#include "measure_manager.h"
22
#include "errors.h"
23
#include "ctrl_msg.h"
24
#include "grapes_log.h"
25

    
26

    
27
#include <arpa/inet.h>
28
#include <string.h>
29
#include <sstream>
30
#include <stdint.h>
31
#include <sys/time.h>
32

    
33
//TODO: possible improvements:
34
// - make buffer a ptr and don't make an explicit copy
35
// - implement a better hash and compare for SocketId
36
// - use activate/deactivate for automatic loading/unloading (done has to be checked)
37

    
38

    
39
void open_con_cb (int con_id, void *arg) {
40
        Message *msg = (Message*) arg;
41

    
42
        mlSendData(con_id,msg->msg,msg->length,msg->mt,NULL);
43

    
44
        delete[] msg->msg;
45
        delete msg;
46
}
47

    
48
void MeasureDispatcher::addMeasureToExecLists(SocketIdMt h_dst, class MonMeasure *m) {
49
        class MeasurePlugin *mp = m->measure_plugin;
50

    
51
        /* IN_BAND  measurments added to hook executionlists */
52
        if(m->flags & IN_BAND) {
53
                /* TXLOC */
54
                if(m->flags & TXLOC) {
55
                        if(m->flags & PACKET) {
56
                                if(m->flags & REMOTE)
57
                                        dispatcherList[h_dst]->el_tx_pkt_remote[mp->getId()] = m;
58
                                else
59
                                        dispatcherList[h_dst]->el_tx_pkt_local[mp->getId()] = m;
60
                        }
61
                        if(m->flags & DATA) {
62
                                if(m->flags & REMOTE)
63
                                        dispatcherList[h_dst]->el_tx_data_remote[mp->getId()] = m;
64
                                else
65
                                        dispatcherList[h_dst]->el_tx_data_local[mp->getId()] = m;
66
                        }
67
                }
68
                /* RXLOC */
69
                if(m->flags & RXLOC) {
70
                        if(m->flags & PACKET)  {
71
                                if(m->flags & REMOTE)
72
                                        dispatcherList[h_dst]->el_rx_pkt_remote[mp->getId()] = m;
73
                                else
74
                                        dispatcherList[h_dst]->el_rx_pkt_local[mp->getId()] = m;
75
                        }
76
                        if(m->flags & DATA) {
77
                                if(m->flags & REMOTE)
78
                                        dispatcherList[h_dst]->el_rx_data_remote[mp->getId()] = m;
79
                                else
80
                                        dispatcherList[h_dst]->el_rx_data_local[mp->getId()] = m;
81
                        }
82
                }
83
        }
84
}
85

    
86
void MeasureDispatcher::delMeasureFromExecLists(MonMeasure *m) {
87
        SocketIdMt h_dst;
88
        class MeasurePlugin *mp = m->measure_plugin;
89
        ExecutionList::iterator it;
90

    
91
        h_dst.sid = (SocketId) m->dst_socketid;
92
        h_dst.mt = m->msg_type;
93

    
94
        /* IN_BAND  measurments added to hook executionlists */
95
        if(m->flags & IN_BAND) {
96
                /* TXLOC */
97
                if(m->flags & TXLOC) {
98
                        if(m->flags & PACKET) {
99
                                if(m->flags & REMOTE) {
100
                                        it = dispatcherList[h_dst]->el_tx_pkt_remote.find(mp->getId());
101
                                        if(it != dispatcherList[h_dst]->el_tx_pkt_remote.end())
102
                                                dispatcherList[h_dst]->el_tx_pkt_remote.erase(it);
103
                                } else {
104
                                        it = dispatcherList[h_dst]->el_tx_pkt_local.find(mp->getId());
105
                                        if(it != dispatcherList[h_dst]->el_tx_pkt_local.end())
106
                                                dispatcherList[h_dst]->el_tx_pkt_local.erase(it);
107
                                }
108
                        }
109
                        if(m->flags & DATA) {
110
                                if(m->flags & REMOTE) {
111
                                        it = dispatcherList[h_dst]->el_tx_data_remote.find(mp->getId());
112
                                        if(it != dispatcherList[h_dst]->el_tx_data_remote.end())
113
                                                dispatcherList[h_dst]->el_tx_data_remote.erase(it);
114
                                } else {
115
                                        it = dispatcherList[h_dst]->el_tx_data_local.find(mp->getId());
116
                                        if(it != dispatcherList[h_dst]->el_tx_data_local.end())
117
                                                dispatcherList[h_dst]->el_tx_data_local.erase(it);
118
                                }
119
                        }
120
                }
121
                /* RXLOC */
122
                if(m->flags & RXLOC) {
123
                        if(m->flags & PACKET) {
124
                                if(m->flags & REMOTE) {
125
                                        it = dispatcherList[h_dst]->el_rx_pkt_remote.find(mp->getId());
126
                                        if(it != dispatcherList[h_dst]->el_rx_pkt_remote.end())
127
                                                dispatcherList[h_dst]->el_rx_pkt_remote.erase(it);
128
                                } else {
129
                                        it = dispatcherList[h_dst]->el_rx_pkt_local.find(mp->getId());
130
                                        if(it != dispatcherList[h_dst]->el_rx_pkt_local.end())
131
                                                dispatcherList[h_dst]->el_rx_pkt_local.erase(it);
132
                                }
133
                        }
134
                        if(m->flags & DATA) {
135
                                if(m->flags & REMOTE) {
136
                                        it = dispatcherList[h_dst]->el_rx_data_remote.find(mp->getId());
137
                                        if(it != dispatcherList[h_dst]->el_rx_data_remote.end())
138
                                                dispatcherList[h_dst]->el_rx_data_remote.erase(it);
139
                                } else {
140
                                        it = dispatcherList[h_dst]->el_rx_data_local.find(mp->getId());
141
                                        if(it != dispatcherList[h_dst]->el_rx_data_local.end())
142
                                                dispatcherList[h_dst]->el_rx_data_local.erase(it);
143
                                }
144
                        }
145
                }
146
        }
147
}
148

    
149
int MeasureDispatcher::sendCtrlMsg(SocketId dst, Buffer &buffer)
150
{
151
        int con_id,res;
152
        Message *msg;
153
        send_params sparam = {0,0,0,0,0};
154
        struct MonHeader *mheader = (MonHeader*)&buffer[0];
155
        mheader->length = buffer.size();
156

    
157
        if(dst == NULL)
158
                return EOK;
159

    
160
        // if connection exists, send data
161
        con_id = mlConnectionExist(dst, false);
162

    
163
        if(con_id >= 0) {
164
                if(mlGetConnectionStatus(con_id) == 1) {
165
                        mlSendData(con_id,&buffer[0],buffer.size(),MSG_TYPE_MONL,NULL);
166
                        return EOK;
167
                }
168
        }
169

    
170
        //otherwise open connection and delay sending data
171
        msg = new Message;
172
        msg->length = buffer.size();
173
        msg->msg = new char[msg->length];
174
        memcpy(msg->msg, &buffer[0], buffer.size());
175
        memcpy(msg->sid, dst, SOCKETID_SIZE);
176
        msg->mt = MSG_TYPE_MONL;
177
        msg->sparam = sparam;
178
        con_id = mlOpenConnection(dst,&open_con_cb,msg, sparam);
179

    
180
        if(con_id < 0) {
181
                delete msg->msg;
182
                delete msg;
183
                return -ENOMEM;
184
        }
185

    
186
        return EOK;
187
}
188

    
189

    
190
int MeasureDispatcher::receiveCtrlMsg(SocketId sid, MsgType mt, char *cbuf, int length)
191
{
192
        struct MonHeader *mheader = (MonHeader*) cbuf;
193

    
194
        if(length < mheader->length)
195
                return EINVAL;
196

    
197
        cbuf += MONHDR_SIZE;
198
        length -= MONHDR_SIZE;
199

    
200
        switch (mheader->type) {
201
        case INITREMOTEMEASURE:
202
                        return initRemoteMeasureRx(sid, mt, cbuf);
203
        case REMOTEMEASURERESPONSE:
204
                        return remoteMeasureResponseRx(sid, mt, cbuf);
205
        case DEINITREMOTEMEASURE:
206
                        return deinitRemoteMeasureRx(sid, mt, cbuf);
207
        case REMOTERESULTS:
208
                        return remoteResultsRx(sid, mt, cbuf);
209
        case OOBDATA:
210
                        return oobDataRx(sid, mt, cbuf, length);
211
        }
212
        return EINVAL;
213
}
214

    
215
void MeasureDispatcher::headerSetup(eControlId type, Buffer &msg) {
216
        struct MonHeader mheader;
217
        mheader.length = 0;
218
        mheader.type = type;
219
        msg.insert(msg.end(), (char*) &mheader, ((char*) &mheader) + MONHDR_SIZE );
220
}
221

    
222
int MeasureDispatcher::oobDataTx(class MonMeasure *m, char *buf, int buf_len) {
223
        Buffer buffer;
224
        struct OobData oobdata;
225

    
226
        buffer.reserve(MONHDR_SIZE + OOBDATA_SIZE + buf_len);
227

    
228
        headerSetup(OOBDATA, buffer);
229

    
230
        oobdata.mh_local = m->mh_local;
231
        oobdata.mh_remote = m->mh_remote;
232

    
233
        buffer.insert(buffer.end(), (char*)&oobdata, ((char*)&oobdata) + OOBDATA_SIZE);
234

    
235
        buffer.insert(buffer.end(), buf, buf + buf_len);
236

    
237
        return sendCtrlMsg((SocketId) m->dst_socketid, buffer);
238
}
239

    
240
int MeasureDispatcher::oobDataRx(SocketId sid, MsgType mt, char *cbuf, int length) {
241
        struct SocketIdMt h_dst;
242
        h_dst.sid = sid;
243
        h_dst.mt = MSG_TYPE_MONL;
244

    
245
        struct OobData *oobdata = (struct OobData*) cbuf;
246
        result *r = NULL;
247

    
248
        if(mm->isValidMonHandler(oobdata->mh_remote))
249
                if(mm->mMeasureInstances[oobdata->mh_remote]->status == RUNNING &&
250
                                 mlCompareSocketIDs((SocketId) mm->mMeasureInstances[oobdata->mh_remote]->dst_socketid, sid) == 0) {
251
                        if(mm->mMeasureInstances[oobdata->mh_remote]->flags & DATA) {
252
                                if(mm->mMeasureInstances[oobdata->mh_remote]->flags & REMOTE)
253
                                        r = dispatcherList[h_dst]->data_r_rx_remote;
254
                                else
255
                                        r = dispatcherList[h_dst]->data_r_rx_local;
256
                        } else if (mm->mMeasureInstances[oobdata->mh_remote]->flags & PACKET) {
257
                                if(mm->mMeasureInstances[oobdata->mh_remote]->flags & REMOTE)
258
                                        r = dispatcherList[h_dst]->pkt_r_rx_remote;
259
                                else
260
                                        r = dispatcherList[h_dst]->pkt_r_rx_local;
261
                        }
262
                        if(!r)
263
                                fatal("MONL: r is not set");
264
                        mm->mMeasureInstances[oobdata->mh_remote]->receiveOobData(cbuf + OOBDATA_SIZE,
265
                                        length - OOBDATA_SIZE, r);
266
                }
267
        return EOK;
268
}
269

    
270
int MeasureDispatcher::remoteResultsTx(SocketId dst, struct res_mh_pair *rmp, int length, MsgType mt) {
271
        Buffer buffer;
272
        struct RemoteResults rresults;
273

    
274
        buffer.reserve(MONHDR_SIZE + REMOTERESULTS_SIZE + length * sizeof(struct res_mh_pair));
275
        
276
        headerSetup(REMOTERESULTS, buffer);
277

    
278
        rresults.length = length;
279
        rresults.msg_type = mt;
280
        
281
        buffer.insert(buffer.end(), (char*)&rresults, ((char*)&rresults) + REMOTERESULTS_SIZE);
282

    
283
        buffer.insert(buffer.end(), (char*) rmp, ((char*) rmp) + length * sizeof(struct res_mh_pair));
284

    
285
        return sendCtrlMsg(dst, buffer);
286
}
287

    
288
int MeasureDispatcher::remoteResultsRx(SocketId sid, MsgType mt, char *cbuf) {
289
        struct RemoteResults *rresults = (struct RemoteResults *) cbuf;
290
        struct res_mh_pair *rmp;
291

    
292
        rmp = (struct res_mh_pair *) (cbuf + REMOTERESULTS_SIZE);
293

    
294
        for(int i = 0; i < rresults->length; i++) {
295
                if(mm->isValidMonHandler(rmp[i].mh)) {
296
                        if(mm->mMeasureInstances[rmp[i].mh]->status == RUNNING &&
297
                         mm->mMeasureInstances[rmp[i].mh]->msg_type == rresults->msg_type &&
298
                          mlCompareSocketIDs((SocketId) mm->mMeasureInstances[rmp[i].mh]->dst_socketid, sid) == 0) {
299
                                if(mm->mMeasureInstances[rmp[i].mh]->rb != NULL) {
300
                                                mm->mMeasureInstances[rmp[i].mh]->rb->newSample(rmp[i].res);
301
                                }
302
                        }
303
                }
304
        }
305
        return EOK;
306
};
307

    
308
int MeasureDispatcher::remoteMeasureResponseTx(SocketId dst, MonHandler mhr, MonHandler mh, int32_t cid, int32_t status) {
309
        Buffer buffer;
310
        struct MeasureResponse mresponse;
311

    
312
        buffer.reserve(MONHDR_SIZE + MRESPONSE_SIZE);
313

    
314
        headerSetup(REMOTEMEASURERESPONSE, buffer);
315

    
316
        mresponse.mh_remote        = mhr;
317
        mresponse.mh_local        = mh;
318
        mresponse.command                = cid;
319
        mresponse.status                = status;
320

    
321
        buffer.insert(buffer.end(), (char*)&mresponse, ((char*)&mresponse) + MRESPONSE_SIZE);
322

    
323
        return sendCtrlMsg(dst, buffer);
324
}
325

    
326
int MeasureDispatcher::remoteMeasureResponseRx(SocketId src, MsgType mt, char* cbuf) {
327
        struct MeasureResponse *mresponse = (MeasureResponse*) cbuf;
328

    
329
        if(! mm->isValidMonHandler(mresponse->mh_remote)) {        //might not exist if the measure had been deleted in the meantime
330
                return -EINVAL;
331
        }
332

    
333
        MonMeasure *m = mm->mMeasureInstances[mresponse->mh_remote];
334
        switch(mresponse->command) {
335
                case INITREMOTEMEASURE:
336
                        m->mh_remote = mresponse->mh_local;
337
                        if(mresponse->status == EOK) {
338
                                if(m->flags & OUT_OF_BAND) {
339
                                        struct timeval tv = {0,0};
340
                                        if(m->scheduleNextIn(&tv) != EOK)
341
                                                m->status = FAILED;
342
                                        else
343
                                                m->status = RUNNING;
344
                                } else
345
                                        m->status = RUNNING;
346
                        } else
347
                                m->status = RFAILED;
348
                        break;
349
                case DEINITREMOTEMEASURE:
350
                        if(mresponse->status == EOK) {
351
                                if(m->status == STOPPING) {
352
                                        stopMeasure(m);
353
                                        if(mm->monDestroyMeasure(m->mh_local) != EOK)
354
                                                m->status = FAILED;
355
                                } else
356
                                        m->status = STOPPED;
357
                        }
358
                        else
359
                                m->status = RFAILED;
360
                        break;
361
                }
362
        return EOK;
363
}
364

    
365
int MeasureDispatcher::initRemoteMeasureRx(SocketId src, MsgType mt, char *cbuf) {
366
        MonHandler mh;
367
        MonParameterValue *param_vector;
368
        MonMeasure *m = NULL;
369
        struct SocketIdMt h_dst;
370
        struct InitRemoteMeasure *initmeasure = (InitRemoteMeasure*) cbuf;
371

    
372
        h_dst.sid = src;
373
        h_dst.mt = initmeasure->msg_type;
374

    
375
        /* Check if a previous instance is running */
376
        if(dispatcherList.find(h_dst) != dispatcherList.end())
377
                m = findMeasureFromId(dispatcherList[h_dst], initmeasure->mc, initmeasure->mid);
378

    
379
        //if so remove it, so that we can create a new one (needed if mc changed)
380
        if(m != NULL)
381
                mm->monDestroyMeasure(m->mh_local);
382

    
383
        mh = mm->monCreateMeasureId(initmeasure->mid, initmeasure->mc);
384

    
385
        if(mh < 0)
386
                return remoteMeasureResponseTx(src, initmeasure->mh_local, -1, INITREMOTEMEASURE, EINVAL);
387

    
388
        mm->mMeasureInstances[mh]->mh_remote = initmeasure->mh_local;
389

    
390
        param_vector = (MonParameterValue*)(cbuf + INITREMOTEMEASURE_SIZE);
391

    
392
        for(int i=0; i < initmeasure->n_params; i++) {
393
                if(mm->monSetParameter(mh,i,param_vector[i]) != EOK)        //TODO: init might be called inside this, which might use paramerets (e.g. src) that are set only below, in activateMeasure
394
                        goto error;
395
        }
396

    
397
        if(activateMeasure(mm->mMeasureInstances[mh], src, initmeasure->msg_type) != EOK)
398
                goto error;
399

    
400
        return remoteMeasureResponseTx(src, initmeasure->mh_local, mh, INITREMOTEMEASURE, EOK);
401

    
402
error:
403
        return remoteMeasureResponseTx(src, initmeasure->mh_local, mh, INITREMOTEMEASURE, EINVAL);
404
}
405

    
406
int MeasureDispatcher::initRemoteMeasureTx(class MonMeasure *m, SocketId dst, MsgType mt) {
407
        Buffer buffer;
408

    
409
        struct InitRemoteMeasure initmeasure;
410

    
411
        buffer.reserve(MONHDR_SIZE + INITREMOTEMEASURE_SIZE + sizeof(MonParameterValue) * m->measure_plugin->params.size());
412

    
413
        headerSetup(INITREMOTEMEASURE, buffer);
414

    
415
        initmeasure.mid = m->measure_plugin->getId();
416
        /* setup flags for REMOTE party */
417
        initmeasure.mc = (m->getFlags() | REMOTE) & ~(TXLOC | RXLOC | TXREM | RXREM);
418
        if(m->getFlags() & TXREM)
419
                initmeasure.mc |= TXLOC;
420
        if(m->getFlags() & RXREM)
421
                initmeasure.mc |= RXLOC;
422

    
423
        initmeasure.mh_local = m->mh_local;
424
        initmeasure.msg_type = mt;
425
        initmeasure.n_params = m->measure_plugin->params.size();
426

    
427
        buffer.insert(buffer.end(),(char*)&initmeasure,((char*)&initmeasure) + INITREMOTEMEASURE_SIZE);
428

    
429
        for(int i=0; i < initmeasure.n_params; i++)
430
                buffer.insert(buffer.end(), (char*)&(m->param_values[i]), ((char*)&(m->param_values[i])) + sizeof(MonParameterValue));
431

    
432
        return sendCtrlMsg(dst, buffer);
433
}
434

    
435
int MeasureDispatcher::deinitRemoteMeasureTx(class MonMeasure *m) {
436
        Buffer buffer;
437
        struct DeInitRemoteMeasure deinitmeasure;
438

    
439
        buffer.reserve(MONHDR_SIZE + DEINITREMOTEMEASURE_SIZE);
440

    
441
        headerSetup(DEINITREMOTEMEASURE, buffer);
442

    
443
        deinitmeasure.mh_local = m->mh_local;
444
        deinitmeasure.mh_remote = m->mh_remote;
445

    
446
        buffer.insert(buffer.end(),(char*)&deinitmeasure, ((char*)&deinitmeasure) + DEINITREMOTEMEASURE_SIZE);
447

    
448
        return sendCtrlMsg((SocketId) m->dst_socketid, buffer);
449
};
450

    
451
int MeasureDispatcher::deinitRemoteMeasureRx(SocketId src, MsgType mt, char* cbuf) {
452
        int ret;
453

    
454
        struct DeInitRemoteMeasure *deinitmeasure = (DeInitRemoteMeasure*) cbuf;
455

    
456
        if(! mm->isValidMonHandler(deinitmeasure->mh_remote)) {        //might not exist if the measure had been deleted in the meantime
457
                return -EINVAL;
458
        }
459

    
460
        ret = deactivateMeasure(mm->mMeasureInstances[deinitmeasure->mh_remote]);
461
        if (ret != EOK )
462
                return remoteMeasureResponseTx(src, deinitmeasure->mh_local, deinitmeasure->mh_remote, DEINITREMOTEMEASURE, ret);
463

    
464
        ret = mm->monDestroyMeasure(deinitmeasure->mh_remote);        //TODO: this mh should not be reused! Packets might still arrive referencig this
465
        if (ret != EOK )
466
                return remoteMeasureResponseTx(src, deinitmeasure->mh_local, deinitmeasure->mh_remote, DEINITREMOTEMEASURE, ret);
467

    
468
        return remoteMeasureResponseTx(src,  deinitmeasure->mh_local, deinitmeasure->mh_remote, DEINITREMOTEMEASURE, EOK);
469
}
470

    
471
class MonMeasure* MeasureDispatcher::findMeasureFromId(DestinationSocketIdMtData *dd,  MeasurementCapabilities flags, MeasurementId mid) {
472
        ExecutionList::iterator it;
473

    
474
        //check if loaded
475
        if(flags & REMOTE) {
476
                it = dd->mids_remote.find(mid);
477
                if(it == dd->mids_remote.end())
478
                        return NULL;
479
        } else {
480
                it = dd->mids_local.find(mid);
481
                if(it == dd->mids_local.end())
482
                        return NULL;
483
        }
484

    
485
        return it->second;
486
}
487

    
488
int MeasureDispatcher::activateMeasure(class MonMeasure *m, SocketId dst, MsgType mt, int auto_load) {
489
        MeasurementId mid;
490
        struct SocketIdMt h_dst;
491
        int ret;
492

    
493
        h_dst.sid = dst;
494
        h_dst.mt = mt;
495

    
496
        mid = m->measure_plugin->getId();
497
        m->msg_type = mt;
498

    
499
        if(dst != NULL) {
500
                memcpy(m->dst_socketid, (uint8_t *) dst, SOCKETID_SIZE);
501
        } else {
502
                if(m->flags != 0)
503
                        return -EINVAL;
504
        }
505

    
506
        m->defaultInit();
507

    
508
        if(m->flags == 0) {
509
                return EOK;
510
        }
511

    
512
        //check if already present
513
        if(dispatcherList.find(h_dst) != dispatcherList.end()) {
514
                if(m->flags & REMOTE) {
515
                        if(dispatcherList[h_dst]->mids_remote.find(mid) != dispatcherList[h_dst]->mids_remote.end())
516
                                return -EEXISTS;
517
                } else {
518
                        if(dispatcherList[h_dst]->mids_local.find(mid) != dispatcherList[h_dst]->mids_local.end())
519
                                return -EEXISTS;
520
                }
521
        }
522

    
523

    
524
        if(dispatcherList.find(h_dst) == dispatcherList.end()) {
525
                createDestinationSocketIdMtData(h_dst);
526
        }
527

    
528
        if(m->flags & OUT_OF_BAND) {
529
                struct SocketIdMt h_dst2 = h_dst;
530
                h_dst2.mt = MSG_TYPE_MONL;
531
                if(dispatcherList.find(h_dst2) == dispatcherList.end())
532
                        createDestinationSocketIdMtData(h_dst2);
533
        }
534

    
535
        if(m->flags & PACKET) {
536
                if(m->flags & REMOTE) {
537
                        m->r_rx_list = dispatcherList[h_dst]->pkt_r_rx_remote;
538
                        m->r_tx_list = dispatcherList[h_dst]->pkt_r_tx_remote;
539
                } else {
540
                        m->r_rx_list = dispatcherList[h_dst]->pkt_r_rx_local;
541
                        m->r_tx_list = dispatcherList[h_dst]->pkt_r_tx_local;
542
                }
543
        } else {
544
                if(m->flags & REMOTE) {
545
                        m->r_rx_list = dispatcherList[h_dst]->data_r_rx_remote;
546
                        m->r_tx_list = dispatcherList[h_dst]->data_r_tx_remote;
547
                } else {
548
                        m->r_rx_list = dispatcherList[h_dst]->data_r_rx_local;
549
                        m->r_tx_list = dispatcherList[h_dst]->data_r_tx_local;
550
                }
551
        }
552
        //TODO: check deps
553

    
554
        // add it to loaded measure list
555
        if(m->flags & REMOTE)
556
                dispatcherList[h_dst]->mids_remote[mid] = m;
557
        else
558
                dispatcherList[h_dst]->mids_local[mid] = m;
559

    
560
        //Handle IN_BAND measures
561
        addMeasureToExecLists(h_dst, m);
562

    
563
        //Handle OUT_OF_BAND measures
564
        if(m->flags & OUT_OF_BAND) {
565
                struct timeval tv = {0,0};
566
                //if only local
567
                if(!(m->flags & TXREM) && !(m->flags & RXREM))
568
                        m->scheduleNextIn(&tv); //start it!
569
        }
570

    
571
        /* Remote counterparts ? */
572
        if(m->flags & TXREM || m->flags & RXREM) {
573
                /* Yes, initialise them */
574
                m->status = INITIALISING;
575
                ret = initRemoteMeasureTx(m,dst,mt);
576
                if(ret != EOK)
577
                        goto error;
578
        } else {
579
                m->status = RUNNING;
580
        }
581

    
582
        if(m->dst_socketid == NULL)
583
                info("bla1");
584
        return EOK;
585

    
586
error:
587
        if(m->dst_socketid == NULL)
588
                info("bla2");
589

    
590
        stopMeasure(m);
591
        if(m->dst_socketid == NULL)
592
                info("bla3");
593

    
594
        return ret;
595
}
596

    
597
void MeasureDispatcher::createDestinationSocketIdMtData(struct SocketIdMt h_dst) {
598
        DestinationSocketIdMtData *dd;
599
        
600
        dd = new DestinationSocketIdMtData;
601

    
602
        memcpy(dd->sid, (uint8_t*) h_dst.sid, SOCKETID_SIZE);
603
        dd->h_dst.sid = (SocketId) &(dd->sid);
604
        dd->h_dst.mt = h_dst.mt;
605

    
606
        dispatcherList[dd->h_dst] = dd;
607

    
608
        //initialize
609
        for(int i = 0; i < R_LAST_PKT; i++) {
610
                dispatcherList[dd->h_dst]->pkt_r_rx_local[i] = NAN;
611
                dispatcherList[dd->h_dst]->pkt_r_rx_remote[i] = NAN;
612
                dispatcherList[dd->h_dst]->pkt_r_tx_local[i] = NAN;
613
                dispatcherList[dd->h_dst]->pkt_r_tx_remote[i] = NAN;
614
        }
615
        dispatcherList[dd->h_dst]->pkt_tx_seq_num = 0;
616

    
617
        for(int i = 0; i < R_LAST_DATA; i++) {
618
                dispatcherList[dd->h_dst]->data_r_rx_local[i] = NAN;
619
                dispatcherList[dd->h_dst]->data_r_rx_remote[i] = NAN;
620
                dispatcherList[dd->h_dst]->data_r_tx_local[i] = NAN;
621
                dispatcherList[dd->h_dst]->data_r_tx_remote[i] = NAN;
622
        }
623
        dispatcherList[dd->h_dst]->data_tx_seq_num = 0;
624
}
625

    
626
void MeasureDispatcher::destroyDestinationSocketIdMtData(SocketId dst, MsgType mt) {
627
        DestinationSocketIdMtData *dd;
628
        DispatcherListSocketIdMt::iterator it;
629
        struct SocketIdMt h_dst;
630

    
631
        h_dst.sid = dst;
632
        h_dst.mt = mt;
633

    
634
        it = dispatcherList.find(h_dst);
635
        if(it != dispatcherList.end()) {
636
                dd = it->second;
637
                dispatcherList.erase(it);
638
                delete dd;
639
        }
640
        else
641
                fatal("Trying to erase non existent DestinationSocketIdMtData");
642
}
643

    
644
int MeasureDispatcher::deactivateMeasure(class MonMeasure *m) {
645
        SocketIdMt h_dst;
646

    
647
        h_dst.sid = (SocketId) m->dst_socketid;
648
        h_dst.mt = m->msg_type;
649

    
650
        if(m->status == STOPPED)
651
                return EOK;
652

    
653
        if(m->used_counter > 0)
654
                return -EINUSE;
655

    
656
        if(m->flags == 0) {
657
                m->defaultStop();
658
                m->status = STOPPED;
659
                return EOK;
660
        }
661

    
662
        if(dispatcherList.find(h_dst) == dispatcherList.end())
663
                return EINVAL;
664

    
665
        //remove it from the loaded list
666
        if(m->flags & REMOTE) {
667
                if(dispatcherList[h_dst]->mids_remote.find(m->measure_plugin->getId()) == dispatcherList[h_dst]->mids_remote.end())
668
                        return -EINVAL;
669
        } else {
670
                if(dispatcherList[h_dst]->mids_local.find(m->measure_plugin->getId()) == dispatcherList[h_dst]->mids_local.end())
671
                        return -EINVAL;
672
        }
673

    
674
        return stopMeasure(m);
675
}
676

    
677
int MeasureDispatcher::stopMeasure(class MonMeasure *m) {
678
        SocketIdMt h_dst;
679

    
680
        h_dst.sid = (SocketId) m->dst_socketid;
681
        h_dst.mt = m->msg_type;
682

    
683
        m->defaultStop();
684

    
685
        //Handle IN_BAND measures
686
        delMeasureFromExecLists(m);
687
        //Handle OUT_OF:BAND measures
688
        //TODO: Anything to do at all?
689

    
690
        //Handle remote counterpart
691
        if(m->flags & TXREM || m->flags & RXREM) {
692
                        m->status = STOPPING;
693
                        deinitRemoteMeasureTx(m);
694
        } else
695
                m->status = STOPPED;
696
        m->r_tx_list = m->r_rx_list = NULL;
697

    
698
        //remove it from the loaded list
699
        if(m->flags & REMOTE)
700
                dispatcherList[h_dst]->mids_remote.erase(dispatcherList[h_dst]->mids_remote.find(m->measure_plugin->getId()));
701
        else
702
                dispatcherList[h_dst]->mids_local.erase(dispatcherList[h_dst]->mids_local.find(m->measure_plugin->getId()));
703

    
704
        //check if we can remove also the DistinationSocektIdMtData
705
        if(dispatcherList[h_dst]->mids_remote.empty() && dispatcherList[h_dst]->mids_local.empty())
706
                destroyDestinationSocketIdMtData((SocketId) m->dst_socketid, m->msg_type);
707
}
708

    
709

    
710
void MeasureDispatcher::cbRxPkt(void *arg) {
711
        ExecutionList::iterator it;
712
        result *r_loc, *r_rem;
713
        struct res_mh_pair rmp[R_LAST_PKT];
714
        int i,j;
715
        mon_pkt_inf *pkt_info = (mon_pkt_inf *)arg;
716
        struct MonPacketHeader *mph = (MonPacketHeader*) pkt_info->monitoringHeader;
717
        struct SocketIdMt h_dst;
718
        h_dst.sid = pkt_info->remote_socketID;
719
        h_dst.mt = pkt_info->msgtype;
720

    
721
        if(pkt_info->remote_socketID == NULL)
722
                return;
723

    
724
        /* something to do? */
725
        if(dispatcherList.size() == 0)
726
                return;
727

    
728
        if(dispatcherList.find(h_dst) == dispatcherList.end())
729
                return;
730

    
731
        //we have a result vector to fill
732
        r_loc = dispatcherList[h_dst]->pkt_r_rx_local;
733
        r_rem = dispatcherList[h_dst]->pkt_r_rx_remote;
734

    
735
        //TODO: add standard fields
736
        if(mph != NULL) {
737
                r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ntohl(mph->seq_num);
738
                r_rem[R_INITIAL_TTL] = r_loc[R_INITIAL_TTL] = mph->initial_ttl;
739
                r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ntohl(mph->ts_usec) / 1000000.0;
740
                r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = r_loc[R_SEND_TIME] + ntohl(mph->ts_sec);
741
                r_rem[R_REPLY_TIME] = r_loc[R_REPLY_TIME] = ntohl(mph->ans_ts_usec) / 1000000.0;
742
                r_rem[R_REPLY_TIME] = r_loc[R_REPLY_TIME] = r_loc[R_REPLY_TIME] + ntohl(mph->ans_ts_sec);
743
        }
744

    
745
        r_rem[R_SIZE] = r_loc[R_SIZE] = pkt_info->bufSize;
746
        r_rem[R_TTL] = r_loc[R_TTL] = pkt_info->ttl;
747
        r_rem[R_DATA_ID] = r_loc[R_DATA_ID] = pkt_info->dataID;
748
        r_rem[R_DATA_OFFSET] = r_loc[R_DATA_OFFSET] = pkt_info->offset;
749
        r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] = pkt_info->arrival_time.tv_usec / 1000000.0;
750
        r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] += pkt_info->arrival_time.tv_sec;
751

    
752

    
753
        // are there local in band measures?
754
        if (dispatcherList[h_dst]->el_rx_pkt_local.size() > 0) {
755
                /* yes! */
756
                ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_rx_pkt_local);
757
        
758
                /* Call measures in order */
759
                for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++)
760
                        if(it->second->status == RUNNING)
761
                                it->second->RxPktLocal(el_ptr_loc);
762
        }
763

    
764
        if (dispatcherList[h_dst]->el_rx_pkt_remote.size() > 0) {
765
                /* And remotes */
766
                ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_rx_pkt_remote);
767
        
768
                /* Call measurees in order */
769
                j = 0;
770
                for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
771
                        if(it->second->status == RUNNING)
772
                                it->second->RxPktRemote(rmp,j,el_ptr_rem);
773
        
774
                /* send back results */
775
                if(j > 0)
776
                remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
777
        }
778
}
779

    
780
void MeasureDispatcher::cbRxData(void *arg) {
781
        ExecutionList::iterator it;
782
        result *r_loc, *r_rem;
783
        struct res_mh_pair rmp[R_LAST_DATA];
784
        int i,j;
785
        mon_data_inf *data_info = (mon_data_inf *)arg;
786
        struct MonDataHeader *mdh = (MonDataHeader*) data_info->monitoringDataHeader;
787
        struct SocketIdMt h_dst;
788
        h_dst.sid = data_info->remote_socketID;
789
        h_dst.mt = data_info->msgtype;
790

    
791
        if(data_info->remote_socketID == NULL)
792
                return;
793

    
794
        /* something to do? */
795
        if(dispatcherList.size() == 0)
796
                return;
797

    
798
        if(dispatcherList.find(h_dst) == dispatcherList.end())
799
                return;
800

    
801
        //we have a result vector to fill
802
        r_loc = dispatcherList[h_dst]->data_r_rx_local;
803
        r_rem = dispatcherList[h_dst]->data_r_rx_remote;
804

    
805
        //TODO: add standard fields
806
        if(mdh != NULL) {
807
                r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ntohl(mdh->seq_num);
808
                r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ntohl(mdh->ts_usec) / 1000000.0;
809
                r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = r_loc[R_SEND_TIME] + ntohl(mdh->ts_sec);
810
                r_rem[R_REPLY_TIME] = r_loc[R_REPLY_TIME] = ntohl(mdh->ans_ts_usec) / 1000000.0;
811
                r_rem[R_REPLY_TIME] = r_loc[R_REPLY_TIME] = r_loc[R_REPLY_TIME] + ntohl(mdh->ans_ts_sec);
812
        }
813
        r_rem[R_SIZE] = r_loc[R_SIZE] = data_info->bufSize;
814
        r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] = data_info->arrival_time.tv_usec / 1000000.0;
815
        r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] += data_info->arrival_time.tv_sec;
816

    
817
        if (dispatcherList[h_dst]->el_rx_data_local.size() > 0) {
818
                /* yes! */
819
                /* Locals first */
820
                ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_rx_data_local);
821
        
822
        
823
                /* Call measurees in order */
824
                for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++){
825
                        MonMeasure *m = it->second;
826
                        if(it->second->status == RUNNING)
827
                                it->second->RxDataLocal(el_ptr_loc);
828
                }
829
        }
830

    
831
        if (dispatcherList[h_dst]->el_rx_data_remote.size() > 0) {
832
                /* And remotes */
833
        
834
                ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_rx_data_remote);
835
        
836
                /* Call measurees in order */
837
                j = 0;
838
                for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
839
                        if(it->second->status == RUNNING)
840
                                it->second->RxDataRemote(rmp,j,el_ptr_rem);
841
        
842
                /* send back results */
843
                if(j > 0)
844
                        remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
845
        }
846
}
847

    
848
void MeasureDispatcher::cbTxPkt(void *arg) {
849
        ExecutionList::iterator it;
850
        result *r_loc, *r_rem;
851
        struct res_mh_pair rmp[R_LAST_PKT];
852
        int i,j;
853
        mon_pkt_inf *pkt_info = (mon_pkt_inf *)arg;
854
        struct MonPacketHeader *mph = (MonPacketHeader*) pkt_info->monitoringHeader;
855
        struct timeval ts;
856
        struct SocketIdMt h_dst;
857
        h_dst.sid = pkt_info->remote_socketID;
858
        h_dst.mt = pkt_info->msgtype;
859

    
860
        if(pkt_info->remote_socketID == NULL)
861
                return;
862

    
863
        /* something to do? */
864
        if(dispatcherList.size() == 0)
865
                return;
866

    
867
        if(dispatcherList.find(h_dst) == dispatcherList.end())
868
                return;
869

    
870
        /* yes! */
871

    
872
        r_loc = dispatcherList[h_dst]->pkt_r_tx_local;
873
        r_rem = dispatcherList[h_dst]->pkt_r_tx_remote;
874

    
875
        /* prepare initial result vector (based on header information) */
876
                
877
        r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ++(dispatcherList[h_dst]->pkt_tx_seq_num);
878
        r_rem[R_SIZE] = r_loc[R_SIZE] = pkt_info->bufSize;
879
        r_rem[R_INITIAL_TTL] = r_loc[R_INITIAL_TTL] = initial_ttl;
880
        gettimeofday(&ts,NULL);        
881
        r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ts.tv_usec / 1000000.0;
882
        r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] =  r_loc[R_SEND_TIME] + ts.tv_sec;
883

    
884
        if(dispatcherList[h_dst]->el_tx_pkt_local.size() > 0) {
885

    
886
                ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_tx_pkt_local);
887

    
888
                /* Call measurees in order */
889
                for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++)
890
                        if(it->second->status == RUNNING)
891
                                it->second->TxPktLocal(el_ptr_loc);
892
        }
893

    
894
        if(dispatcherList[h_dst]->el_tx_pkt_remote.size() > 0) {
895
                /* And remotes */
896
                ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_tx_pkt_remote);
897
        
898
                /* Call measurees in order */
899
                j = 0;
900
                for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
901
                        if(it->second->status == RUNNING)
902
                                it->second->TxPktRemote(rmp,j,el_ptr_rem);
903
        
904
                /* send back results */
905
                if(j > 0)
906
                        remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
907
        }
908

    
909
        if(mph != NULL) {
910
                mph->seq_num = htonl((uint32_t)r_rem[R_SEQNUM]);
911
                mph->initial_ttl = (uint32_t)r_rem[R_INITIAL_TTL];
912
                mph->ts_sec = htonl((uint32_t)floor(r_rem[R_SEND_TIME]));
913
                mph->ts_usec = htonl((uint32_t)((r_rem[R_SEND_TIME] - floor(r_rem[R_SEND_TIME])) * 1000000.0));
914
                if(!isnan(r_rem[R_REPLY_TIME])) {
915
                        mph->ans_ts_sec = htonl((uint32_t)floor(r_rem[R_REPLY_TIME]));
916
                        mph->ans_ts_usec = htonl((uint32_t)((r_rem[R_REPLY_TIME] - floor(r_rem[R_REPLY_TIME])) * 1000000.0));
917
                } else {
918
                        mph->ans_ts_sec = 0;
919
                        mph->ans_ts_usec = 0;
920
                }
921
        }
922
}
923

    
924
void MeasureDispatcher::cbTxData(void *arg) {
925
        ExecutionList::iterator it;
926
        result *r_loc,*r_rem;
927
        struct res_mh_pair rmp[R_LAST_DATA];
928
        int i,j;
929
        mon_data_inf *data_info = (mon_data_inf *)arg;
930
        struct MonDataHeader *mdh = (MonDataHeader*) data_info->monitoringDataHeader;
931
        struct timeval ts;
932
        struct SocketIdMt h_dst;
933
        h_dst.sid = data_info->remote_socketID;
934
        h_dst.mt = data_info->msgtype;
935

    
936
        if(data_info->remote_socketID == NULL)
937
                return;
938

    
939
        /* something to do? */
940
        if(dispatcherList.size() == 0)
941
                return;
942

    
943
        if(dispatcherList.find(h_dst) == dispatcherList.end())
944
                return;
945
        
946
        /* yes! */
947
        r_loc = dispatcherList[h_dst]->data_r_tx_local;
948
        r_rem = dispatcherList[h_dst]->data_r_tx_remote;
949

    
950
        
951
        //TODO add fields
952
        r_rem[R_SIZE] = r_loc[R_SIZE] = data_info->bufSize;
953
        r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ++(dispatcherList[h_dst]->data_tx_seq_num);
954
        gettimeofday(&ts,NULL);
955
        r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ts.tv_usec / 1000000.0;
956
        r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = r_loc[R_SEND_TIME] + ts.tv_sec;
957

    
958
        if(dispatcherList[h_dst]->el_tx_data_local.size() > 0) {
959
                ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_tx_data_local);
960

    
961
                /* Call measures in order */
962
                for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++)
963
                        if(it->second->status == RUNNING)
964
                                it->second->TxDataLocal(el_ptr_loc);
965
        }
966

    
967
        if(dispatcherList[h_dst]->el_tx_data_remote.size() > 0) {
968
                /* And remote */
969
                ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_tx_data_remote);
970
        
971
                /* Call measures in order */
972
                j = 0;
973
                for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
974
                        if(it->second->status == RUNNING)
975
                                it->second->TxDataRemote(rmp, j, el_ptr_rem);
976
        
977
                /* send back results */
978
                if(j > 0)
979
                        remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
980
        }
981

    
982
        if(mdh != NULL) {
983
                mdh->seq_num =  htonl((uint32_t)r_rem[R_SEQNUM]);
984
                mdh->ts_sec =  htonl((uint32_t)floor(r_rem[R_SEND_TIME]));
985
                mdh->ts_usec =  htonl((uint32_t)((r_rem[R_SEND_TIME] - floor(r_rem[R_SEND_TIME])) * 1000000.0));
986
                if(!isnan(r_rem[R_REPLY_TIME])) {
987
                        mdh->ans_ts_sec = htonl((uint32_t)floor(r_rem[R_REPLY_TIME]));
988
                        mdh->ans_ts_usec = htonl((uint32_t)((r_rem[R_REPLY_TIME] - floor(r_rem[R_REPLY_TIME])) * 1000000.0));
989
                } else {
990
                        mdh->ans_ts_sec = 0;
991
                        mdh->ans_ts_usec = 0;
992
                }
993
        }
994
}
995

    
996
int MeasureDispatcher::cbHdrPkt(SocketId sid, MsgType mt) {
997
        struct SocketIdMt h_dst;
998
        h_dst.sid = sid;
999
        h_dst.mt = mt;
1000

    
1001
        if(sid == NULL)
1002
                return 0;
1003

    
1004
        /* something to do? */
1005
        if(dispatcherList.size() == 0)
1006
                return 0;
1007

    
1008
        if(dispatcherList.find(h_dst) == dispatcherList.end())
1009
                return 0;
1010

    
1011
        /* yes! */
1012
        return MON_PACKET_HEADER_SIZE;
1013
}
1014

    
1015
int MeasureDispatcher::cbHdrData(SocketId sid, MsgType mt) {
1016
        struct SocketIdMt h_dst;
1017
        h_dst.sid = sid;
1018
        h_dst.mt = mt;
1019

    
1020
        if(sid == NULL)
1021
                return 0;
1022

    
1023
        /* something to do? */
1024
        if(dispatcherList.size() == 0)
1025
                return 0;
1026

    
1027
        if(dispatcherList.find(h_dst) == dispatcherList.end())
1028
                return 0;
1029

    
1030
        /* yes! return the space we need */
1031
        return MON_DATA_HEADER_SIZE;
1032
}
1033

    
1034
int MeasureDispatcher::scheduleMeasure(MonHandler mh) {
1035
        if(mm->isValidMonHandler(mh))
1036
                mm->mMeasureInstances[mh]->Run();
1037
        return EOK;
1038
}
1039

    
1040
int MeasureDispatcher::schedulePublish(MonHandler mh) {
1041
        if(mm->isValidMonHandler(mh))
1042
                if(mm->mMeasureInstances[mh]->rb != NULL)
1043
                        mm->mMeasureInstances[mh]->rb->publishResults();
1044
        return EOK;
1045
}