Statistics
| Branch: | Revision:

napa-baselibs / monl / measure_dispatcher.cpp @ 5f3adef4

History | View | Annotate | Download (30.7 KB)

1
/***************************************************************************
2
 *   Copyright (C) 2009 by Robert Birke
3
 *   robert.birke@polito.it
4
 *
5
 * This library is free software; you can redistribute it and/or
6
 * modify it under the terms of the GNU Lesser General Public
7
 * License as published by the Free Software Foundation; either
8
 * version 2.1 of the License, or (at your option) any later version.
9

10
 * This library is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 * Lesser General Public License for more details.
14

15
 * You should have received a copy of the GNU Lesser General Public
16
 * License along with this library; if not, write to the Free Software
17
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA 
18
 ***********************************************************************/
19

    
20
#include "measure_dispatcher.h"
21
#include "measure_manager.h"
22
#include "errors.h"
23
#include "ctrl_msg.h"
24
#include "grapes_log.h"
25

    
26

    
27
#include <arpa/inet.h>
28
#include <string.h>
29
#include <sstream>
30
#include <stdint.h>
31
#include <sys/time.h>
32

    
33
//TODO: possible improvements:
34
// - make buffer a ptr and don't make an explicit copy
35
// - implement a better hash and compare for SocketId
36
// - use activate/deactivate for automatic loading/unloading (done has to be checked)
37

    
38

    
39
void open_con_cb (int con_id, void *arg) {
40
        Message *msg = (Message*) arg;
41

    
42
        mlSendData(con_id,msg->msg,msg->length,msg->mt,NULL);
43

    
44
        delete[] msg->msg;
45
        delete msg;
46
}
47

    
48
void MeasureDispatcher::addMeasureToExecLists(SocketIdMt h_dst, class MonMeasure *m) {
49
        class MeasurePlugin *mp = m->measure_plugin;
50

    
51
        /* IN_BAND  measurments added to hook executionlists */
52
        if(m->flags & IN_BAND) {
53
                /* TXLOC */
54
                if(m->flags & TXLOC) {
55
                        if(m->flags & PACKET) {
56
                                if(m->flags & REMOTE)
57
                                        dispatcherList[h_dst]->el_tx_pkt_remote[mp->getId()] = m;
58
                                else
59
                                        dispatcherList[h_dst]->el_tx_pkt_local[mp->getId()] = m;
60
                        }
61
                        if(m->flags & DATA) {
62
                                if(m->flags & REMOTE)
63
                                        dispatcherList[h_dst]->el_tx_data_remote[mp->getId()] = m;
64
                                else
65
                                        dispatcherList[h_dst]->el_tx_data_local[mp->getId()] = m;
66
                        }
67
                }
68
                /* RXLOC */
69
                if(m->flags & RXLOC) {
70
                        if(m->flags & PACKET)  {
71
                                if(m->flags & REMOTE)
72
                                        dispatcherList[h_dst]->el_rx_pkt_remote[mp->getId()] = m;
73
                                else
74
                                        dispatcherList[h_dst]->el_rx_pkt_local[mp->getId()] = m;
75
                        }
76
                        if(m->flags & DATA) {
77
                                if(m->flags & REMOTE)
78
                                        dispatcherList[h_dst]->el_rx_data_remote[mp->getId()] = m;
79
                                else
80
                                        dispatcherList[h_dst]->el_rx_data_local[mp->getId()] = m;
81
                        }
82
                }
83
        }
84
}
85

    
86
void MeasureDispatcher::delMeasureFromExecLists(MonMeasure *m) {
87
        SocketIdMt h_dst;
88
        class MeasurePlugin *mp = m->measure_plugin;
89
        ExecutionList::iterator it;
90

    
91
        h_dst.sid = (SocketId) m->dst_socketid;
92
        h_dst.mt = m->msg_type;
93

    
94
        /* IN_BAND  measurments added to hook executionlists */
95
        if(m->flags & IN_BAND) {
96
                /* TXLOC */
97
                if(m->flags & TXLOC) {
98
                        if(m->flags & PACKET) {
99
                                if(m->flags & REMOTE) {
100
                                        it = dispatcherList[h_dst]->el_tx_pkt_remote.find(mp->getId());
101
                                        if(it != dispatcherList[h_dst]->el_tx_pkt_remote.end())
102
                                                dispatcherList[h_dst]->el_tx_pkt_remote.erase(it);
103
                                } else {
104
                                        it = dispatcherList[h_dst]->el_tx_pkt_local.find(mp->getId());
105
                                        if(it != dispatcherList[h_dst]->el_tx_pkt_local.end())
106
                                                dispatcherList[h_dst]->el_tx_pkt_local.erase(it);
107
                                }
108
                        }
109
                        if(m->flags & DATA) {
110
                                if(m->flags & REMOTE) {
111
                                        it = dispatcherList[h_dst]->el_tx_data_remote.find(mp->getId());
112
                                        if(it != dispatcherList[h_dst]->el_tx_data_remote.end())
113
                                                dispatcherList[h_dst]->el_tx_data_remote.erase(it);
114
                                } else {
115
                                        it = dispatcherList[h_dst]->el_tx_data_local.find(mp->getId());
116
                                        if(it != dispatcherList[h_dst]->el_tx_data_local.end())
117
                                                dispatcherList[h_dst]->el_tx_data_local.erase(it);
118
                                }
119
                        }
120
                }
121
                /* RXLOC */
122
                if(m->flags & RXLOC) {
123
                        if(m->flags & PACKET) {
124
                                if(m->flags & REMOTE) {
125
                                        it = dispatcherList[h_dst]->el_rx_pkt_remote.find(mp->getId());
126
                                        if(it != dispatcherList[h_dst]->el_rx_pkt_remote.end())
127
                                                dispatcherList[h_dst]->el_rx_pkt_remote.erase(it);
128
                                } else {
129
                                        it = dispatcherList[h_dst]->el_rx_pkt_local.find(mp->getId());
130
                                        if(it != dispatcherList[h_dst]->el_rx_pkt_local.end())
131
                                                dispatcherList[h_dst]->el_rx_pkt_local.erase(it);
132
                                }
133
                        }
134
                        if(m->flags & DATA) {
135
                                if(m->flags & REMOTE) {
136
                                        it = dispatcherList[h_dst]->el_rx_data_remote.find(mp->getId());
137
                                        if(it != dispatcherList[h_dst]->el_rx_data_remote.end())
138
                                                dispatcherList[h_dst]->el_rx_data_remote.erase(it);
139
                                } else {
140
                                        it = dispatcherList[h_dst]->el_rx_data_local.find(mp->getId());
141
                                        if(it != dispatcherList[h_dst]->el_rx_data_local.end())
142
                                                dispatcherList[h_dst]->el_rx_data_local.erase(it);
143
                                }
144
                        }
145
                }
146
        }
147
}
148

    
149
int MeasureDispatcher::sendCtrlMsg(SocketId dst, Buffer &buffer)
150
{
151
        int con_id,res;
152
        Message *msg;
153
        send_params sparam = {0,0,0,0,0};
154
        struct MonHeader *mheader = (MonHeader*)&buffer[0];
155
        mheader->length = buffer.size();
156

    
157
        if(dst == NULL)
158
                return EOK;
159

    
160
        // if connection exists, send data
161
        con_id = mlConnectionExist(dst, false);
162

    
163
        if(con_id >= 0) {
164
                if(mlGetConnectionStatus(con_id) == 1) {
165
                        mlSendData(con_id,&buffer[0],buffer.size(),MSG_TYPE_MONL,NULL);
166
                        return EOK;
167
                }
168
        }
169

    
170
        //otherwise open connection and delay sending data
171
        msg = new Message;
172
        msg->length = buffer.size();
173
        msg->msg = new char[msg->length];
174
        memcpy(msg->msg, &buffer[0], buffer.size());
175
        memcpy(msg->sid, dst, SOCKETID_SIZE);
176
        msg->mt = MSG_TYPE_MONL;
177
        msg->sparam = sparam;
178
        con_id = mlOpenConnection(dst,&open_con_cb,msg, sparam);
179

    
180
        if(con_id < 0) {
181
                delete msg->msg;
182
                delete msg;
183
                return -ENOMEM;
184
        }
185

    
186
        return EOK;
187
}
188

    
189

    
190
int MeasureDispatcher::receiveCtrlMsg(SocketId sid, MsgType mt, char *cbuf, int length)
191
{
192
        struct MonHeader *mheader = (MonHeader*) cbuf;
193

    
194
        if(length < mheader->length)
195
                return EINVAL;
196

    
197
        cbuf += MONHDR_SIZE;
198
        length -= MONHDR_SIZE;
199

    
200
        switch (mheader->type) {
201
        case INITREMOTEMEASURE:
202
                        return initRemoteMeasureRx(sid, mt, cbuf);
203
        case REMOTEMEASURERESPONSE:
204
                        return remoteMeasureResponseRx(sid, mt, cbuf);
205
        case DEINITREMOTEMEASURE:
206
                        return deinitRemoteMeasureRx(sid, mt, cbuf);
207
        case REMOTERESULTS:
208
                        return remoteResultsRx(sid, mt, cbuf);
209
        case OOBDATA:
210
                        return oobDataRx(sid, mt, cbuf, length);
211
        }
212
        return EINVAL;
213
}
214

    
215
void MeasureDispatcher::headerSetup(eControlId type, Buffer &msg) {
216
        struct MonHeader mheader;
217
        mheader.length = 0;
218
        mheader.type = type;
219
        msg.insert(msg.end(), (char*) &mheader, ((char*) &mheader) + MONHDR_SIZE );
220
}
221

    
222
int MeasureDispatcher::oobDataTx(class MonMeasure *m, char *buf, int buf_len) {
223
        Buffer buffer;
224
        struct OobData oobdata;
225

    
226
        buffer.reserve(MONHDR_SIZE + OOBDATA_SIZE + buf_len);
227

    
228
        headerSetup(OOBDATA, buffer);
229

    
230
        oobdata.mh_local = m->mh_local;
231
        oobdata.mh_remote = m->mh_remote;
232

    
233
        buffer.insert(buffer.end(), (char*)&oobdata, ((char*)&oobdata) + OOBDATA_SIZE);
234

    
235
        buffer.insert(buffer.end(), buf, buf + buf_len);
236

    
237
        return sendCtrlMsg((SocketId) m->dst_socketid, buffer);
238
}
239

    
240
int MeasureDispatcher::oobDataRx(SocketId sid, MsgType mt, char *cbuf, int length) {
241
        struct SocketIdMt h_dst;
242
        h_dst.sid = sid;
243
        h_dst.mt = MSG_TYPE_MONL;
244

    
245
        struct OobData *oobdata = (struct OobData*) cbuf;
246
        result *r = NULL;
247

    
248
        if(mm->isValidMonHandler(oobdata->mh_remote))
249
                if(mm->mMeasureInstances[oobdata->mh_remote]->status == RUNNING &&
250
                                 mlCompareSocketIDs((SocketId) mm->mMeasureInstances[oobdata->mh_remote]->dst_socketid, sid) == 0) {
251
                        if(mm->mMeasureInstances[oobdata->mh_remote]->flags & DATA) {
252
                                if(mm->mMeasureInstances[oobdata->mh_remote]->flags & REMOTE)
253
                                        r = dispatcherList[h_dst]->data_r_rx_remote;
254
                                else
255
                                        r = dispatcherList[h_dst]->data_r_rx_local;
256
                        } else if (mm->mMeasureInstances[oobdata->mh_remote]->flags & PACKET) {
257
                                if(mm->mMeasureInstances[oobdata->mh_remote]->flags & REMOTE)
258
                                        r = dispatcherList[h_dst]->pkt_r_rx_remote;
259
                                else
260
                                        r = dispatcherList[h_dst]->pkt_r_rx_local;
261
                        }
262
                        if(!r)
263
                                fatal("MONL: r is not set");
264
                        mm->mMeasureInstances[oobdata->mh_remote]->receiveOobData(cbuf + OOBDATA_SIZE,
265
                                        length - OOBDATA_SIZE, r);
266
                }
267
        return EOK;
268
}
269

    
270
int MeasureDispatcher::remoteResultsTx(SocketId dst, struct res_mh_pair *rmp, int length, MsgType mt) {
271
        Buffer buffer;
272
        struct RemoteResults rresults;
273

    
274
        buffer.reserve(MONHDR_SIZE + REMOTERESULTS_SIZE + length * sizeof(struct res_mh_pair));
275
        
276
        headerSetup(REMOTERESULTS, buffer);
277

    
278
        rresults.length = length;
279
        rresults.msg_type = mt;
280
        
281
        buffer.insert(buffer.end(), (char*)&rresults, ((char*)&rresults) + REMOTERESULTS_SIZE);
282

    
283
        buffer.insert(buffer.end(), (char*) rmp, ((char*) rmp) + length * sizeof(struct res_mh_pair));
284

    
285
        return sendCtrlMsg(dst, buffer);
286
}
287

    
288
int MeasureDispatcher::remoteResultsRx(SocketId sid, MsgType mt, char *cbuf) {
289
        struct RemoteResults *rresults = (struct RemoteResults *) cbuf;
290
        struct res_mh_pair *rmp;
291

    
292
        rmp = (struct res_mh_pair *) (cbuf + REMOTERESULTS_SIZE);
293

    
294
        for(int i = 0; i < rresults->length; i++) {
295
                if(mm->isValidMonHandler(rmp[i].mh)) {
296
                        if(mm->mMeasureInstances[rmp[i].mh]->status == RUNNING &&
297
                         mm->mMeasureInstances[rmp[i].mh]->msg_type == rresults->msg_type &&
298
                          mlCompareSocketIDs((SocketId) mm->mMeasureInstances[rmp[i].mh]->dst_socketid, sid) == 0) {
299
                                if(mm->mMeasureInstances[rmp[i].mh]->rb != NULL) {
300
                                                mm->mMeasureInstances[rmp[i].mh]->rb->newSample(rmp[i].res);
301
                                }
302
                        }
303
                }
304
        }
305
        return EOK;
306
};
307

    
308
int MeasureDispatcher::remoteMeasureResponseTx(SocketId dst, MonHandler mhr, MonHandler mh, int32_t cid, int32_t status) {
309
        Buffer buffer;
310
        struct MeasureResponse mresponse;
311

    
312
        buffer.reserve(MONHDR_SIZE + MRESPONSE_SIZE);
313

    
314
        headerSetup(REMOTEMEASURERESPONSE, buffer);
315

    
316
        mresponse.mh_remote        = mhr;
317
        mresponse.mh_local        = mh;
318
        mresponse.command                = cid;
319
        mresponse.status                = status;
320

    
321
        buffer.insert(buffer.end(), (char*)&mresponse, ((char*)&mresponse) + MRESPONSE_SIZE);
322

    
323
        return sendCtrlMsg(dst, buffer);
324
}
325

    
326
int MeasureDispatcher::remoteMeasureResponseRx(SocketId src, MsgType mt, char* cbuf) {
327
        struct MeasureResponse *mresponse = (MeasureResponse*) cbuf;
328

    
329
        if(! mm->isValidMonHandler(mresponse->mh_remote)) {        //might not exist if the measure had been deleted in the meantime
330
                return -EINVAL;
331
        }
332

    
333
        MonMeasure *m = mm->mMeasureInstances[mresponse->mh_remote];
334
        switch(mresponse->command) {
335
                case INITREMOTEMEASURE:
336
                        m->mh_remote = mresponse->mh_local;
337
                        if(mresponse->status == EOK) {
338
                                if(m->flags & OUT_OF_BAND) {
339
                                        struct timeval tv = {0,0};
340
                                        if(m->scheduleNextIn(&tv) != EOK)
341
                                                m->status = FAILED;
342
                                        else
343
                                                m->status = RUNNING;
344
                                } else
345
                                        m->status = RUNNING;
346
                        } else
347
                                m->status = RFAILED;
348
                        break;
349
                case DEINITREMOTEMEASURE:
350
                        if(mresponse->status == EOK) {
351
                                if(m->status == STOPPING) {
352
                                        stopMeasure(m);
353
                                        if(mm->monDestroyMeasure(m->mh_local) != EOK)
354
                                                m->status = FAILED;
355
                                } else
356
                                        m->status = STOPPED;
357
                        }
358
                        else
359
                                m->status = RFAILED;
360
                        break;
361
                }
362
        return EOK;
363
}
364

    
365
int MeasureDispatcher::initRemoteMeasureRx(SocketId src, MsgType mt, char *cbuf) {
366
        MonHandler mh;
367
        MonParameterValue *param_vector;
368
        MonMeasure *m = NULL;
369
        struct SocketIdMt h_dst;
370
        struct InitRemoteMeasure *initmeasure = (InitRemoteMeasure*) cbuf;
371

    
372
        h_dst.sid = src;
373
        h_dst.mt = initmeasure->msg_type;
374

    
375
        /* Check if a previous instance is running */
376
        if(dispatcherList.find(h_dst) != dispatcherList.end())
377
                m = findMeasureFromId(dispatcherList[h_dst], initmeasure->mc, initmeasure->mid);
378

    
379
        //if so remove it, so that we can create a new one (needed if mc changed)
380
        if(m != NULL)
381
                mm->monDestroyMeasure(m->mh_local);
382

    
383
        mh = mm->monCreateMeasureId(initmeasure->mid, initmeasure->mc);
384

    
385
        if(mh < 0)
386
                return remoteMeasureResponseTx(src, initmeasure->mh_local, -1, INITREMOTEMEASURE, EINVAL);
387

    
388
        mm->mMeasureInstances[mh]->mh_remote = initmeasure->mh_local;
389

    
390
        param_vector = (MonParameterValue*)(cbuf + INITREMOTEMEASURE_SIZE);
391

    
392
        for(int i=0; i < initmeasure->n_params; i++) {
393
                if(mm->monSetParameter(mh,i,param_vector[i]) != EOK)        //TODO: init might be called inside this, which might use paramerets (e.g. src) that are set only below, in activateMeasure
394
                        goto error;
395
        }
396

    
397
        if(activateMeasure(mm->mMeasureInstances[mh], src, initmeasure->msg_type) != EOK)
398
                goto error;
399

    
400
        return remoteMeasureResponseTx(src, initmeasure->mh_local, mh, INITREMOTEMEASURE, EOK);
401

    
402
error:
403
        return remoteMeasureResponseTx(src, initmeasure->mh_local, mh, INITREMOTEMEASURE, EINVAL);
404
}
405

    
406
int MeasureDispatcher::initRemoteMeasureTx(class MonMeasure *m, SocketId dst, MsgType mt) {
407
        Buffer buffer;
408

    
409
        struct InitRemoteMeasure initmeasure;
410

    
411
        buffer.reserve(MONHDR_SIZE + INITREMOTEMEASURE_SIZE + sizeof(MonParameterValue) * m->measure_plugin->params.size());
412

    
413
        headerSetup(INITREMOTEMEASURE, buffer);
414

    
415
        initmeasure.mid = m->measure_plugin->getId();
416
        /* setup flags for REMOTE party */
417
        initmeasure.mc = (m->getFlags() | REMOTE) & ~(TXLOC | RXLOC | TXREM | RXREM);
418
        if(m->getFlags() & TXREM)
419
                initmeasure.mc |= TXLOC;
420
        if(m->getFlags() & RXREM)
421
                initmeasure.mc |= RXLOC;
422

    
423
        initmeasure.mh_local = m->mh_local;
424
        initmeasure.msg_type = mt;
425
        initmeasure.n_params = m->measure_plugin->params.size();
426

    
427
        buffer.insert(buffer.end(),(char*)&initmeasure,((char*)&initmeasure) + INITREMOTEMEASURE_SIZE);
428

    
429
        for(int i=0; i < initmeasure.n_params; i++)
430
                buffer.insert(buffer.end(), (char*)&(m->param_values[i]), ((char*)&(m->param_values[i])) + sizeof(MonParameterValue));
431

    
432
        return sendCtrlMsg(dst, buffer);
433
}
434

    
435
int MeasureDispatcher::deinitRemoteMeasureTx(class MonMeasure *m) {
436
        Buffer buffer;
437
        struct DeInitRemoteMeasure deinitmeasure;
438

    
439
        buffer.reserve(MONHDR_SIZE + DEINITREMOTEMEASURE_SIZE);
440

    
441
        headerSetup(DEINITREMOTEMEASURE, buffer);
442

    
443
        deinitmeasure.mh_local = m->mh_local;
444
        deinitmeasure.mh_remote = m->mh_remote;
445

    
446
        buffer.insert(buffer.end(),(char*)&deinitmeasure, ((char*)&deinitmeasure) + DEINITREMOTEMEASURE_SIZE);
447

    
448
        return sendCtrlMsg((SocketId) m->dst_socketid, buffer);
449
};
450

    
451
int MeasureDispatcher::deinitRemoteMeasureRx(SocketId src, MsgType mt, char* cbuf) {
452
        int ret;
453

    
454
        struct DeInitRemoteMeasure *deinitmeasure = (DeInitRemoteMeasure*) cbuf;
455

    
456
        if(! mm->isValidMonHandler(deinitmeasure->mh_remote)) {        //might not exist if the measure had been deleted in the meantime
457
                return -EINVAL;
458
        }
459

    
460
        ret = deactivateMeasure(mm->mMeasureInstances[deinitmeasure->mh_remote]);
461
        if (ret != EOK )
462
                return remoteMeasureResponseTx(src, deinitmeasure->mh_local, deinitmeasure->mh_remote, DEINITREMOTEMEASURE, ret);
463

    
464
        ret = mm->monDestroyMeasure(deinitmeasure->mh_remote);        //TODO: this mh should not be reused! Packets might still arrive referencig this
465
        if (ret != EOK )
466
                return remoteMeasureResponseTx(src, deinitmeasure->mh_local, deinitmeasure->mh_remote, DEINITREMOTEMEASURE, ret);
467

    
468
        return remoteMeasureResponseTx(src,  deinitmeasure->mh_local, deinitmeasure->mh_remote, DEINITREMOTEMEASURE, EOK);
469
}
470

    
471
class MonMeasure* MeasureDispatcher::findMeasureFromId(DestinationSocketIdMtData *dd,  MeasurementCapabilities flags, MeasurementId mid) {
472
        ExecutionList::iterator it;
473

    
474
        //check if loaded
475
        if(flags & REMOTE) {
476
                it = dd->mids_remote.find(mid);
477
                if(it == dd->mids_remote.end())
478
                        return NULL;
479
        } else {
480
                it = dd->mids_local.find(mid);
481
                if(it == dd->mids_local.end())
482
                        return NULL;
483
        }
484

    
485
        return it->second;
486
}
487

    
488
int MeasureDispatcher::activateMeasure(class MonMeasure *m, SocketId dst, MsgType mt, int auto_load) {
489
        MeasurementId mid;
490
        struct SocketIdMt h_dst;
491
        int ret;
492

    
493
        h_dst.sid = dst;
494
        h_dst.mt = mt;
495

    
496
        mid = m->measure_plugin->getId();
497
        m->msg_type = mt;
498

    
499
        if(dst != NULL) {
500
                memcpy(m->dst_socketid, (uint8_t *) dst, SOCKETID_SIZE);
501
                m->dst_socketid_publish = true;
502
        } else {
503
                if(m->flags != 0)
504
                        return -EINVAL;
505
                m->dst_socketid_publish = false;
506
        }
507

    
508
        m->defaultInit();
509

    
510
        if(m->flags == 0) {
511
                m->status = RUNNING;
512
                return EOK;
513
        }
514

    
515
        //check if already present
516
        if(dispatcherList.find(h_dst) != dispatcherList.end()) {
517
                if(m->flags & REMOTE) {
518
                        if(dispatcherList[h_dst]->mids_remote.find(mid) != dispatcherList[h_dst]->mids_remote.end())
519
                                return -EEXISTS;
520
                } else {
521
                        if(dispatcherList[h_dst]->mids_local.find(mid) != dispatcherList[h_dst]->mids_local.end())
522
                                return -EEXISTS;
523
                }
524
        }
525

    
526

    
527
        if(dispatcherList.find(h_dst) == dispatcherList.end()) {
528
                createDestinationSocketIdMtData(h_dst);
529
        }
530

    
531
        if(m->flags & OUT_OF_BAND) {
532
                struct SocketIdMt h_dst2 = h_dst;
533
                h_dst2.mt = MSG_TYPE_MONL;
534
                if(dispatcherList.find(h_dst2) == dispatcherList.end())
535
                        createDestinationSocketIdMtData(h_dst2);
536
        }
537

    
538
        if(m->flags & PACKET) {
539
                if(m->flags & REMOTE) {
540
                        m->r_rx_list = dispatcherList[h_dst]->pkt_r_rx_remote;
541
                        m->r_tx_list = dispatcherList[h_dst]->pkt_r_tx_remote;
542
                } else {
543
                        m->r_rx_list = dispatcherList[h_dst]->pkt_r_rx_local;
544
                        m->r_tx_list = dispatcherList[h_dst]->pkt_r_tx_local;
545
                }
546
        } else {
547
                if(m->flags & REMOTE) {
548
                        m->r_rx_list = dispatcherList[h_dst]->data_r_rx_remote;
549
                        m->r_tx_list = dispatcherList[h_dst]->data_r_tx_remote;
550
                } else {
551
                        m->r_rx_list = dispatcherList[h_dst]->data_r_rx_local;
552
                        m->r_tx_list = dispatcherList[h_dst]->data_r_tx_local;
553
                }
554
        }
555
        //TODO: check deps
556

    
557
        // add it to loaded measure list
558
        if(m->flags & REMOTE)
559
                dispatcherList[h_dst]->mids_remote[mid] = m;
560
        else
561
                dispatcherList[h_dst]->mids_local[mid] = m;
562

    
563
        //Handle IN_BAND measures
564
        addMeasureToExecLists(h_dst, m);
565

    
566
        //Handle OUT_OF_BAND measures
567
        if(m->flags & OUT_OF_BAND) {
568
                struct timeval tv = {0,0};
569
                //if only local
570
                if(!(m->flags & TXREM) && !(m->flags & RXREM))
571
                        m->scheduleNextIn(&tv); //start it!
572
        }
573

    
574
        /* Remote counterparts ? */
575
        if(m->flags & TXREM || m->flags & RXREM) {
576
                /* Yes, initialise them */
577
                m->status = INITIALISING;
578
                ret = initRemoteMeasureTx(m,dst,mt);
579
                if(ret != EOK)
580
                        goto error;
581
        } else {
582
                m->status = RUNNING;
583
        }
584

    
585
        if(m->dst_socketid == NULL)
586
                info("bla1");
587
        return EOK;
588

    
589
error:
590
        if(m->dst_socketid == NULL)
591
                info("bla2");
592

    
593
        stopMeasure(m);
594
        if(m->dst_socketid == NULL)
595
                info("bla3");
596

    
597
        return ret;
598
}
599

    
600
void MeasureDispatcher::createDestinationSocketIdMtData(struct SocketIdMt h_dst) {
601
        DestinationSocketIdMtData *dd;
602
        
603
        dd = new DestinationSocketIdMtData;
604

    
605
        memcpy(dd->sid, (uint8_t*) h_dst.sid, SOCKETID_SIZE);
606
        dd->h_dst.sid = (SocketId) &(dd->sid);
607
        dd->h_dst.mt = h_dst.mt;
608

    
609
        dispatcherList[dd->h_dst] = dd;
610

    
611
        //initialize
612
        for(int i = 0; i < R_LAST_PKT; i++) {
613
                dispatcherList[dd->h_dst]->pkt_r_rx_local[i] = NAN;
614
                dispatcherList[dd->h_dst]->pkt_r_rx_remote[i] = NAN;
615
                dispatcherList[dd->h_dst]->pkt_r_tx_local[i] = NAN;
616
                dispatcherList[dd->h_dst]->pkt_r_tx_remote[i] = NAN;
617
        }
618
        dispatcherList[dd->h_dst]->pkt_tx_seq_num = 0;
619

    
620
        for(int i = 0; i < R_LAST_DATA; i++) {
621
                dispatcherList[dd->h_dst]->data_r_rx_local[i] = NAN;
622
                dispatcherList[dd->h_dst]->data_r_rx_remote[i] = NAN;
623
                dispatcherList[dd->h_dst]->data_r_tx_local[i] = NAN;
624
                dispatcherList[dd->h_dst]->data_r_tx_remote[i] = NAN;
625
        }
626
        dispatcherList[dd->h_dst]->data_tx_seq_num = 0;
627
}
628

    
629
void MeasureDispatcher::destroyDestinationSocketIdMtData(SocketId dst, MsgType mt) {
630
        DestinationSocketIdMtData *dd;
631
        DispatcherListSocketIdMt::iterator it;
632
        struct SocketIdMt h_dst;
633

    
634
        h_dst.sid = dst;
635
        h_dst.mt = mt;
636

    
637
        it = dispatcherList.find(h_dst);
638
        if(it != dispatcherList.end()) {
639
                dd = it->second;
640
                dispatcherList.erase(it);
641
                delete dd;
642
        }
643
        else
644
                fatal("Trying to erase non existent DestinationSocketIdMtData");
645
}
646

    
647
int MeasureDispatcher::deactivateMeasure(class MonMeasure *m) {
648
        SocketIdMt h_dst;
649

    
650
        h_dst.sid = (SocketId) m->dst_socketid;
651
        h_dst.mt = m->msg_type;
652

    
653
        if(m->status == STOPPED)
654
                return EOK;
655

    
656
        if(m->used_counter > 0)
657
                return -EINUSE;
658

    
659
        if(m->flags == 0) {
660
                m->defaultStop();
661
                m->status = STOPPED;
662
                return EOK;
663
        }
664

    
665
        if(dispatcherList.find(h_dst) == dispatcherList.end())
666
                return EINVAL;
667

    
668
        //remove it from the loaded list
669
        if(m->flags & REMOTE) {
670
                if(dispatcherList[h_dst]->mids_remote.find(m->measure_plugin->getId()) == dispatcherList[h_dst]->mids_remote.end())
671
                        return -EINVAL;
672
        } else {
673
                if(dispatcherList[h_dst]->mids_local.find(m->measure_plugin->getId()) == dispatcherList[h_dst]->mids_local.end())
674
                        return -EINVAL;
675
        }
676

    
677
        return stopMeasure(m);
678
}
679

    
680
int MeasureDispatcher::stopMeasure(class MonMeasure *m) {
681
        SocketIdMt h_dst;
682

    
683
        h_dst.sid = (SocketId) m->dst_socketid;
684
        h_dst.mt = m->msg_type;
685

    
686
        m->defaultStop();
687

    
688
        //Handle IN_BAND measures
689
        delMeasureFromExecLists(m);
690
        //Handle OUT_OF:BAND measures
691
        //TODO: Anything to do at all?
692

    
693
        //Handle remote counterpart
694
        if(m->flags & TXREM || m->flags & RXREM) {
695
                        m->status = STOPPING;
696
                        deinitRemoteMeasureTx(m);
697
        } else
698
                m->status = STOPPED;
699
        m->r_tx_list = m->r_rx_list = NULL;
700

    
701
        //remove it from the loaded list
702
        if(m->flags & REMOTE)
703
                dispatcherList[h_dst]->mids_remote.erase(dispatcherList[h_dst]->mids_remote.find(m->measure_plugin->getId()));
704
        else
705
                dispatcherList[h_dst]->mids_local.erase(dispatcherList[h_dst]->mids_local.find(m->measure_plugin->getId()));
706

    
707
        //check if we can remove also the DistinationSocektIdMtData
708
        if(dispatcherList[h_dst]->mids_remote.empty() && dispatcherList[h_dst]->mids_local.empty())
709
                destroyDestinationSocketIdMtData((SocketId) m->dst_socketid, m->msg_type);
710
}
711

    
712

    
713
void MeasureDispatcher::cbRxPkt(void *arg) {
714
        ExecutionList::iterator it;
715
        result *r_loc, *r_rem;
716
        struct res_mh_pair rmp[R_LAST_PKT];
717
        int i,j;
718
        mon_pkt_inf *pkt_info = (mon_pkt_inf *)arg;
719
        struct MonPacketHeader *mph = (MonPacketHeader*) pkt_info->monitoringHeader;
720
        struct SocketIdMt h_dst;
721
        h_dst.sid = pkt_info->remote_socketID;
722
        h_dst.mt = pkt_info->msgtype;
723

    
724
        if(pkt_info->remote_socketID == NULL)
725
                return;
726

    
727
        /* something to do? */
728
        if(dispatcherList.size() == 0)
729
                return;
730

    
731
        if(dispatcherList.find(h_dst) == dispatcherList.end())
732
                return;
733

    
734
        //we have a result vector to fill
735
        r_loc = dispatcherList[h_dst]->pkt_r_rx_local;
736
        r_rem = dispatcherList[h_dst]->pkt_r_rx_remote;
737

    
738
        //TODO: add standard fields
739
        if(mph != NULL) {
740
                r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ntohl(mph->seq_num);
741
                r_rem[R_INITIAL_TTL] = r_loc[R_INITIAL_TTL] = mph->initial_ttl;
742
                r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ntohl(mph->ts_usec) / 1000000.0;
743
                r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = r_loc[R_SEND_TIME] + ntohl(mph->ts_sec);
744
                r_rem[R_REPLY_TIME] = r_loc[R_REPLY_TIME] = ntohl(mph->ans_ts_usec) / 1000000.0;
745
                r_rem[R_REPLY_TIME] = r_loc[R_REPLY_TIME] = r_loc[R_REPLY_TIME] + ntohl(mph->ans_ts_sec);
746
        }
747

    
748
        r_rem[R_SIZE] = r_loc[R_SIZE] = pkt_info->bufSize;
749
        r_rem[R_TTL] = r_loc[R_TTL] = pkt_info->ttl;
750
        r_rem[R_DATA_ID] = r_loc[R_DATA_ID] = pkt_info->dataID;
751
        r_rem[R_DATA_OFFSET] = r_loc[R_DATA_OFFSET] = pkt_info->offset;
752
        r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] = pkt_info->arrival_time.tv_usec / 1000000.0;
753
        r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] += pkt_info->arrival_time.tv_sec;
754

    
755

    
756
        // are there local in band measures?
757
        if (dispatcherList[h_dst]->el_rx_pkt_local.size() > 0) {
758
                /* yes! */
759
                ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_rx_pkt_local);
760
        
761
                /* Call measures in order */
762
                for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++)
763
                        if(it->second->status == RUNNING)
764
                                it->second->RxPktLocal(el_ptr_loc);
765
        }
766

    
767
        if (dispatcherList[h_dst]->el_rx_pkt_remote.size() > 0) {
768
                /* And remotes */
769
                ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_rx_pkt_remote);
770
        
771
                /* Call measurees in order */
772
                j = 0;
773
                for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
774
                        if(it->second->status == RUNNING)
775
                                it->second->RxPktRemote(rmp,j,el_ptr_rem);
776
        
777
                /* send back results */
778
                if(j > 0)
779
                remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
780
        }
781
}
782

    
783
void MeasureDispatcher::cbRxData(void *arg) {
784
        ExecutionList::iterator it;
785
        result *r_loc, *r_rem;
786
        struct res_mh_pair rmp[R_LAST_DATA];
787
        int i,j;
788
        mon_data_inf *data_info = (mon_data_inf *)arg;
789
        struct MonDataHeader *mdh = (MonDataHeader*) data_info->monitoringDataHeader;
790
        struct SocketIdMt h_dst;
791
        h_dst.sid = data_info->remote_socketID;
792
        h_dst.mt = data_info->msgtype;
793

    
794
        if(data_info->remote_socketID == NULL)
795
                return;
796

    
797
        /* something to do? */
798
        if(dispatcherList.size() == 0)
799
                return;
800

    
801
        if(dispatcherList.find(h_dst) == dispatcherList.end())
802
                return;
803

    
804
        //we have a result vector to fill
805
        r_loc = dispatcherList[h_dst]->data_r_rx_local;
806
        r_rem = dispatcherList[h_dst]->data_r_rx_remote;
807

    
808
        //TODO: add standard fields
809
        if(mdh != NULL) {
810
                r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ntohl(mdh->seq_num);
811
                r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ntohl(mdh->ts_usec) / 1000000.0;
812
                r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = r_loc[R_SEND_TIME] + ntohl(mdh->ts_sec);
813
                r_rem[R_REPLY_TIME] = r_loc[R_REPLY_TIME] = ntohl(mdh->ans_ts_usec) / 1000000.0;
814
                r_rem[R_REPLY_TIME] = r_loc[R_REPLY_TIME] = r_loc[R_REPLY_TIME] + ntohl(mdh->ans_ts_sec);
815
        }
816
        r_rem[R_SIZE] = r_loc[R_SIZE] = data_info->bufSize;
817
        r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] = data_info->arrival_time.tv_usec / 1000000.0;
818
        r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] += data_info->arrival_time.tv_sec;
819

    
820
        if (dispatcherList[h_dst]->el_rx_data_local.size() > 0) {
821
                /* yes! */
822
                /* Locals first */
823
                ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_rx_data_local);
824
        
825
        
826
                /* Call measurees in order */
827
                for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++){
828
                        MonMeasure *m = it->second;
829
                        if(it->second->status == RUNNING)
830
                                it->second->RxDataLocal(el_ptr_loc);
831
                }
832
        }
833

    
834
        if (dispatcherList[h_dst]->el_rx_data_remote.size() > 0) {
835
                /* And remotes */
836
        
837
                ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_rx_data_remote);
838
        
839
                /* Call measurees in order */
840
                j = 0;
841
                for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
842
                        if(it->second->status == RUNNING)
843
                                it->second->RxDataRemote(rmp,j,el_ptr_rem);
844
        
845
                /* send back results */
846
                if(j > 0)
847
                        remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
848
        }
849
}
850

    
851
void MeasureDispatcher::cbTxPkt(void *arg) {
852
        ExecutionList::iterator it;
853
        result *r_loc, *r_rem;
854
        struct res_mh_pair rmp[R_LAST_PKT];
855
        int i,j;
856
        mon_pkt_inf *pkt_info = (mon_pkt_inf *)arg;
857
        struct MonPacketHeader *mph = (MonPacketHeader*) pkt_info->monitoringHeader;
858
        struct timeval ts;
859
        struct SocketIdMt h_dst;
860
        h_dst.sid = pkt_info->remote_socketID;
861
        h_dst.mt = pkt_info->msgtype;
862

    
863
        if(pkt_info->remote_socketID == NULL)
864
                return;
865

    
866
        /* something to do? */
867
        if(dispatcherList.size() == 0)
868
                return;
869

    
870
        if(dispatcherList.find(h_dst) == dispatcherList.end())
871
                return;
872

    
873
        /* yes! */
874

    
875
        r_loc = dispatcherList[h_dst]->pkt_r_tx_local;
876
        r_rem = dispatcherList[h_dst]->pkt_r_tx_remote;
877

    
878
        /* prepare initial result vector (based on header information) */
879
                
880
        r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ++(dispatcherList[h_dst]->pkt_tx_seq_num);
881
        r_rem[R_SIZE] = r_loc[R_SIZE] = pkt_info->bufSize;
882
        r_rem[R_INITIAL_TTL] = r_loc[R_INITIAL_TTL] = initial_ttl;
883
        gettimeofday(&ts,NULL);        
884
        r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ts.tv_usec / 1000000.0;
885
        r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] =  r_loc[R_SEND_TIME] + ts.tv_sec;
886

    
887
        if(dispatcherList[h_dst]->el_tx_pkt_local.size() > 0) {
888

    
889
                ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_tx_pkt_local);
890

    
891
                /* Call measurees in order */
892
                for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++)
893
                        if(it->second->status == RUNNING)
894
                                it->second->TxPktLocal(el_ptr_loc);
895
        }
896

    
897
        if(dispatcherList[h_dst]->el_tx_pkt_remote.size() > 0) {
898
                /* And remotes */
899
                ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_tx_pkt_remote);
900
        
901
                /* Call measurees in order */
902
                j = 0;
903
                for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
904
                        if(it->second->status == RUNNING)
905
                                it->second->TxPktRemote(rmp,j,el_ptr_rem);
906
        
907
                /* send back results */
908
                if(j > 0)
909
                        remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
910
        }
911

    
912
        if(mph != NULL) {
913
                mph->seq_num = htonl((uint32_t)r_rem[R_SEQNUM]);
914
                mph->initial_ttl = (uint32_t)r_rem[R_INITIAL_TTL];
915
                mph->ts_sec = htonl((uint32_t)floor(r_rem[R_SEND_TIME]));
916
                mph->ts_usec = htonl((uint32_t)((r_rem[R_SEND_TIME] - floor(r_rem[R_SEND_TIME])) * 1000000.0));
917
                if(!isnan(r_rem[R_REPLY_TIME])) {
918
                        mph->ans_ts_sec = htonl((uint32_t)floor(r_rem[R_REPLY_TIME]));
919
                        mph->ans_ts_usec = htonl((uint32_t)((r_rem[R_REPLY_TIME] - floor(r_rem[R_REPLY_TIME])) * 1000000.0));
920
                } else {
921
                        mph->ans_ts_sec = 0;
922
                        mph->ans_ts_usec = 0;
923
                }
924
        }
925
}
926

    
927
void MeasureDispatcher::cbTxData(void *arg) {
928
        ExecutionList::iterator it;
929
        result *r_loc,*r_rem;
930
        struct res_mh_pair rmp[R_LAST_DATA];
931
        int i,j;
932
        mon_data_inf *data_info = (mon_data_inf *)arg;
933
        struct MonDataHeader *mdh = (MonDataHeader*) data_info->monitoringDataHeader;
934
        struct timeval ts;
935
        struct SocketIdMt h_dst;
936
        h_dst.sid = data_info->remote_socketID;
937
        h_dst.mt = data_info->msgtype;
938

    
939
        if(data_info->remote_socketID == NULL)
940
                return;
941

    
942
        /* something to do? */
943
        if(dispatcherList.size() == 0)
944
                return;
945

    
946
        if(dispatcherList.find(h_dst) == dispatcherList.end())
947
                return;
948
        
949
        /* yes! */
950
        r_loc = dispatcherList[h_dst]->data_r_tx_local;
951
        r_rem = dispatcherList[h_dst]->data_r_tx_remote;
952

    
953
        
954
        //TODO add fields
955
        r_rem[R_SIZE] = r_loc[R_SIZE] = data_info->bufSize;
956
        r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ++(dispatcherList[h_dst]->data_tx_seq_num);
957
        gettimeofday(&ts,NULL);
958
        r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ts.tv_usec / 1000000.0;
959
        r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = r_loc[R_SEND_TIME] + ts.tv_sec;
960

    
961
        if(dispatcherList[h_dst]->el_tx_data_local.size() > 0) {
962
                ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_tx_data_local);
963

    
964
                /* Call measures in order */
965
                for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++)
966
                        if(it->second->status == RUNNING)
967
                                it->second->TxDataLocal(el_ptr_loc);
968
        }
969

    
970
        if(dispatcherList[h_dst]->el_tx_data_remote.size() > 0) {
971
                /* And remote */
972
                ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_tx_data_remote);
973
        
974
                /* Call measures in order */
975
                j = 0;
976
                for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
977
                        if(it->second->status == RUNNING)
978
                                it->second->TxDataRemote(rmp, j, el_ptr_rem);
979
        
980
                /* send back results */
981
                if(j > 0)
982
                        remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
983
        }
984

    
985
        if(mdh != NULL) {
986
                mdh->seq_num =  htonl((uint32_t)r_rem[R_SEQNUM]);
987
                mdh->ts_sec =  htonl((uint32_t)floor(r_rem[R_SEND_TIME]));
988
                mdh->ts_usec =  htonl((uint32_t)((r_rem[R_SEND_TIME] - floor(r_rem[R_SEND_TIME])) * 1000000.0));
989
                if(!isnan(r_rem[R_REPLY_TIME])) {
990
                        mdh->ans_ts_sec = htonl((uint32_t)floor(r_rem[R_REPLY_TIME]));
991
                        mdh->ans_ts_usec = htonl((uint32_t)((r_rem[R_REPLY_TIME] - floor(r_rem[R_REPLY_TIME])) * 1000000.0));
992
                } else {
993
                        mdh->ans_ts_sec = 0;
994
                        mdh->ans_ts_usec = 0;
995
                }
996
        }
997
}
998

    
999
int MeasureDispatcher::cbHdrPkt(SocketId sid, MsgType mt) {
1000
        struct SocketIdMt h_dst;
1001
        h_dst.sid = sid;
1002
        h_dst.mt = mt;
1003

    
1004
        if(sid == NULL)
1005
                return 0;
1006

    
1007
        /* something to do? */
1008
        if(dispatcherList.size() == 0)
1009
                return 0;
1010

    
1011
        if(dispatcherList.find(h_dst) == dispatcherList.end())
1012
                return 0;
1013

    
1014
        /* yes! */
1015
        return MON_PACKET_HEADER_SIZE;
1016
}
1017

    
1018
int MeasureDispatcher::cbHdrData(SocketId sid, MsgType mt) {
1019
        struct SocketIdMt h_dst;
1020
        h_dst.sid = sid;
1021
        h_dst.mt = mt;
1022

    
1023
        if(sid == NULL)
1024
                return 0;
1025

    
1026
        /* something to do? */
1027
        if(dispatcherList.size() == 0)
1028
                return 0;
1029

    
1030
        if(dispatcherList.find(h_dst) == dispatcherList.end())
1031
                return 0;
1032

    
1033
        /* yes! return the space we need */
1034
        return MON_DATA_HEADER_SIZE;
1035
}
1036

    
1037
int MeasureDispatcher::scheduleMeasure(MonHandler mh) {
1038
        if(mm->isValidMonHandler(mh))
1039
                mm->mMeasureInstances[mh]->Run();
1040
        return EOK;
1041
}
1042

    
1043
int MeasureDispatcher::schedulePublish(MonHandler mh) {
1044
        if(mm->isValidMonHandler(mh))
1045
                if(mm->mMeasureInstances[mh]->rb != NULL)
1046
                        mm->mMeasureInstances[mh]->rb->publishResults();
1047
        return EOK;
1048
}