Statistics
| Branch: | Revision:

napa-baselibs / monl / measure_dispatcher.cpp @ 507372bb

History | View | Annotate | Download (30.7 KB)

1
/***************************************************************************
2
 *   Copyright (C) 2009 by Robert Birke
3
 *   robert.birke@polito.it
4
 *
5
 * This library is free software; you can redistribute it and/or
6
 * modify it under the terms of the GNU Lesser General Public
7
 * License as published by the Free Software Foundation; either
8
 * version 2.1 of the License, or (at your option) any later version.
9

10
 * This library is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 * Lesser General Public License for more details.
14

15
 * You should have received a copy of the GNU Lesser General Public
16
 * License along with this library; if not, write to the Free Software
17
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA 
18
 ***********************************************************************/
19

    
20
#include "measure_dispatcher.h"
21
#include "measure_manager.h"
22
#include "errors.h"
23
#include "ctrl_msg.h"
24
#include "napa_log.h"
25

    
26

    
27
#ifndef WIN32
28
#include <arpa/inet.h>
29
#else
30
#include <winsock2.h>
31
#endif
32
#include <string.h>
33
#include <sstream>
34
#include <stdint.h>
35
#include <sys/time.h>
36

    
37
#include <cmath>
38

    
39
//TODO: possible improvements:
40
// - make buffer a ptr and don't make an explicit copy
41
// - implement a better hash and compare for SocketId
42
// - use activate/deactivate for automatic loading/unloading (done has to be checked)
43

    
44

    
45
void open_con_cb (int con_id, void *arg) {
46
        Message *msg = (Message*) arg;
47

    
48
        mlSendData(con_id,msg->msg,msg->length,msg->mt,NULL);
49

    
50
        delete[] msg->msg;
51
        delete msg;
52
}
53

    
54
void MeasureDispatcher::addMeasureToExecLists(SocketIdMt h_dst, class MonMeasure *m) {
55
        class MeasurePlugin *mp = m->measure_plugin;
56

    
57
        /* IN_BAND  measurments added to hook executionlists */
58
        if(m->flags & IN_BAND) {
59
                /* TXLOC */
60
                if(m->flags & TXLOC) {
61
                        if(m->flags & PACKET) {
62
                                if(m->flags & REMOTE)
63
                                        dispatcherList[h_dst]->el_tx_pkt_remote[mp->getId()] = m;
64
                                else
65
                                        dispatcherList[h_dst]->el_tx_pkt_local[mp->getId()] = m;
66
                        }
67
                        if(m->flags & DATA) {
68
                                if(m->flags & REMOTE)
69
                                        dispatcherList[h_dst]->el_tx_data_remote[mp->getId()] = m;
70
                                else
71
                                        dispatcherList[h_dst]->el_tx_data_local[mp->getId()] = m;
72
                        }
73
                }
74
                /* RXLOC */
75
                if(m->flags & RXLOC) {
76
                        if(m->flags & PACKET)  {
77
                                if(m->flags & REMOTE)
78
                                        dispatcherList[h_dst]->el_rx_pkt_remote[mp->getId()] = m;
79
                                else
80
                                        dispatcherList[h_dst]->el_rx_pkt_local[mp->getId()] = m;
81
                        }
82
                        if(m->flags & DATA) {
83
                                if(m->flags & REMOTE)
84
                                        dispatcherList[h_dst]->el_rx_data_remote[mp->getId()] = m;
85
                                else
86
                                        dispatcherList[h_dst]->el_rx_data_local[mp->getId()] = m;
87
                        }
88
                }
89
        }
90
}
91

    
92
void MeasureDispatcher::delMeasureFromExecLists(MonMeasure *m) {
93
        SocketIdMt h_dst;
94
        class MeasurePlugin *mp = m->measure_plugin;
95
        ExecutionList::iterator it;
96

    
97
        h_dst.sid = (SocketId) m->dst_socketid;
98
        h_dst.mt = m->msg_type;
99

    
100
        /* IN_BAND  measurments added to hook executionlists */
101
        if(m->flags & IN_BAND) {
102
                /* TXLOC */
103
                if(m->flags & TXLOC) {
104
                        if(m->flags & PACKET) {
105
                                if(m->flags & REMOTE) {
106
                                        it = dispatcherList[h_dst]->el_tx_pkt_remote.find(mp->getId());
107
                                        if(it != dispatcherList[h_dst]->el_tx_pkt_remote.end())
108
                                                dispatcherList[h_dst]->el_tx_pkt_remote.erase(it);
109
                                } else {
110
                                        it = dispatcherList[h_dst]->el_tx_pkt_local.find(mp->getId());
111
                                        if(it != dispatcherList[h_dst]->el_tx_pkt_local.end())
112
                                                dispatcherList[h_dst]->el_tx_pkt_local.erase(it);
113
                                }
114
                        }
115
                        if(m->flags & DATA) {
116
                                if(m->flags & REMOTE) {
117
                                        it = dispatcherList[h_dst]->el_tx_data_remote.find(mp->getId());
118
                                        if(it != dispatcherList[h_dst]->el_tx_data_remote.end())
119
                                                dispatcherList[h_dst]->el_tx_data_remote.erase(it);
120
                                } else {
121
                                        it = dispatcherList[h_dst]->el_tx_data_local.find(mp->getId());
122
                                        if(it != dispatcherList[h_dst]->el_tx_data_local.end())
123
                                                dispatcherList[h_dst]->el_tx_data_local.erase(it);
124
                                }
125
                        }
126
                }
127
                /* RXLOC */
128
                if(m->flags & RXLOC) {
129
                        if(m->flags & PACKET) {
130
                                if(m->flags & REMOTE) {
131
                                        it = dispatcherList[h_dst]->el_rx_pkt_remote.find(mp->getId());
132
                                        if(it != dispatcherList[h_dst]->el_rx_pkt_remote.end())
133
                                                dispatcherList[h_dst]->el_rx_pkt_remote.erase(it);
134
                                } else {
135
                                        it = dispatcherList[h_dst]->el_rx_pkt_local.find(mp->getId());
136
                                        if(it != dispatcherList[h_dst]->el_rx_pkt_local.end())
137
                                                dispatcherList[h_dst]->el_rx_pkt_local.erase(it);
138
                                }
139
                        }
140
                        if(m->flags & DATA) {
141
                                if(m->flags & REMOTE) {
142
                                        it = dispatcherList[h_dst]->el_rx_data_remote.find(mp->getId());
143
                                        if(it != dispatcherList[h_dst]->el_rx_data_remote.end())
144
                                                dispatcherList[h_dst]->el_rx_data_remote.erase(it);
145
                                } else {
146
                                        it = dispatcherList[h_dst]->el_rx_data_local.find(mp->getId());
147
                                        if(it != dispatcherList[h_dst]->el_rx_data_local.end())
148
                                                dispatcherList[h_dst]->el_rx_data_local.erase(it);
149
                                }
150
                        }
151
                }
152
        }
153
}
154

    
155
int MeasureDispatcher::sendCtrlMsg(SocketId dst, Buffer &buffer)
156
{
157
        int con_id,res;
158
        Message *msg;
159
        send_params sparam = {0,0,0,0,0};
160
        struct MonHeader *mheader = (MonHeader*)&buffer[0];
161
        mheader->length = buffer.size();
162

    
163
        if(dst == NULL)
164
                return EOK;
165

    
166
        // if connection exists, send data
167
        con_id = mlConnectionExist(dst, false);
168

    
169
        if(con_id >= 0) {
170
                if(mlGetConnectionStatus(con_id) == 1) {
171
                        mlSendData(con_id,&buffer[0],buffer.size(),MSG_TYPE_MONL,NULL);
172
                        return EOK;
173
                }
174
        }
175

    
176
        //otherwise open connection and delay sending data
177
        msg = new Message;
178
        msg->length = buffer.size();
179
        msg->msg = new char[msg->length];
180
        memcpy(msg->msg, &buffer[0], buffer.size());
181
        memcpy(msg->sid, dst, SOCKETID_SIZE);
182
        msg->mt = MSG_TYPE_MONL;
183
        msg->sparam = sparam;
184
        con_id = mlOpenConnection(dst,&open_con_cb,msg, sparam);
185

    
186
        if(con_id < 0) {
187
                delete msg->msg;
188
                delete msg;
189
                return -ENOMEM;
190
        }
191

    
192
        return EOK;
193
}
194

    
195

    
196
int MeasureDispatcher::receiveCtrlMsg(SocketId sid, MsgType mt, char *cbuf, int length)
197
{
198
        struct MonHeader *mheader = (MonHeader*) cbuf;
199

    
200
        if(length < mheader->length)
201
                return EINVAL;
202

    
203
        cbuf += MONHDR_SIZE;
204
        length -= MONHDR_SIZE;
205

    
206
        switch (mheader->type) {
207
        case INITREMOTEMEASURE:
208
                        return initRemoteMeasureRx(sid, mt, cbuf);
209
        case REMOTEMEASURERESPONSE:
210
                        return remoteMeasureResponseRx(sid, mt, cbuf);
211
        case DEINITREMOTEMEASURE:
212
                        return deinitRemoteMeasureRx(sid, mt, cbuf);
213
        case REMOTERESULTS:
214
                        return remoteResultsRx(sid, mt, cbuf);
215
        case OOBDATA:
216
                        return oobDataRx(sid, mt, cbuf, length);
217
        }
218
        return EINVAL;
219
}
220

    
221
void MeasureDispatcher::headerSetup(eControlId type, Buffer &msg) {
222
        struct MonHeader mheader;
223
        mheader.length = 0;
224
        mheader.type = type;
225
        msg.insert(msg.end(), (char*) &mheader, ((char*) &mheader) + MONHDR_SIZE );
226
}
227

    
228
int MeasureDispatcher::oobDataTx(class MonMeasure *m, char *buf, int buf_len) {
229
        Buffer buffer;
230
        struct OobData oobdata;
231

    
232
        buffer.reserve(MONHDR_SIZE + OOBDATA_SIZE + buf_len);
233

    
234
        headerSetup(OOBDATA, buffer);
235

    
236
        oobdata.mh_local = m->mh_local;
237
        oobdata.mh_remote = m->mh_remote;
238

    
239
        buffer.insert(buffer.end(), (char*)&oobdata, ((char*)&oobdata) + OOBDATA_SIZE);
240

    
241
        buffer.insert(buffer.end(), buf, buf + buf_len);
242

    
243
        return sendCtrlMsg((SocketId) m->dst_socketid, buffer);
244
}
245

    
246
int MeasureDispatcher::oobDataRx(SocketId sid, MsgType mt, char *cbuf, int length) {
247
        struct SocketIdMt h_dst;
248
        h_dst.sid = sid;
249
        h_dst.mt = MSG_TYPE_MONL;
250

    
251
        struct OobData *oobdata = (struct OobData*) cbuf;
252
        result *r = NULL;
253

    
254
        if(mm->isValidMonHandler(oobdata->mh_remote))
255
                if(mm->mMeasureInstances[oobdata->mh_remote]->status == RUNNING &&
256
                                 mlCompareSocketIDs((SocketId) mm->mMeasureInstances[oobdata->mh_remote]->dst_socketid, sid) == 0) {
257
                        if(mm->mMeasureInstances[oobdata->mh_remote]->flags & DATA) {
258
                                if(mm->mMeasureInstances[oobdata->mh_remote]->flags & REMOTE)
259
                                        r = dispatcherList[h_dst]->data_r_rx_remote;
260
                                else
261
                                        r = dispatcherList[h_dst]->data_r_rx_local;
262
                        } else if (mm->mMeasureInstances[oobdata->mh_remote]->flags & PACKET) {
263
                                if(mm->mMeasureInstances[oobdata->mh_remote]->flags & REMOTE)
264
                                        r = dispatcherList[h_dst]->pkt_r_rx_remote;
265
                                else
266
                                        r = dispatcherList[h_dst]->pkt_r_rx_local;
267
                        }
268
                        if(!r)
269
                                fatal("MONL: r is not set");
270
                        mm->mMeasureInstances[oobdata->mh_remote]->receiveOobData(cbuf + OOBDATA_SIZE,
271
                                        length - OOBDATA_SIZE, r);
272
                }
273
        return EOK;
274
}
275

    
276
int MeasureDispatcher::remoteResultsTx(SocketId dst, struct res_mh_pair *rmp, int length, MsgType mt) {
277
        Buffer buffer;
278
        struct RemoteResults rresults;
279

    
280
        buffer.reserve(MONHDR_SIZE + REMOTERESULTS_SIZE + length * sizeof(struct res_mh_pair));
281
        
282
        headerSetup(REMOTERESULTS, buffer);
283

    
284
        rresults.length = length;
285
        rresults.msg_type = mt;
286
        
287
        buffer.insert(buffer.end(), (char*)&rresults, ((char*)&rresults) + REMOTERESULTS_SIZE);
288

    
289
        buffer.insert(buffer.end(), (char*) rmp, ((char*) rmp) + length * sizeof(struct res_mh_pair));
290

    
291
        return sendCtrlMsg(dst, buffer);
292
}
293

    
294
int MeasureDispatcher::remoteResultsRx(SocketId sid, MsgType mt, char *cbuf) {
295
        struct RemoteResults *rresults = (struct RemoteResults *) cbuf;
296
        struct res_mh_pair *rmp;
297

    
298
        rmp = (struct res_mh_pair *) (cbuf + REMOTERESULTS_SIZE);
299

    
300
        for(int i = 0; i < rresults->length; i++) {
301
                if(mm->isValidMonHandler(rmp[i].mh)) {
302
                        if(mm->mMeasureInstances[rmp[i].mh]->status == RUNNING &&
303
                         mm->mMeasureInstances[rmp[i].mh]->msg_type == rresults->msg_type &&
304
                          mlCompareSocketIDs((SocketId) mm->mMeasureInstances[rmp[i].mh]->dst_socketid, sid) == 0) {
305
                                if(mm->mMeasureInstances[rmp[i].mh]->rb != NULL) {
306
                                                mm->mMeasureInstances[rmp[i].mh]->rb->newSample(rmp[i].res);
307
                                }
308
                        }
309
                }
310
        }
311
        return EOK;
312
};
313

    
314
int MeasureDispatcher::remoteMeasureResponseTx(SocketId dst, MonHandler mhr, MonHandler mh, int32_t cid, int32_t status) {
315
        Buffer buffer;
316
        struct MeasureResponse mresponse;
317

    
318
        buffer.reserve(MONHDR_SIZE + MRESPONSE_SIZE);
319

    
320
        headerSetup(REMOTEMEASURERESPONSE, buffer);
321

    
322
        mresponse.mh_remote        = mhr;
323
        mresponse.mh_local        = mh;
324
        mresponse.command                = cid;
325
        mresponse.status                = status;
326

    
327
        buffer.insert(buffer.end(), (char*)&mresponse, ((char*)&mresponse) + MRESPONSE_SIZE);
328

    
329
        return sendCtrlMsg(dst, buffer);
330
}
331

    
332
int MeasureDispatcher::remoteMeasureResponseRx(SocketId src, MsgType mt, char* cbuf) {
333
        struct MeasureResponse *mresponse = (MeasureResponse*) cbuf;
334

    
335
        if(! mm->isValidMonHandler(mresponse->mh_remote)) {        //might not exist if the measure had been deleted in the meantime
336
                return -EINVAL;
337
        }
338

    
339
        MonMeasure *m = mm->mMeasureInstances[mresponse->mh_remote];
340
        switch(mresponse->command) {
341
                case INITREMOTEMEASURE:
342
                        m->mh_remote = mresponse->mh_local;
343
                        if(mresponse->status == EOK) {
344
                                if(m->flags & OUT_OF_BAND) {
345
                                        struct timeval tv = {0,0};
346
                                        if(m->scheduleNextIn(&tv) != EOK)
347
                                                m->status = FAILED;
348
                                        else
349
                                                m->status = RUNNING;
350
                                } else
351
                                        m->status = RUNNING;
352
                        } else
353
                                m->status = RFAILED;
354
                        break;
355
                case DEINITREMOTEMEASURE:
356
                        if(mresponse->status == EOK) {
357
                                if(m->status == STOPPING) {
358
                                        stopMeasure(m);
359
                                        if(mm->monDestroyMeasure(m->mh_local) != EOK)
360
                                                m->status = FAILED;
361
                                } else
362
                                        m->status = STOPPED;
363
                        }
364
                        else
365
                                m->status = RFAILED;
366
                        break;
367
                }
368
        return EOK;
369
}
370

    
371
int MeasureDispatcher::initRemoteMeasureRx(SocketId src, MsgType mt, char *cbuf) {
372
        MonHandler mh;
373
        MonParameterValue *param_vector;
374
        MonMeasure *m = NULL;
375
        struct SocketIdMt h_dst;
376
        struct InitRemoteMeasure *initmeasure = (InitRemoteMeasure*) cbuf;
377

    
378
        h_dst.sid = src;
379
        h_dst.mt = initmeasure->msg_type;
380

    
381
        /* Check if a previous instance is running */
382
        if(dispatcherList.find(h_dst) != dispatcherList.end())
383
                m = findMeasureFromId(dispatcherList[h_dst], initmeasure->mc, initmeasure->mid);
384

    
385
        //if so remove it, so that we can create a new one (needed if mc changed)
386
        if(m != NULL)
387
                mm->monDestroyMeasure(m->mh_local);
388

    
389
        mh = mm->monCreateMeasureId(initmeasure->mid, initmeasure->mc);
390

    
391
        if(mh < 0)
392
                return remoteMeasureResponseTx(src, initmeasure->mh_local, -1, INITREMOTEMEASURE, EINVAL);
393

    
394
        mm->mMeasureInstances[mh]->mh_remote = initmeasure->mh_local;
395

    
396
        param_vector = (MonParameterValue*)(cbuf + INITREMOTEMEASURE_SIZE);
397

    
398
        for(int i=0; i < initmeasure->n_params; i++) {
399
                if(mm->monSetParameter(mh,i,param_vector[i]) != EOK)        //TODO: init might be called inside this, which might use paramerets (e.g. src) that are set only below, in activateMeasure
400
                        goto error;
401
        }
402

    
403
        if(activateMeasure(mm->mMeasureInstances[mh], src, initmeasure->msg_type) != EOK)
404
                goto error;
405

    
406
        return remoteMeasureResponseTx(src, initmeasure->mh_local, mh, INITREMOTEMEASURE, EOK);
407

    
408
error:
409
        return remoteMeasureResponseTx(src, initmeasure->mh_local, mh, INITREMOTEMEASURE, EINVAL);
410
}
411

    
412
int MeasureDispatcher::initRemoteMeasureTx(class MonMeasure *m, SocketId dst, MsgType mt) {
413
        Buffer buffer;
414

    
415
        struct InitRemoteMeasure initmeasure;
416

    
417
        buffer.reserve(MONHDR_SIZE + INITREMOTEMEASURE_SIZE + sizeof(MonParameterValue) * m->measure_plugin->params.size());
418

    
419
        headerSetup(INITREMOTEMEASURE, buffer);
420

    
421
        initmeasure.mid = m->measure_plugin->getId();
422
        /* setup flags for REMOTE party */
423
        initmeasure.mc = (m->getFlags() | REMOTE) & ~(TXLOC | RXLOC | TXREM | RXREM);
424
        if(m->getFlags() & TXREM)
425
                initmeasure.mc |= TXLOC;
426
        if(m->getFlags() & RXREM)
427
                initmeasure.mc |= RXLOC;
428

    
429
        initmeasure.mh_local = m->mh_local;
430
        initmeasure.msg_type = mt;
431
        initmeasure.n_params = m->measure_plugin->params.size();
432

    
433
        buffer.insert(buffer.end(),(char*)&initmeasure,((char*)&initmeasure) + INITREMOTEMEASURE_SIZE);
434

    
435
        for(int i=0; i < initmeasure.n_params; i++)
436
                buffer.insert(buffer.end(), (char*)&(m->param_values[i]), ((char*)&(m->param_values[i])) + sizeof(MonParameterValue));
437

    
438
        return sendCtrlMsg(dst, buffer);
439
}
440

    
441
int MeasureDispatcher::deinitRemoteMeasureTx(class MonMeasure *m) {
442
        Buffer buffer;
443
        struct DeInitRemoteMeasure deinitmeasure;
444

    
445
        buffer.reserve(MONHDR_SIZE + DEINITREMOTEMEASURE_SIZE);
446

    
447
        headerSetup(DEINITREMOTEMEASURE, buffer);
448

    
449
        deinitmeasure.mh_local = m->mh_local;
450
        deinitmeasure.mh_remote = m->mh_remote;
451

    
452
        buffer.insert(buffer.end(),(char*)&deinitmeasure, ((char*)&deinitmeasure) + DEINITREMOTEMEASURE_SIZE);
453

    
454
        return sendCtrlMsg((SocketId) m->dst_socketid, buffer);
455
};
456

    
457
int MeasureDispatcher::deinitRemoteMeasureRx(SocketId src, MsgType mt, char* cbuf) {
458
        int ret;
459

    
460
        struct DeInitRemoteMeasure *deinitmeasure = (DeInitRemoteMeasure*) cbuf;
461

    
462
        if(! mm->isValidMonHandler(deinitmeasure->mh_remote)) {        //might not exist if the measure had been deleted in the meantime
463
                return -EINVAL;
464
        }
465

    
466
        ret = deactivateMeasure(mm->mMeasureInstances[deinitmeasure->mh_remote]);
467
        if (ret != EOK )
468
                return remoteMeasureResponseTx(src, deinitmeasure->mh_local, deinitmeasure->mh_remote, DEINITREMOTEMEASURE, ret);
469

    
470
        ret = mm->monDestroyMeasure(deinitmeasure->mh_remote);        //TODO: this mh should not be reused! Packets might still arrive referencig this
471
        if (ret != EOK )
472
                return remoteMeasureResponseTx(src, deinitmeasure->mh_local, deinitmeasure->mh_remote, DEINITREMOTEMEASURE, ret);
473

    
474
        return remoteMeasureResponseTx(src,  deinitmeasure->mh_local, deinitmeasure->mh_remote, DEINITREMOTEMEASURE, EOK);
475
}
476

    
477
class MonMeasure* MeasureDispatcher::findMeasureFromId(DestinationSocketIdMtData *dd,  MeasurementCapabilities flags, MeasurementId mid) {
478
        ExecutionList::iterator it;
479

    
480
        //check if loaded
481
        if(flags & REMOTE) {
482
                it = dd->mids_remote.find(mid);
483
                if(it == dd->mids_remote.end())
484
                        return NULL;
485
        } else {
486
                it = dd->mids_local.find(mid);
487
                if(it == dd->mids_local.end())
488
                        return NULL;
489
        }
490

    
491
        return it->second;
492
}
493

    
494
int MeasureDispatcher::activateMeasure(class MonMeasure *m, SocketId dst, MsgType mt, int auto_load) {
495
        MeasurementId mid;
496
        struct SocketIdMt h_dst;
497
        int ret;
498

    
499
        h_dst.sid = dst;
500
        h_dst.mt = mt;
501

    
502
        mid = m->measure_plugin->getId();
503
        m->msg_type = mt;
504

    
505
        if(dst != NULL) {
506
                memcpy(m->dst_socketid, (uint8_t *) dst, SOCKETID_SIZE);
507
                m->dst_socketid_publish = true;
508
        } else {
509
                if(m->flags != 0)
510
                        return -EINVAL;
511
                m->dst_socketid_publish = false;
512
        }
513

    
514
        m->defaultInit();
515

    
516
        if(m->flags == 0) {
517
                m->status = RUNNING;
518
                return EOK;
519
        }
520

    
521
        //check if already present
522
        if(dispatcherList.find(h_dst) != dispatcherList.end()) {
523
                if(m->flags & REMOTE) {
524
                        if(dispatcherList[h_dst]->mids_remote.find(mid) != dispatcherList[h_dst]->mids_remote.end())
525
                                return -EEXISTS;
526
                } else {
527
                        if(dispatcherList[h_dst]->mids_local.find(mid) != dispatcherList[h_dst]->mids_local.end())
528
                                return -EEXISTS;
529
                }
530
        }
531

    
532

    
533
        if(dispatcherList.find(h_dst) == dispatcherList.end()) {
534
                createDestinationSocketIdMtData(h_dst);
535
        }
536

    
537
        if(m->flags & OUT_OF_BAND) {
538
                struct SocketIdMt h_dst2 = h_dst;
539
                h_dst2.mt = MSG_TYPE_MONL;
540
                if(dispatcherList.find(h_dst2) == dispatcherList.end())
541
                        createDestinationSocketIdMtData(h_dst2);
542
        }
543

    
544
        if(m->flags & PACKET) {
545
                if(m->flags & REMOTE) {
546
                        m->r_rx_list = dispatcherList[h_dst]->pkt_r_rx_remote;
547
                        m->r_tx_list = dispatcherList[h_dst]->pkt_r_tx_remote;
548
                } else {
549
                        m->r_rx_list = dispatcherList[h_dst]->pkt_r_rx_local;
550
                        m->r_tx_list = dispatcherList[h_dst]->pkt_r_tx_local;
551
                }
552
        } else {
553
                if(m->flags & REMOTE) {
554
                        m->r_rx_list = dispatcherList[h_dst]->data_r_rx_remote;
555
                        m->r_tx_list = dispatcherList[h_dst]->data_r_tx_remote;
556
                } else {
557
                        m->r_rx_list = dispatcherList[h_dst]->data_r_rx_local;
558
                        m->r_tx_list = dispatcherList[h_dst]->data_r_tx_local;
559
                }
560
        }
561
        //TODO: check deps
562

    
563
        // add it to loaded measure list
564
        if(m->flags & REMOTE)
565
                dispatcherList[h_dst]->mids_remote[mid] = m;
566
        else
567
                dispatcherList[h_dst]->mids_local[mid] = m;
568

    
569
        //Handle IN_BAND measures
570
        addMeasureToExecLists(h_dst, m);
571

    
572
        //Handle OUT_OF_BAND measures
573
        if(m->flags & OUT_OF_BAND) {
574
                struct timeval tv = {0,0};
575
                //if only local
576
                if(!(m->flags & TXREM) && !(m->flags & RXREM))
577
                        m->scheduleNextIn(&tv); //start it!
578
        }
579

    
580
        /* Remote counterparts ? */
581
        if(m->flags & TXREM || m->flags & RXREM) {
582
                /* Yes, initialise them */
583
                m->status = INITIALISING;
584
                ret = initRemoteMeasureTx(m,dst,mt);
585
                if(ret != EOK)
586
                        goto error;
587
        } else {
588
                m->status = RUNNING;
589
        }
590

    
591
        if(m->dst_socketid == NULL)
592
                info("bla1");
593
        return EOK;
594

    
595
error:
596
        if(m->dst_socketid == NULL)
597
                info("bla2");
598

    
599
        stopMeasure(m);
600
        if(m->dst_socketid == NULL)
601
                info("bla3");
602

    
603
        return ret;
604
}
605

    
606
void MeasureDispatcher::createDestinationSocketIdMtData(struct SocketIdMt h_dst) {
607
        DestinationSocketIdMtData *dd;
608
        
609
        dd = new DestinationSocketIdMtData;
610

    
611
        memcpy(dd->sid, (uint8_t*) h_dst.sid, SOCKETID_SIZE);
612
        dd->h_dst.sid = (SocketId) &(dd->sid);
613
        dd->h_dst.mt = h_dst.mt;
614

    
615
        dispatcherList[dd->h_dst] = dd;
616

    
617
        //initialize
618
        for(int i = 0; i < R_LAST_PKT; i++) {
619
                dispatcherList[dd->h_dst]->pkt_r_rx_local[i] = NAN;
620
                dispatcherList[dd->h_dst]->pkt_r_rx_remote[i] = NAN;
621
                dispatcherList[dd->h_dst]->pkt_r_tx_local[i] = NAN;
622
                dispatcherList[dd->h_dst]->pkt_r_tx_remote[i] = NAN;
623
        }
624
        dispatcherList[dd->h_dst]->pkt_tx_seq_num = 0;
625

    
626
        for(int i = 0; i < R_LAST_DATA; i++) {
627
                dispatcherList[dd->h_dst]->data_r_rx_local[i] = NAN;
628
                dispatcherList[dd->h_dst]->data_r_rx_remote[i] = NAN;
629
                dispatcherList[dd->h_dst]->data_r_tx_local[i] = NAN;
630
                dispatcherList[dd->h_dst]->data_r_tx_remote[i] = NAN;
631
        }
632
        dispatcherList[dd->h_dst]->data_tx_seq_num = 0;
633
}
634

    
635
void MeasureDispatcher::destroyDestinationSocketIdMtData(SocketId dst, MsgType mt) {
636
        DestinationSocketIdMtData *dd;
637
        DispatcherListSocketIdMt::iterator it;
638
        struct SocketIdMt h_dst;
639

    
640
        h_dst.sid = dst;
641
        h_dst.mt = mt;
642

    
643
        it = dispatcherList.find(h_dst);
644
        if(it != dispatcherList.end()) {
645
                dd = it->second;
646
                dispatcherList.erase(it);
647
                delete dd;
648
        }
649
        else
650
                fatal("Trying to erase non existent DestinationSocketIdMtData");
651
}
652

    
653
int MeasureDispatcher::deactivateMeasure(class MonMeasure *m) {
654
        SocketIdMt h_dst;
655

    
656
        h_dst.sid = (SocketId) m->dst_socketid;
657
        h_dst.mt = m->msg_type;
658

    
659
        if(m->status == STOPPED)
660
                return EOK;
661

    
662
        if(m->used_counter > 0)
663
                return -EINUSE;
664

    
665
        if(m->flags == 0) {
666
                m->defaultStop();
667
                m->status = STOPPED;
668
                return EOK;
669
        }
670

    
671
        if(dispatcherList.find(h_dst) == dispatcherList.end())
672
                return EINVAL;
673

    
674
        //remove it from the loaded list
675
        if(m->flags & REMOTE) {
676
                if(dispatcherList[h_dst]->mids_remote.find(m->measure_plugin->getId()) == dispatcherList[h_dst]->mids_remote.end())
677
                        return -EINVAL;
678
        } else {
679
                if(dispatcherList[h_dst]->mids_local.find(m->measure_plugin->getId()) == dispatcherList[h_dst]->mids_local.end())
680
                        return -EINVAL;
681
        }
682

    
683
        return stopMeasure(m);
684
}
685

    
686
int MeasureDispatcher::stopMeasure(class MonMeasure *m) {
687
        SocketIdMt h_dst;
688

    
689
        h_dst.sid = (SocketId) m->dst_socketid;
690
        h_dst.mt = m->msg_type;
691

    
692
        m->defaultStop();
693

    
694
        //Handle IN_BAND measures
695
        delMeasureFromExecLists(m);
696
        //Handle OUT_OF:BAND measures
697
        //TODO: Anything to do at all?
698

    
699
        //Handle remote counterpart
700
        if(m->flags & TXREM || m->flags & RXREM) {
701
                        m->status = STOPPING;
702
                        deinitRemoteMeasureTx(m);
703
        } else
704
                m->status = STOPPED;
705
        m->r_tx_list = m->r_rx_list = NULL;
706

    
707
        //remove it from the loaded list
708
        if(m->flags & REMOTE)
709
                dispatcherList[h_dst]->mids_remote.erase(dispatcherList[h_dst]->mids_remote.find(m->measure_plugin->getId()));
710
        else
711
                dispatcherList[h_dst]->mids_local.erase(dispatcherList[h_dst]->mids_local.find(m->measure_plugin->getId()));
712

    
713
        //check if we can remove also the DistinationSocektIdMtData
714
        if(dispatcherList[h_dst]->mids_remote.empty() && dispatcherList[h_dst]->mids_local.empty())
715
                destroyDestinationSocketIdMtData((SocketId) m->dst_socketid, m->msg_type);
716
}
717

    
718

    
719
void MeasureDispatcher::cbRxPkt(void *arg) {
720
        ExecutionList::iterator it;
721
        result *r_loc, *r_rem;
722
        struct res_mh_pair rmp[R_LAST_PKT];
723
        int i,j;
724
        mon_pkt_inf *pkt_info = (mon_pkt_inf *)arg;
725
        struct MonPacketHeader *mph = (MonPacketHeader*) pkt_info->monitoringHeader;
726
        struct SocketIdMt h_dst;
727
        h_dst.sid = pkt_info->remote_socketID;
728
        h_dst.mt = pkt_info->msgtype;
729

    
730
        if(pkt_info->remote_socketID == NULL)
731
                return;
732

    
733
        /* something to do? */
734
        if(dispatcherList.size() == 0)
735
                return;
736

    
737
        if(dispatcherList.find(h_dst) == dispatcherList.end())
738
                return;
739

    
740
        //we have a result vector to fill
741
        r_loc = dispatcherList[h_dst]->pkt_r_rx_local;
742
        r_rem = dispatcherList[h_dst]->pkt_r_rx_remote;
743

    
744
        //TODO: add standard fields
745
        if(mph != NULL) {
746
                r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ntohl(mph->seq_num);
747
                r_rem[R_INITIAL_TTL] = r_loc[R_INITIAL_TTL] = mph->initial_ttl;
748
                r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ntohl(mph->ts_usec) / 1000000.0;
749
                r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = r_loc[R_SEND_TIME] + ntohl(mph->ts_sec);
750
                r_rem[R_REPLY_TIME] = r_loc[R_REPLY_TIME] = ntohl(mph->ans_ts_usec) / 1000000.0;
751
                r_rem[R_REPLY_TIME] = r_loc[R_REPLY_TIME] = r_loc[R_REPLY_TIME] + ntohl(mph->ans_ts_sec);
752
        }
753

    
754
        r_rem[R_SIZE] = r_loc[R_SIZE] = pkt_info->bufSize;
755
        r_rem[R_TTL] = r_loc[R_TTL] = pkt_info->ttl;
756
        r_rem[R_DATA_ID] = r_loc[R_DATA_ID] = pkt_info->dataID;
757
        r_rem[R_DATA_OFFSET] = r_loc[R_DATA_OFFSET] = pkt_info->offset;
758
        r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] = pkt_info->arrival_time.tv_usec / 1000000.0;
759
        r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] += pkt_info->arrival_time.tv_sec;
760

    
761

    
762
        // are there local in band measures?
763
        if (dispatcherList[h_dst]->el_rx_pkt_local.size() > 0) {
764
                /* yes! */
765
                ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_rx_pkt_local);
766
        
767
                /* Call measures in order */
768
                for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++)
769
                        if(it->second->status == RUNNING)
770
                                it->second->RxPktLocal(el_ptr_loc);
771
        }
772

    
773
        if (dispatcherList[h_dst]->el_rx_pkt_remote.size() > 0) {
774
                /* And remotes */
775
                ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_rx_pkt_remote);
776
        
777
                /* Call measurees in order */
778
                j = 0;
779
                for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
780
                        if(it->second->status == RUNNING)
781
                                it->second->RxPktRemote(rmp,j,el_ptr_rem);
782
        
783
                /* send back results */
784
                if(j > 0)
785
                remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
786
        }
787
}
788

    
789
void MeasureDispatcher::cbRxData(void *arg) {
790
        ExecutionList::iterator it;
791
        result *r_loc, *r_rem;
792
        struct res_mh_pair rmp[R_LAST_DATA];
793
        int i,j;
794
        mon_data_inf *data_info = (mon_data_inf *)arg;
795
        struct MonDataHeader *mdh = (MonDataHeader*) data_info->monitoringDataHeader;
796
        struct SocketIdMt h_dst;
797
        h_dst.sid = data_info->remote_socketID;
798
        h_dst.mt = data_info->msgtype;
799

    
800
        if(data_info->remote_socketID == NULL)
801
                return;
802

    
803
        /* something to do? */
804
        if(dispatcherList.size() == 0)
805
                return;
806

    
807
        if(dispatcherList.find(h_dst) == dispatcherList.end())
808
                return;
809

    
810
        //we have a result vector to fill
811
        r_loc = dispatcherList[h_dst]->data_r_rx_local;
812
        r_rem = dispatcherList[h_dst]->data_r_rx_remote;
813

    
814
        //TODO: add standard fields
815
        if(mdh != NULL) {
816
                r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ntohl(mdh->seq_num);
817
                r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ntohl(mdh->ts_usec) / 1000000.0;
818
                r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = r_loc[R_SEND_TIME] + ntohl(mdh->ts_sec);
819
                r_rem[R_REPLY_TIME] = r_loc[R_REPLY_TIME] = ntohl(mdh->ans_ts_usec) / 1000000.0;
820
                r_rem[R_REPLY_TIME] = r_loc[R_REPLY_TIME] = r_loc[R_REPLY_TIME] + ntohl(mdh->ans_ts_sec);
821
        }
822
        r_rem[R_SIZE] = r_loc[R_SIZE] = data_info->bufSize;
823
        r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] = data_info->arrival_time.tv_usec / 1000000.0;
824
        r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] += data_info->arrival_time.tv_sec;
825

    
826
        if (dispatcherList[h_dst]->el_rx_data_local.size() > 0) {
827
                /* yes! */
828
                /* Locals first */
829
                ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_rx_data_local);
830
        
831
        
832
                /* Call measurees in order */
833
                for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++){
834
                        MonMeasure *m = it->second;
835
                        if(it->second->status == RUNNING)
836
                                it->second->RxDataLocal(el_ptr_loc);
837
                }
838
        }
839

    
840
        if (dispatcherList[h_dst]->el_rx_data_remote.size() > 0) {
841
                /* And remotes */
842
        
843
                ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_rx_data_remote);
844
        
845
                /* Call measurees in order */
846
                j = 0;
847
                for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
848
                        if(it->second->status == RUNNING)
849
                                it->second->RxDataRemote(rmp,j,el_ptr_rem);
850
        
851
                /* send back results */
852
                if(j > 0)
853
                        remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
854
        }
855
}
856

    
857
void MeasureDispatcher::cbTxPkt(void *arg) {
858
        ExecutionList::iterator it;
859
        result *r_loc, *r_rem;
860
        struct res_mh_pair rmp[R_LAST_PKT];
861
        int i,j;
862
        mon_pkt_inf *pkt_info = (mon_pkt_inf *)arg;
863
        struct MonPacketHeader *mph = (MonPacketHeader*) pkt_info->monitoringHeader;
864
        struct timeval ts;
865
        struct SocketIdMt h_dst;
866
        h_dst.sid = pkt_info->remote_socketID;
867
        h_dst.mt = pkt_info->msgtype;
868

    
869
        if(pkt_info->remote_socketID == NULL)
870
                return;
871

    
872
        /* something to do? */
873
        if(dispatcherList.size() == 0)
874
                return;
875

    
876
        if(dispatcherList.find(h_dst) == dispatcherList.end())
877
                return;
878

    
879
        /* yes! */
880

    
881
        r_loc = dispatcherList[h_dst]->pkt_r_tx_local;
882
        r_rem = dispatcherList[h_dst]->pkt_r_tx_remote;
883

    
884
        /* prepare initial result vector (based on header information) */
885
                
886
        r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ++(dispatcherList[h_dst]->pkt_tx_seq_num);
887
        r_rem[R_SIZE] = r_loc[R_SIZE] = pkt_info->bufSize;
888
        r_rem[R_INITIAL_TTL] = r_loc[R_INITIAL_TTL] = initial_ttl;
889
        gettimeofday(&ts,NULL);        
890
        r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ts.tv_usec / 1000000.0;
891
        r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] =  r_loc[R_SEND_TIME] + ts.tv_sec;
892

    
893
        if(dispatcherList[h_dst]->el_tx_pkt_local.size() > 0) {
894

    
895
                ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_tx_pkt_local);
896

    
897
                /* Call measurees in order */
898
                for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++)
899
                        if(it->second->status == RUNNING)
900
                                it->second->TxPktLocal(el_ptr_loc);
901
        }
902

    
903
        if(dispatcherList[h_dst]->el_tx_pkt_remote.size() > 0) {
904
                /* And remotes */
905
                ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_tx_pkt_remote);
906
        
907
                /* Call measurees in order */
908
                j = 0;
909
                for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
910
                        if(it->second->status == RUNNING)
911
                                it->second->TxPktRemote(rmp,j,el_ptr_rem);
912
        
913
                /* send back results */
914
                if(j > 0)
915
                        remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
916
        }
917

    
918
        if(mph != NULL) {
919
                mph->seq_num = htonl((uint32_t)r_rem[R_SEQNUM]);
920
                mph->initial_ttl = (uint32_t)r_rem[R_INITIAL_TTL];
921
                mph->ts_sec = htonl((uint32_t)floor(r_rem[R_SEND_TIME]));
922
                mph->ts_usec = htonl((uint32_t)((r_rem[R_SEND_TIME] - floor(r_rem[R_SEND_TIME])) * 1000000.0));
923
                if(!std::isnan(r_rem[R_REPLY_TIME])) {
924
                        mph->ans_ts_sec = htonl((uint32_t)floor(r_rem[R_REPLY_TIME]));
925
                        mph->ans_ts_usec = htonl((uint32_t)((r_rem[R_REPLY_TIME] - floor(r_rem[R_REPLY_TIME])) * 1000000.0));
926
                } else {
927
                        mph->ans_ts_sec = 0;
928
                        mph->ans_ts_usec = 0;
929
                }
930
        }
931
}
932

    
933
void MeasureDispatcher::cbTxData(void *arg) {
934
        ExecutionList::iterator it;
935
        result *r_loc,*r_rem;
936
        struct res_mh_pair rmp[R_LAST_DATA];
937
        int i,j;
938
        mon_data_inf *data_info = (mon_data_inf *)arg;
939
        struct MonDataHeader *mdh = (MonDataHeader*) data_info->monitoringDataHeader;
940
        struct timeval ts;
941
        struct SocketIdMt h_dst;
942
        h_dst.sid = data_info->remote_socketID;
943
        h_dst.mt = data_info->msgtype;
944

    
945
        if(data_info->remote_socketID == NULL)
946
                return;
947

    
948
        /* something to do? */
949
        if(dispatcherList.size() == 0)
950
                return;
951

    
952
        if(dispatcherList.find(h_dst) == dispatcherList.end())
953
                return;
954
        
955
        /* yes! */
956
        r_loc = dispatcherList[h_dst]->data_r_tx_local;
957
        r_rem = dispatcherList[h_dst]->data_r_tx_remote;
958

    
959
        
960
        //TODO add fields
961
        r_rem[R_SIZE] = r_loc[R_SIZE] = data_info->bufSize;
962
        r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ++(dispatcherList[h_dst]->data_tx_seq_num);
963
        gettimeofday(&ts,NULL);
964
        r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ts.tv_usec / 1000000.0;
965
        r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = r_loc[R_SEND_TIME] + ts.tv_sec;
966

    
967
        if(dispatcherList[h_dst]->el_tx_data_local.size() > 0) {
968
                ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_tx_data_local);
969

    
970
                /* Call measures in order */
971
                for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++)
972
                        if(it->second->status == RUNNING)
973
                                it->second->TxDataLocal(el_ptr_loc);
974
        }
975

    
976
        if(dispatcherList[h_dst]->el_tx_data_remote.size() > 0) {
977
                /* And remote */
978
                ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_tx_data_remote);
979
        
980
                /* Call measures in order */
981
                j = 0;
982
                for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
983
                        if(it->second->status == RUNNING)
984
                                it->second->TxDataRemote(rmp, j, el_ptr_rem);
985
        
986
                /* send back results */
987
                if(j > 0)
988
                        remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
989
        }
990

    
991
        if(mdh != NULL) {
992
                mdh->seq_num =  htonl((uint32_t)r_rem[R_SEQNUM]);
993
                mdh->ts_sec =  htonl((uint32_t)floor(r_rem[R_SEND_TIME]));
994
                mdh->ts_usec =  htonl((uint32_t)((r_rem[R_SEND_TIME] - floor(r_rem[R_SEND_TIME])) * 1000000.0));
995
                if(!std::isnan(r_rem[R_REPLY_TIME])) {
996
                        mdh->ans_ts_sec = htonl((uint32_t)floor(r_rem[R_REPLY_TIME]));
997
                        mdh->ans_ts_usec = htonl((uint32_t)((r_rem[R_REPLY_TIME] - floor(r_rem[R_REPLY_TIME])) * 1000000.0));
998
                } else {
999
                        mdh->ans_ts_sec = 0;
1000
                        mdh->ans_ts_usec = 0;
1001
                }
1002
        }
1003
}
1004

    
1005
int MeasureDispatcher::cbHdrPkt(SocketId sid, MsgType mt) {
1006
        struct SocketIdMt h_dst;
1007
        h_dst.sid = sid;
1008
        h_dst.mt = mt;
1009

    
1010
        if(sid == NULL)
1011
                return 0;
1012

    
1013
        /* something to do? */
1014
        if(dispatcherList.size() == 0)
1015
                return 0;
1016

    
1017
        if(dispatcherList.find(h_dst) == dispatcherList.end())
1018
                return 0;
1019

    
1020
        /* yes! */
1021
        return MON_PACKET_HEADER_SIZE;
1022
}
1023

    
1024
int MeasureDispatcher::cbHdrData(SocketId sid, MsgType mt) {
1025
        struct SocketIdMt h_dst;
1026
        h_dst.sid = sid;
1027
        h_dst.mt = mt;
1028

    
1029
        if(sid == NULL)
1030
                return 0;
1031

    
1032
        /* something to do? */
1033
        if(dispatcherList.size() == 0)
1034
                return 0;
1035

    
1036
        if(dispatcherList.find(h_dst) == dispatcherList.end())
1037
                return 0;
1038

    
1039
        /* yes! return the space we need */
1040
        return MON_DATA_HEADER_SIZE;
1041
}
1042

    
1043
int MeasureDispatcher::scheduleMeasure(MonHandler mh) {
1044
        if(mm->isValidMonHandler(mh))
1045
                mm->mMeasureInstances[mh]->Run();
1046
        return EOK;
1047
}
1048

    
1049
int MeasureDispatcher::schedulePublish(MonHandler mh) {
1050
        if(mm->isValidMonHandler(mh))
1051
                if(mm->mMeasureInstances[mh]->rb != NULL)
1052
                        mm->mMeasureInstances[mh]->rb->publishResults();
1053
        return EOK;
1054
}