Revision d1e0f2ca

View differences:

monl/measure_dispatcher.cpp
45 45
	delete msg;
46 46
}
47 47

  
48
int MeasureDispatcher::addMeasureToExecList(DispatcherListMsgType &dl, class MonMeasure *m, MsgType mt) {
48
void MeasureDispatcher::addMeasureToExecLists(SocketIdMt h_dst, class MonMeasure *m) {
49 49
	class MeasurePlugin *mp = m->measure_plugin;
50
	ExecutionList *el;
51
	int32_t *n_el;
52

  
53
	/* Create an execution list if necessary */
54
	if(dl.find(mt) == dl.end()) {
55
		dl[mt] = new DestinationMsgTypeData;
56
		dl[mt]->n_el_local = 0;
57
		dl[mt]->n_el_remote = 0;
58
	}
59

  
60
	if(m->flags & REMOTE) {
61
		n_el = &(dl[mt]->n_el_remote);
62
		el = &(dl[mt]->el_remote);
63
	} else {
64
		n_el = &(dl[mt]->n_el_local);
65
		el = &(dl[mt]->el_local);
66
	}
67
	/* Check if this measure is already in the list */
68
	if(el->find(mp->getId()) != el->end())
69
		return -EEXISTS;
70

  
71
	/* Check for all dependencies */
72
	std::vector<MeasurementId>::iterator it;
73
	std::vector<MeasurementId> deps = mp->getDeps();
74
	for(it = deps.begin(); it != deps.end(); it++) {
75
		if(el->find(*it) == el->end())
76
			return -EUNRESOLVEDDEP;
77
	}
78 50

  
79
	/* update counters */
80
	for(it = deps.begin(); it != deps.end(); it++) {
81
		(*el)[*it]->used_counter++;
51
	/* IN_BAND  measurments added to hook executionlists */
52
	if(m->flags & IN_BAND) {
53
		/* TXLOC */
54
		if(m->flags & TXLOC) {
55
			if(m->flags & PACKET) {
56
				if(m->flags & REMOTE)
57
					dispatcherList[h_dst]->el_tx_pkt_remote[mp->getId()] = m;
58
				else
59
					dispatcherList[h_dst]->el_tx_pkt_local[mp->getId()] = m;
60
			}
61
			if(m->flags & DATA) {
62
				if(m->flags & REMOTE)
63
					dispatcherList[h_dst]->el_tx_data_remote[mp->getId()] = m;
64
				else
65
					dispatcherList[h_dst]->el_tx_data_local[mp->getId()] = m;
66
			}
67
		}
68
		/* RXLOC */
69
		if(m->flags & RXLOC) {
70
			if(m->flags & PACKET)  {
71
				if(m->flags & REMOTE)
72
					dispatcherList[h_dst]->el_rx_pkt_remote[mp->getId()] = m;
73
				else
74
					dispatcherList[h_dst]->el_rx_pkt_local[mp->getId()] = m;
75
			}
76
			if(m->flags & DATA) {
77
				if(m->flags & REMOTE)
78
					dispatcherList[h_dst]->el_rx_data_remote[mp->getId()] = m;
79
				else
80
					dispatcherList[h_dst]->el_rx_data_local[mp->getId()] = m;
81
			}
82
		}
82 83
	}
83

  
84
	/* Add measure to execution list */
85
	(*el)[mp->getId()] = m;
86
	(*n_el)++;
87

  
88
	return EOK;
89 84
}
90 85

  
91
int MeasureDispatcher::delMeasureFromExecList(DispatcherListMsgType &dl, class MonMeasure *m, MsgType mt) {
86
void MeasureDispatcher::delMeasureFromExecLists(MonMeasure *m) {
87
	SocketIdMt h_dst;
92 88
	class MeasurePlugin *mp = m->measure_plugin;
93
	ExecutionList *el;
94
	int32_t *n_el;
95

  
96
	/* Create an execution list if necessary */
97
	if(dl.find(mt) == dl.end())
98
		return -EINVAL;
99

  
100
	if(m->flags & REMOTE) {
101
		n_el = &(dl[mt]->n_el_remote);
102
		el = &(dl[mt]->el_remote);
103
	} else {
104
		n_el = &(dl[mt]->n_el_local);
105
		el = &(dl[mt]->el_local);
106
	}
107

  
108
	/* update counters */
109
	/* Check for all dependencies */
110
	std::vector<MeasurementId>::iterator it;
111
	std::vector<MeasurementId> deps = mp->getDeps();
112
	for(it = deps.begin(); it != deps.end(); it++) {
113
		(*el)[*it]->used_counter--;
114
	}
89
	ExecutionList::iterator it;
115 90

  
116
	el->erase(el->find(mp->getId()));
117
	*(n_el)--;
91
	h_dst.sid = (SocketId) m->dst_socketid;
92
	h_dst.mt = m->msg_type;
118 93

  
119
	if(dl[mt]->n_el_local == 0 && dl[mt]->n_el_remote == 0) {
120
		delete dl[mt];
121
		dl.erase(dl.find(mt));
94
	/* IN_BAND  measurments added to hook executionlists */
95
	if(m->flags & IN_BAND) {
96
		/* TXLOC */
97
		if(m->flags & TXLOC) {
98
			if(m->flags & PACKET) {
99
				if(m->flags & REMOTE) {
100
					it = dispatcherList[h_dst]->el_tx_pkt_remote.find(mp->getId());
101
					if(it != dispatcherList[h_dst]->el_tx_pkt_remote.end())
102
						dispatcherList[h_dst]->el_tx_pkt_remote.erase(it);
103
				} else {
104
					it = dispatcherList[h_dst]->el_tx_pkt_local.find(mp->getId());
105
					if(it != dispatcherList[h_dst]->el_tx_pkt_local.end())
106
						dispatcherList[h_dst]->el_tx_pkt_local.erase(it);
107
				}
108
			}
109
			if(m->flags & DATA) {
110
				if(m->flags & REMOTE) {
111
					it = dispatcherList[h_dst]->el_tx_data_remote.find(mp->getId());
112
					if(it != dispatcherList[h_dst]->el_tx_data_remote.end())
113
						dispatcherList[h_dst]->el_tx_data_remote.erase(it);
114
				} else {
115
					it = dispatcherList[h_dst]->el_tx_data_local.find(mp->getId());
116
					if(it != dispatcherList[h_dst]->el_tx_data_local.end())
117
						dispatcherList[h_dst]->el_tx_data_local.erase(it);
118
				}
119
			}
120
		}
121
		/* RXLOC */
122
		if(m->flags & RXLOC) {
123
			if(m->flags & PACKET) {
124
				if(m->flags & REMOTE) {
125
					it = dispatcherList[h_dst]->el_rx_pkt_remote.find(mp->getId());
126
					if(it != dispatcherList[h_dst]->el_rx_pkt_remote.end())
127
						dispatcherList[h_dst]->el_rx_pkt_remote.erase(it);
128
				} else {
129
					it = dispatcherList[h_dst]->el_rx_pkt_local.find(mp->getId());
130
					if(it != dispatcherList[h_dst]->el_rx_pkt_local.end())
131
						dispatcherList[h_dst]->el_rx_pkt_local.erase(it);
132
				}
133
			}
134
			if(m->flags & DATA) {
135
				if(m->flags & REMOTE) {
136
					it = dispatcherList[h_dst]->el_rx_data_remote.find(mp->getId());
137
					if(it != dispatcherList[h_dst]->el_rx_data_remote.end())
138
						dispatcherList[h_dst]->el_rx_data_remote.erase(it);
139
				} else {
140
					it = dispatcherList[h_dst]->el_rx_data_local.find(mp->getId());
141
					if(it != dispatcherList[h_dst]->el_rx_data_local.end())
142
						dispatcherList[h_dst]->el_rx_data_local.erase(it);
143
				}
144
			}
145
		}
122 146
	}
123

  
124
	return EOK;
125 147
}
126 148

  
127 149
int MeasureDispatcher::sendCtrlMsg(SocketId dst, Buffer &buffer)
128 150
{
129 151
	int con_id,res;
130 152
	Message *msg;
131
	send_params sparam;
153
	send_params sparam = {0,0,0,0,0};
132 154
	struct MonHeader *mheader = (MonHeader*)&buffer[0];
133 155
	mheader->length = buffer.size();
134 156

  
......
145 167
		}
146 168
	}
147 169

  
148
	memset(&sparam, 0, sizeof(send_params));
149

  
150 170
	//otherwise open connection and delay sending data
151 171
	msg = new Message;
152 172
	msg->length = buffer.size();
......
214 234

  
215 235
	buffer.insert(buffer.end(), buf, buf + buf_len);
216 236

  
217
	return sendCtrlMsg(m->dst_socketid, buffer);
237
	return sendCtrlMsg((SocketId) m->dst_socketid, buffer);
218 238
}
219 239

  
220 240
int MeasureDispatcher::oobDataRx(SocketId sid, MsgType mt, char *cbuf, int length) {
241
	struct SocketIdMt h_dst;
242
	h_dst.sid = sid;
243
	h_dst.mt = MSG_TYPE_MONL;
244

  
221 245
	struct OobData *oobdata = (struct OobData*) cbuf;
222 246
	result *r = NULL;
223 247

  
224 248
	if(mm->isValidMonHandler(oobdata->mh_remote))
225 249
		if(mm->mMeasureInstances[oobdata->mh_remote]->status == RUNNING &&
226
 				mlCompareSocketIDs(mm->mMeasureInstances[oobdata->mh_remote]->dst_socketid, sid) == 0) {
250
 				mlCompareSocketIDs((SocketId) mm->mMeasureInstances[oobdata->mh_remote]->dst_socketid, sid) == 0) {
227 251
			if(mm->mMeasureInstances[oobdata->mh_remote]->flags & DATA) {
228 252
				if(mm->mMeasureInstances[oobdata->mh_remote]->flags & REMOTE)
229
					r = dispatcherList[sid]->r_data[MSG_TYPE_MONL]->r_rx_remote;
253
					r = dispatcherList[h_dst]->data_r_rx_remote;
230 254
				else
231
					r = dispatcherList[sid]->r_data[MSG_TYPE_MONL]->r_rx_local;
232
			} else if (mm->mMeasureInstances[oobdata->mh_remote]->flags & PACKET){
255
					r = dispatcherList[h_dst]->data_r_rx_local;
256
			} else if (mm->mMeasureInstances[oobdata->mh_remote]->flags & PACKET) {
233 257
				if(mm->mMeasureInstances[oobdata->mh_remote]->flags & REMOTE)
234
					r = dispatcherList[sid]->r_pkt[MSG_TYPE_MONL]->r_rx_remote;
258
					r = dispatcherList[h_dst]->pkt_r_rx_remote;
235 259
				else
236
					r = dispatcherList[sid]->r_pkt[MSG_TYPE_MONL]->r_rx_local;
260
					r = dispatcherList[h_dst]->pkt_r_rx_local;
237 261
			}
238 262
			if(!r)
239 263
				fatal("MONL: r is not set");
......
271 295
		if(mm->isValidMonHandler(rmp[i].mh)) {
272 296
			if(mm->mMeasureInstances[rmp[i].mh]->status == RUNNING &&
273 297
			 mm->mMeasureInstances[rmp[i].mh]->msg_type == rresults->msg_type &&
274
 			 mlCompareSocketIDs(mm->mMeasureInstances[rmp[i].mh]->dst_socketid, sid) == 0) {
298
 			 mlCompareSocketIDs((SocketId) mm->mMeasureInstances[rmp[i].mh]->dst_socketid, sid) == 0) {
275 299
				if(mm->mMeasureInstances[rmp[i].mh]->rb != NULL) {
276 300
						mm->mMeasureInstances[rmp[i].mh]->rb->newSample(rmp[i].res);
277 301
				}
......
324 348
			break;
325 349
		case DEINITREMOTEMEASURE:
326 350
			if(mresponse->status == EOK) {
327
				if(m->status == STOPPING && m->auto_loaded == true) {
351
				if(m->status == STOPPING) {
352
					stopMeasure(m);
328 353
					if(mm->monDestroyMeasure(m->mh_local) != EOK)
329 354
						m->status = FAILED;
330
					if(dispatcherList.find(src) != dispatcherList.end()) {
331
						if(dispatcherList[src]->rx_pkt.empty() && dispatcherList[src]->tx_pkt.empty() &&
332
								dispatcherList[src]->rx_data.empty() && dispatcherList[src]->tx_data.empty())
333
							delete dispatcherList[src];
334
					}
335 355
				} else
336 356
					m->status = STOPPED;
337 357
			}
......
346 366
	MonHandler mh;
347 367
	MonParameterValue *param_vector;
348 368
	MonMeasure *m = NULL;
349

  
369
	struct SocketIdMt h_dst;
350 370
	struct InitRemoteMeasure *initmeasure = (InitRemoteMeasure*) cbuf;
351 371

  
372
	h_dst.sid = src;
373
	h_dst.mt = initmeasure->msg_type;
374

  
352 375
	/* Check if a previous instance is running */
353
	if(dispatcherList.find(src) != dispatcherList.end())
354
		m = findMeasureFromId(dispatcherList[src], initmeasure->mc, initmeasure->mid, initmeasure->msg_type);
376
	if(dispatcherList.find(h_dst) != dispatcherList.end())
377
		m = findMeasureFromId(dispatcherList[h_dst], initmeasure->mc, initmeasure->mid);
355 378

  
356
	if(m == NULL)
357
		mh = mm->monCreateMeasureId(initmeasure->mid, initmeasure->mc);
358
	else
359
		mh = m->mh_local;
379
	//if so remove it, so that we can create a new one (needed if mc changed)
380
	if(m != NULL)
381
		mm->monDestroyMeasure(m->mh_local);
382

  
383
	mh = mm->monCreateMeasureId(initmeasure->mid, initmeasure->mc);
360 384

  
361 385
	if(mh < 0)
362 386
		return remoteMeasureResponseTx(src, initmeasure->mh_local, -1, INITREMOTEMEASURE, EINVAL);
......
370 394
			goto error;
371 395
	}
372 396

  
373
	if(m == NULL) {
374
		if(activateMeasure(mm->mMeasureInstances[mh], src, initmeasure->msg_type) != EOK)
375
			goto error;
376
	} else
377
		m->defaultInit();
397
	if(activateMeasure(mm->mMeasureInstances[mh], src, initmeasure->msg_type) != EOK)
398
		goto error;
378 399

  
379 400
	return remoteMeasureResponseTx(src, initmeasure->mh_local, mh, INITREMOTEMEASURE, EOK);
380 401

  
......
424 445

  
425 446
	buffer.insert(buffer.end(),(char*)&deinitmeasure, ((char*)&deinitmeasure) + DEINITREMOTEMEASURE_SIZE);
426 447

  
427
	return sendCtrlMsg(m->dst_socketid, buffer);
448
	return sendCtrlMsg((SocketId) m->dst_socketid, buffer);
428 449
};
429 450

  
430 451
int MeasureDispatcher::deinitRemoteMeasureRx(SocketId src, MsgType mt, char* cbuf) {
......
447 468
	return remoteMeasureResponseTx(src,  deinitmeasure->mh_local, deinitmeasure->mh_remote, DEINITREMOTEMEASURE, EOK);
448 469
}
449 470

  
450
class MonMeasure* MeasureDispatcher::findDep(DispatcherListMsgType &dl, MeasurementCapabilities flags, MsgType mt, MeasurementId mid) {
451
	ExecutionList *el;
452

  
453
	if(dl.find(mt) == dl.end()) {
454
		return NULL;
455
	}
471
class MonMeasure* MeasureDispatcher::findMeasureFromId(DestinationSocketIdMtData *dd,  MeasurementCapabilities flags, MeasurementId mid) {
472
	ExecutionList::iterator it;
456 473

  
474
	//check if loaded
457 475
	if(flags & REMOTE) {
458
		el = &(dl[mt]->el_remote);
476
		it = dd->mids_remote.find(mid);
477
		if(it == dd->mids_remote.end())
478
			return NULL;
459 479
	} else {
460
		el = &(dl[mt]->el_local);
480
		it = dd->mids_local.find(mid);
481
		if(it == dd->mids_local.end())
482
			return NULL;
461 483
	}
462 484

  
463
	/* Check if this measure is already in the list */
464
	if(el->find(mid) != el->end())
465
		return (*el)[mid];
466

  
467
	return NULL;
485
	return it->second;
468 486
}
469 487

  
470
class MonMeasure* MeasureDispatcher::findMeasureFromId(DestinationSocketIdData *dsid,  MeasurementCapabilities flags, MeasurementId mid, MsgType mt) {
471
	if(flags & IN_BAND) {
472
		if(flags & TXLOC) {
473
			if(flags & PACKET)
474
				return findDep(dsid->tx_pkt, flags, mt, mid);
475
			if(flags & DATA)
476
				return findDep(dsid->tx_data, flags, mt, mid);
477
		}
478

  
479
		if(flags & RXLOC) {
480
			if(flags & PACKET)
481
				return findDep(dsid->rx_pkt, flags, mt, mid);
482
			if(flags & DATA)
483
				return findDep(dsid->rx_data, flags, mt, mid);
484
		}
485
	}
486

  
487
	return NULL;
488
}
489

  
490
int MeasureDispatcher::checkDeps(DestinationSocketIdData *dsid, MeasurementCapabilities flags, MeasurementId mid, MsgType mt) {
491
	int ret = 1;
492

  
493
	if(flags & IN_BAND) {
494
		if(flags & TXLOC) {
495
			if(flags & PACKET)
496
				ret &= (findDep(dsid->tx_pkt, flags, mt, mid) != NULL);
497
			if(flags & DATA)
498
				ret &= (findDep(dsid->tx_data, flags, mt, mid) != NULL);
499
		}
500

  
501
		if(flags & RXLOC) {
502
			if(flags & PACKET)
503
				ret &= (findDep(dsid->rx_pkt, flags, mt, mid) != NULL);
504
			if(flags & DATA)
505
				ret &= (findDep(dsid->rx_data, flags, mt, mid) != NULL);
506
		}
507
	}
508

  
509
	return ret;
510
}
511

  
512

  
513 488
int MeasureDispatcher::activateMeasure(class MonMeasure *m, SocketId dst, MsgType mt, int auto_load) {
489
	MeasurementId mid;
490
	struct SocketIdMt h_dst;
514 491
	int ret;
515
	m->status = INITIALISING;
492

  
493
	h_dst.sid = dst;
494
	h_dst.mt = mt;
495

  
496
	mid = m->measure_plugin->getId();
516 497
	m->msg_type = mt;
517 498

  
518 499
	if(dst != NULL) {
519
		if(m->dst_socketid == NULL)
520
			m->dst_socketid = (SocketId) new char[SOCKETID_SIZE];
521

  
522
		memcpy((char*) m->dst_socketid, (char*) dst, SOCKETID_SIZE);
500
		memcpy(m->dst_socketid, (uint8_t *) dst, SOCKETID_SIZE);
523 501
	} else {
524 502
		if(m->flags != 0)
525 503
			return -EINVAL;
......
528 506
	m->defaultInit();
529 507

  
530 508
	if(m->flags == 0) {
531
		m->status = RUNNING;
532 509
		return EOK;
533 510
	}
534 511

  
535
	if(dispatcherList.find(dst) == dispatcherList.end()) {
536
		char *c_sid = new char[SOCKETID_SIZE];
537
		memcpy(c_sid, (char*) dst, SOCKETID_SIZE);
538
		dst = (SocketId) c_sid;
539
		dispatcherList[dst] = new DestinationSocketIdData;
540
		dispatcherList[dst]->sid = c_sid;
512
	//check if already present
513
	if(dispatcherList.find(h_dst) != dispatcherList.end()) {
514
		if(m->flags & REMOTE) {
515
			if(dispatcherList[h_dst]->mids_remote.find(mid) == dispatcherList[h_dst]->mids_remote.end())
516
				return -EEXISTS;
517
		} else {
518
			if(dispatcherList[h_dst]->mids_local.find(mid) == dispatcherList[h_dst]->mids_local.end())
519
				return -EEXISTS;
520
		}
521
	}
522

  
523

  
524
	if(dispatcherList.find(h_dst) == dispatcherList.end()) {
525
		createDestinationSocketIdMtData(h_dst);
526
	}
527

  
528
	if(m->flags & OUT_OF_BAND) {
529
		struct SocketIdMt h_dst2 = h_dst;
530
		h_dst2.mt = MSG_TYPE_MONL;
531
		if(dispatcherList.find(h_dst2) == dispatcherList.end())
532
			createDestinationSocketIdMtData(h_dst2);
541 533
	}
542 534

  
543 535
	if(m->flags & PACKET) {
544
		if(dispatcherList[dst]->r_pkt.find(mt) == dispatcherList[dst]->r_pkt.end()) {
545
			dispatcherList[dst]->r_pkt[mt] = new struct pkt_result;
546
			for(int i = 0; i < R_LAST_PKT; i++) {
547
				dispatcherList[dst]->r_pkt[mt]->r_rx_remote[i] = NAN;
548
				dispatcherList[dst]->r_pkt[mt]->r_tx_remote[i] = NAN;
549
				dispatcherList[dst]->r_pkt[mt]->r_rx_local[i] = NAN;
550
				dispatcherList[dst]->r_pkt[mt]->r_tx_local[i] = NAN;
551
			}
552
			dispatcherList[dst]->r_pkt[mt]->tx_seq_num = 0;
553
		}
554
		if(m->flags & OUT_OF_BAND) {
555
			if(dispatcherList[dst]->r_pkt.find(MSG_TYPE_MONL) == dispatcherList[dst]->r_pkt.end()) {
556
				dispatcherList[dst]->r_pkt[MSG_TYPE_MONL] = new struct pkt_result;
557
				for(int i = 0; i < R_LAST_PKT; i++) {
558
					dispatcherList[dst]->r_pkt[MSG_TYPE_MONL]->r_rx_remote[i] = NAN;
559
					dispatcherList[dst]->r_pkt[MSG_TYPE_MONL]->r_tx_remote[i] = NAN;
560
					dispatcherList[dst]->r_pkt[MSG_TYPE_MONL]->r_rx_local[i] = NAN;
561
					dispatcherList[dst]->r_pkt[MSG_TYPE_MONL]->r_tx_local[i] = NAN;
562
				}
563
				dispatcherList[dst]->r_pkt[MSG_TYPE_MONL]->tx_seq_num = 0;
564
			}
565
		}
566 536
		if(m->flags & REMOTE) {
567
			m->r_rx_list = dispatcherList[dst]->r_pkt[mt]->r_rx_remote;
568
			m->r_tx_list = dispatcherList[dst]->r_pkt[mt]->r_tx_remote;
537
			m->r_rx_list = dispatcherList[h_dst]->pkt_r_rx_remote;
538
			m->r_tx_list = dispatcherList[h_dst]->pkt_r_tx_remote;
569 539
		} else {
570
			m->r_rx_list = dispatcherList[dst]->r_pkt[mt]->r_rx_local;
571
			m->r_tx_list = dispatcherList[dst]->r_pkt[mt]->r_tx_local;
572
		}
573
	}
574
	if(m->flags & DATA) {
575
		if(dispatcherList[dst]->r_data.find(mt) == dispatcherList[dst]->r_data.end()) {
576
			dispatcherList[dst]->r_data[mt] = new struct data_result;
577
			for(int i = 0; i < R_LAST_DATA; i++) {
578
				dispatcherList[dst]->r_data[mt]->r_rx_remote[i] = NAN;
579
				dispatcherList[dst]->r_data[mt]->r_tx_remote[i] = NAN;
580
				dispatcherList[dst]->r_data[mt]->r_rx_local[i] = NAN;
581
				dispatcherList[dst]->r_data[mt]->r_tx_local[i] = NAN;
582
			}
583
			dispatcherList[dst]->r_data[mt]->tx_seq_num = 0;
584
		}
585
		if(m->flags & OUT_OF_BAND) {
586
			if(dispatcherList[dst]->r_data.find(MSG_TYPE_MONL) == dispatcherList[dst]->r_data.end()) {
587
				dispatcherList[dst]->r_data[MSG_TYPE_MONL] = new struct data_result;
588
				for(int i = 0; i < R_LAST_DATA; i++) {
589
					dispatcherList[dst]->r_data[MSG_TYPE_MONL]->r_rx_remote[i] = NAN;
590
					dispatcherList[dst]->r_data[MSG_TYPE_MONL]->r_tx_remote[i] = NAN;
591
					dispatcherList[dst]->r_data[MSG_TYPE_MONL]->r_rx_local[i] = NAN;
592
					dispatcherList[dst]->r_data[MSG_TYPE_MONL]->r_tx_local[i] = NAN;
593
				}
594
				dispatcherList[dst]->r_data[MSG_TYPE_MONL]->tx_seq_num = 0;
595
			}
540
			m->r_rx_list = dispatcherList[h_dst]->pkt_r_rx_local;
541
			m->r_tx_list = dispatcherList[h_dst]->pkt_r_tx_local;
596 542
		}
543
	} else {
597 544
		if(m->flags & REMOTE) {
598
			m->r_rx_list = dispatcherList[dst]->r_data[mt]->r_rx_remote;
599
			m->r_tx_list = dispatcherList[dst]->r_data[mt]->r_tx_remote;
545
			m->r_rx_list = dispatcherList[h_dst]->data_r_rx_remote;
546
			m->r_tx_list = dispatcherList[h_dst]->data_r_tx_remote;
600 547
		} else {
601
			m->r_rx_list = dispatcherList[dst]->r_data[mt]->r_rx_local;
602
			m->r_tx_list = dispatcherList[dst]->r_data[mt]->r_tx_local;
548
			m->r_rx_list = dispatcherList[h_dst]->data_r_rx_local;
549
			m->r_tx_list = dispatcherList[h_dst]->data_r_tx_local;
603 550
		}
604 551
	}
552
	//TODO: check deps
605 553

  
606
	//try to automatically load dependencies
607
	if(auto_load) {
608
		class MeasurePlugin *mp = m->measure_plugin;
609
		/* Check for all dependencies */
610
		std::vector<MeasurementId>::iterator it;
611
		std::vector<MeasurementId> deps = mp->getDeps();
612
		for(it = deps.begin(); it != deps.end(); it++) {
613
			if(checkDeps(dispatcherList[dst], m->flags, *it, mt) == 0) {
614
				MonHandler mh;
615
				mh = mm->monCreateMeasureId(*it, m->flags);
616
				if(mh > 0){
617
					if(activateMeasure(mm->mMeasureInstances[mh], dst, mt) == EOK)
618
						mm->mMeasureInstances[mh]->auto_loaded = true;
619
					else {
620
						mm->monDestroyMeasure(mh);
621
						ret = -EUNRESOLVEDDEP;
622
						goto error;
623
					}
624
				} else
625
					ret = -EUNRESOLVEDDEP;
626
					goto error;
627
			}
628
		}
629
	}
554
	// add it to loaded measure list
555
	if(m->flags & REMOTE)
556
		dispatcherList[h_dst]->mids_remote[mid] = m;
557
	else
558
		dispatcherList[h_dst]->mids_local[mid] = m;
630 559

  
631
	/* IN_BAND  measurmente added to hook executionlists */
632
	if(m->flags & IN_BAND) {
633
		/* TXLOC */
634
		if(m->flags & TXLOC) {
635
			if(m->flags & PACKET) {
636
				ret = addMeasureToExecList(dispatcherList[dst]->tx_pkt,m,mt);
637
				if(ret != EOK)
638
					goto error;
639
			}
640
			if(m->flags & DATA) {
641
				ret = addMeasureToExecList(dispatcherList[dst]->tx_data,m,mt);
642
				if(ret != EOK)
643
					goto error;
644
			}
645
		}
646
		/* RXLOC */
647
		if(m->flags & RXLOC) {
648
			if(m->flags & PACKET) {
649
				ret = addMeasureToExecList(dispatcherList[dst]->rx_pkt,m,mt);
650
				if(ret != EOK)
651
					goto error;
652
			}
653
			if(m->flags & DATA) {
654
				ret = addMeasureToExecList(dispatcherList[dst]->rx_data,m,mt);
655
				if(ret != EOK)
656
					goto error;
657
			}
658
		}
659
		/* Remote counterparts ? */
660
		if(m->flags & TXREM || m->flags & RXREM) {
661
			/* Yes, initialise them */
662
			ret = initRemoteMeasureTx(m,dst,mt);
663
			if(ret != EOK)
664
				goto error;
665
			else
666
				return ret;
667
		}
668
	}
560
	//Handle IN_BAND measures
561
	addMeasureToExecLists(h_dst, m);
669 562

  
563
	//Handle OUT_OF_BAND measures
670 564
	if(m->flags & OUT_OF_BAND) {
671 565
		struct timeval tv = {0,0};
672
		/* TXLOC need only a local instance */
673
		if(m->flags & TXLOC) {
674
			ret = m->scheduleNextIn(&tv);
675
			if(ret != EOK)
676
				goto error;
677
		}
566
		//if only local
567
		if(!(m->flags & TXREM) && !(m->flags & RXREM))
568
			m->scheduleNextIn(&tv); //start it!
569
	}
678 570

  
679
		/* Remote counterparts ? */
680
		if(m->flags & TXREM || m->flags & RXREM) {
681
			/* Yes, initialise them */
682
			ret = initRemoteMeasureTx(m,dst,mt);
683
			if(ret != EOK)
684
				goto error;
685
			else
686
				return ret;
687
		} else {
688
			/* No, we are done */
689
			ret = m->scheduleNextIn(&tv);
690
			if(ret != EOK)
691
				goto error;
692
		}
571
	/* Remote counterparts ? */
572
	if(m->flags & TXREM || m->flags & RXREM) {
573
		/* Yes, initialise them */
574
		m->status = INITIALISING;
575
		ret = initRemoteMeasureTx(m,dst,mt);
576
		if(ret != EOK)
577
			goto error;
578
	} else {
579
		m->status = RUNNING;
693 580
	}
694 581

  
695
	m->status = RUNNING;
582
	if(m->dst_socketid == NULL)
583
		info("bla1");
696 584
	return EOK;
697 585

  
698 586
error:
699
	m->status = FAILED;
700
	m->r_tx_list = m->r_rx_list = NULL;
587
	if(m->dst_socketid == NULL)
588
		info("bla2");
589

  
590
	stopMeasure(m);
591
	if(m->dst_socketid == NULL)
592
		info("bla3");
593

  
701 594
	return ret;
702
};
595
}
703 596

  
704
int MeasureDispatcher::deactivateMeasure(class MonMeasure *m) {
705
	int ret;
706
	SocketId dst = m->dst_socketid;
597
void MeasureDispatcher::createDestinationSocketIdMtData(struct SocketIdMt h_dst) {
598
	DestinationSocketIdMtData *dd;
707 599
	
708
	class MeasurePlugin *mp = m->measure_plugin;
709
	std::vector<MeasurementId>::iterator it;
710
	std::vector<MeasurementId> deps = mp->getDeps();
600
	dd = new DestinationSocketIdMtData;
601

  
602
	memcpy(dd->sid, (uint8_t*) h_dst.sid, SOCKETID_SIZE);
603
	dd->h_dst.sid = (SocketId) &(dd->sid);
604
	dd->h_dst.mt = h_dst.mt;
605

  
606
	dispatcherList[dd->h_dst] = dd;
607

  
608
	//initialize
609
	for(int i = 0; i < R_LAST_PKT; i++) {
610
		dispatcherList[dd->h_dst]->pkt_r_rx_local[i] = NAN;
611
		dispatcherList[dd->h_dst]->pkt_r_rx_remote[i] = NAN;
612
		dispatcherList[dd->h_dst]->pkt_r_tx_local[i] = NAN;
613
		dispatcherList[dd->h_dst]->pkt_r_tx_remote[i] = NAN;
614
	}
615
	dispatcherList[dd->h_dst]->pkt_tx_seq_num = 0;
616

  
617
	for(int i = 0; i < R_LAST_DATA; i++) {
618
		dispatcherList[dd->h_dst]->data_r_rx_local[i] = NAN;
619
		dispatcherList[dd->h_dst]->data_r_rx_remote[i] = NAN;
620
		dispatcherList[dd->h_dst]->data_r_tx_local[i] = NAN;
621
		dispatcherList[dd->h_dst]->data_r_tx_remote[i] = NAN;
622
	}
623
	dispatcherList[dd->h_dst]->data_tx_seq_num = 0;
624
}
625

  
626
void MeasureDispatcher::destroyDestinationSocketIdMtData(SocketId dst, MsgType mt) {
627
	DestinationSocketIdMtData *dd;
628
	DispatcherListSocketIdMt::iterator it;
629
	struct SocketIdMt h_dst;
630

  
631
	h_dst.sid = dst;
632
	h_dst.mt = mt;
633

  
634
	it = dispatcherList.find(h_dst);
635
	if(it != dispatcherList.end()) {
636
		dd = it->second;
637
		dispatcherList.erase(it);
638
		delete dd;
639
	}
640
	else
641
		fatal("Trying to erase non existent DestinationSocketIdMtData");
642
}
643

  
644
int MeasureDispatcher::deactivateMeasure(class MonMeasure *m) {
645
	SocketIdMt h_dst;
646

  
647
	h_dst.sid = (SocketId) m->dst_socketid;
648
	h_dst.mt = m->msg_type;
711 649

  
712 650
	if(m->status == STOPPED)
713 651
		return EOK;
714
	
652

  
715 653
	if(m->used_counter > 0)
716 654
		return -EINUSE;
717 655

  
718
	if(m->status != FAILED)
656
	if(m->flags == 0) {
719 657
		m->defaultStop();
658
		m->status = STOPPED;
659
		return EOK;
660
	}
720 661

  
721
	m->status = STOPPED;
722
	m->r_tx_list = m->r_rx_list = NULL;
723

  
724
	/* IN_BAND  measurments added to hook executionlists*/
725
	if(m->flags & IN_BAND) {
726
		/* TXLOC */
727
		if(m->flags & TXLOC) {
728
			if(m->flags & PACKET) {
729
				ret = delMeasureFromExecList(dispatcherList[dst]->tx_pkt,m,m->msg_type);
730
				if(ret != EOK)
731
					goto error;
732
			}
733
			if(m->flags & DATA) {
734
				ret = delMeasureFromExecList(dispatcherList[dst]->tx_data,m,m->msg_type);
735
				if(ret != EOK)
736
					goto error;
737
			}
738
		}
739

  
740
		/* RXLOC */
741
		if(m->flags & RXLOC) {
742
			if(m->flags & PACKET) {
743
				ret = delMeasureFromExecList(dispatcherList[dst]->rx_pkt,m,m->msg_type);
744
				if(ret != EOK)
745
					goto error;
746
			}
747
			if(m->flags & DATA) {
748
				ret = delMeasureFromExecList(dispatcherList[dst]->rx_data,m,m->msg_type);
749
				if(ret != EOK)
750
					goto error;
751
			}
752
		}
662
	if(dispatcherList.find(h_dst) == dispatcherList.end())
663
		return EINVAL;
753 664

  
754
		/* Remote counterparts ? */
755
		if(m->flags & TXREM) {
756
				m->status = STOPPING;
757
				ret = deinitRemoteMeasureTx(m);
758
				if(ret != EOK)
759
						goto error;
760
		}
761
	}
762
	//TODO out of band measures check this!!!
763
	if(m->flags & OUT_OF_BAND) {
764
		m->status = STOPPING;
765
		if(m->flags & TXREM) {
766
				ret = deinitRemoteMeasureTx(m);
767
				if(ret != EOK)
768
						goto error;
769
		}
665
	//remove it from the loaded list
666
	if(m->flags & REMOTE) {
667
		if(dispatcherList[h_dst]->mids_remote.find(m->measure_plugin->getId()) == dispatcherList[h_dst]->mids_remote.end())
668
			return -EINVAL;
669
	} else {
670
		if(dispatcherList[h_dst]->mids_local.find(m->measure_plugin->getId()) == dispatcherList[h_dst]->mids_local.end())
671
			return -EINVAL;
770 672
	}
771 673

  
772
	//remove automatically loaded ones
773
	for(it = deps.begin(); it != deps.end(); it++) {
774
		class MonMeasure *md = findMeasureFromId(dispatcherList[dst], m->flags, *it, m->msg_type);
775
		if(md != NULL) {
776
			if(md->used_counter == 0 && md->auto_loaded == true) {
777
				ret = deactivateMeasure(md);
778
				if(ret != EOK)
779
						goto error;
780
				if(md->status == STOPPED) {
781
					ret = mm->monDestroyMeasure(md->mh_local);
782
					if(ret != EOK)
783
							goto error;
784
				}
785
			}
786
		}
787
	}
674
	return stopMeasure(m);
675
}
788 676

  
789
	if(dispatcherList[dst]->rx_pkt.find(m->msg_type) == dispatcherList[dst]->rx_pkt.end() &&
790
		dispatcherList[dst]->tx_pkt.find(m->msg_type) == dispatcherList[dst]->tx_pkt.end()) {
791
			delete dispatcherList[dst]->r_pkt[m->msg_type];
792
			dispatcherList[dst]->r_pkt.erase(m->msg_type);
793
	}
677
int MeasureDispatcher::stopMeasure(class MonMeasure *m) {
678
	SocketIdMt h_dst;
794 679

  
795
	if(dispatcherList[dst]->rx_data.find(m->msg_type) == dispatcherList[dst]->rx_data.end() &&
796
		dispatcherList[dst]->tx_data.find(m->msg_type) == dispatcherList[dst]->tx_data.end()) {
797
			delete dispatcherList[dst]->r_data[m->msg_type];
798
			dispatcherList[dst]->r_data.erase(m->msg_type);
799
	}
680
	h_dst.sid = (SocketId) m->dst_socketid;
681
	h_dst.mt = m->msg_type;
800 682

  
801
	if(dispatcherList[dst]->rx_pkt.empty() && dispatcherList[dst]->tx_pkt.empty() &&
802
			dispatcherList[dst]->rx_data.empty() && dispatcherList[dst]->tx_data.empty()) {
803
		ResultPktListMsgType::iterator it;
804
		for(it = dispatcherList[dst]->r_pkt.begin(); it != dispatcherList[dst]->r_pkt.end(); it++)
805
			delete it->second;
806
		ResultDataListMsgType::iterator it2;
807
		for(it2 = dispatcherList[dst]->r_data.begin(); it2 != dispatcherList[dst]->r_data.end(); it2++)
808
			delete it2->second;
809
		delete[] dispatcherList[dst]->sid;
810
		delete dispatcherList[dst];
811
	}
683
	m->defaultStop();
812 684

  
813
	if(m->dst_socketid) {
814
		delete[] (char *) m->dst_socketid;
815
		m->dst_socketid = NULL;
816
	}
817
	return EOK;
685
	//Handle IN_BAND measures
686
	delMeasureFromExecLists(m);
687
	//Handle OUT_OF:BAND measures
688
	//TODO: Anything to do at all?
818 689

  
819
error:
820
	m->status = FAILED;
690
	//Handle remote counterpart
691
	if(m->flags & TXREM || m->flags & RXREM) {
692
			m->status = STOPPING;
693
			deinitRemoteMeasureTx(m);
694
	} else
695
		m->status = STOPPED;
696
	m->r_tx_list = m->r_rx_list = NULL;
821 697

  
822
};
698
	//remove it from the loaded list
699
	if(m->flags & REMOTE)
700
		dispatcherList[h_dst]->mids_remote.erase(dispatcherList[h_dst]->mids_remote.find(m->measure_plugin->getId()));
701
	else
702
		dispatcherList[h_dst]->mids_local.erase(dispatcherList[h_dst]->mids_local.find(m->measure_plugin->getId()));
823 703

  
704
	//check if we can remove also the DistinationSocektIdMtData
705
	if(dispatcherList[h_dst]->mids_remote.empty() && dispatcherList[h_dst]->mids_local.empty())
706
		destroyDestinationSocketIdMtData((SocketId) m->dst_socketid, m->msg_type);
707
}
824 708

  
825 709

  
826 710
void MeasureDispatcher::cbRxPkt(void *arg) {
......
830 714
	int i,j;
831 715
	mon_pkt_inf *pkt_info = (mon_pkt_inf *)arg;
832 716
	struct MonPacketHeader *mph = (MonPacketHeader*) pkt_info->monitoringHeader;
833
	SocketId sid = pkt_info->remote_socketID;
834
	MsgType mt = pkt_info->msgtype;
717
	struct SocketIdMt h_dst;
718
	h_dst.sid = pkt_info->remote_socketID;
719
	h_dst.mt = pkt_info->msgtype;
835 720

  
836
	/* something to do? */
837
	if(dispatcherList.size() == 0)
721
	if(pkt_info->remote_socketID == NULL)
838 722
		return;
839 723

  
840
	if(dispatcherList.find(sid) == dispatcherList.end())
724
	/* something to do? */
725
	if(dispatcherList.size() == 0)
841 726
		return;
842 727

  
843
	if(dispatcherList[sid]->r_pkt.find(mt) == dispatcherList[sid]->r_pkt.end()) {
728
	if(dispatcherList.find(h_dst) == dispatcherList.end())
844 729
		return;
845
	}
846 730

  
847 731
	//we have a result vector to fill
848
	r_loc = dispatcherList[sid]->r_pkt[mt]->r_rx_local;
849
	r_rem = dispatcherList[sid]->r_pkt[mt]->r_rx_remote;
732
	r_loc = dispatcherList[h_dst]->pkt_r_rx_local;
733
	r_rem = dispatcherList[h_dst]->pkt_r_rx_remote;
850 734

  
851 735
	//TODO: add standard fields
852 736
	if(mph != NULL) {
......
866 750
	r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] += pkt_info->arrival_time.tv_sec;
867 751

  
868 752

  
869
	// are there in band measures?
870
	if (dispatcherList[sid]->rx_pkt.size() == 0)
871
		return;
872

  
873
	if(dispatcherList[sid]->rx_pkt.find(mt) == dispatcherList[sid]->rx_pkt.end())
874
		return;
875

  
876
	/* yes! */
877
	/* Locals first */
878
	ExecutionList *el_ptr_loc = &(dispatcherList[sid]->rx_pkt[mt]->el_local);
879

  
880
	/* Call measures in order */
881
	for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++)
753
	// are there local in band measures?
754
	if (dispatcherList[h_dst]->el_rx_pkt_local.size() > 0) {
755
		/* yes! */
756
		ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_rx_pkt_local);
757
	
758
		/* Call measures in order */
759
		for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++)
882 760
			if(it->second->status == RUNNING)
883
					it->second->RxPktLocal(el_ptr_loc);
884

  
885
	/* And remotes */
886
	ExecutionList *el_ptr_rem = &(dispatcherList[sid]->rx_pkt[mt]->el_remote);
887

  
888
	/* Call measurees in order */
889
	j = 0;
890
	for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
891
		if(it->second->status == RUNNING)
892
			it->second->RxPktRemote(rmp,j,el_ptr_rem);
761
				it->second->RxPktLocal(el_ptr_loc);
762
	}
893 763

  
894
	/* send back results */
895
	if(j > 0)
896
		remoteResultsTx(sid, rmp, j, mt);
764
	if (dispatcherList[h_dst]->el_rx_pkt_remote.size() > 0) {
765
		/* And remotes */
766
		ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_rx_pkt_remote);
767
	
768
		/* Call measurees in order */
769
		j = 0;
770
		for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
771
			if(it->second->status == RUNNING)
772
				it->second->RxPktRemote(rmp,j,el_ptr_rem);
773
	
774
		/* send back results */
775
		if(j > 0)
776
		remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
777
	}
897 778
}
898 779

  
899 780
void MeasureDispatcher::cbRxData(void *arg) {
......
903 784
	int i,j;
904 785
	mon_data_inf *data_info = (mon_data_inf *)arg;
905 786
	struct MonDataHeader *mdh = (MonDataHeader*) data_info->monitoringDataHeader;
906
	SocketId sid = data_info->remote_socketID;
907
	MsgType mt = data_info->msgtype;
787
	struct SocketIdMt h_dst;
788
	h_dst.sid = data_info->remote_socketID;
789
	h_dst.mt = data_info->msgtype;
908 790

  
909
	/* something to do? */
910
	if(dispatcherList.size() == 0)
791
	if(data_info->remote_socketID == NULL)
911 792
		return;
912 793

  
913
	if(dispatcherList.find(sid) == dispatcherList.end())
794
	/* something to do? */
795
	if(dispatcherList.size() == 0)
914 796
		return;
915 797

  
916
	if(dispatcherList[sid]->r_data.find(mt) == dispatcherList[sid]->r_data.end()) {
798
	if(dispatcherList.find(h_dst) == dispatcherList.end())
917 799
		return;
918
	}
919 800

  
920
	r_loc = dispatcherList[sid]->r_data[mt]->r_rx_local;
921
	r_rem = dispatcherList[sid]->r_data[mt]->r_rx_remote;
801
	//we have a result vector to fill
802
	r_loc = dispatcherList[h_dst]->data_r_rx_local;
803
	r_rem = dispatcherList[h_dst]->data_r_rx_remote;
922 804

  
923 805
	//TODO: add standard fields
924 806
	if(mdh != NULL) {
......
932 814
	r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] = data_info->arrival_time.tv_usec / 1000000.0;
933 815
	r_rem[R_RECEIVE_TIME] = r_loc[R_RECEIVE_TIME] += data_info->arrival_time.tv_sec;
934 816

  
935
	if (dispatcherList[sid]->rx_data.size() == 0)
936
		return;
937

  
938
	if(dispatcherList[sid]->rx_data.find(mt) == dispatcherList[sid]->rx_data.end())
939
		return;
940

  
941
	/* yes! */
942
	/* Locals first */
943
	ExecutionList *el_ptr_loc = &(dispatcherList[sid]->rx_data[mt]->el_local);
944

  
945

  
946
	/* Call measurees in order */
947
	for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++){
948
		MonMeasure *m = it->second;
949
		if(it->second->status == RUNNING)
950
			it->second->RxDataLocal(el_ptr_loc);
817
	if (dispatcherList[h_dst]->el_rx_data_local.size() > 0) {
818
		/* yes! */
819
		/* Locals first */
820
		ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_rx_data_local);
821
	
822
	
823
		/* Call measurees in order */
824
		for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++){
825
			MonMeasure *m = it->second;
826
			if(it->second->status == RUNNING)
827
				it->second->RxDataLocal(el_ptr_loc);
828
		}
951 829
	}
952 830

  
953
	/* And remotes */
954

  
955
	ExecutionList *el_ptr_rem = &(dispatcherList[sid]->rx_data[mt]->el_remote);
956

  
957
	/* Call measurees in order */
958
	j = 0;
959
	for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
960
		if(it->second->status == RUNNING)
961
			it->second->RxDataRemote(rmp,j,el_ptr_rem);
962

  
963
	/* send back results */
964
	if(j > 0)
965
		remoteResultsTx(sid, rmp, j, mt);
831
	if (dispatcherList[h_dst]->el_rx_data_remote.size() > 0) {
832
		/* And remotes */
833
	
834
		ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_rx_data_remote);
835
	
836
		/* Call measurees in order */
837
		j = 0;
838
		for( it = el_ptr_rem->begin(); it != el_ptr_rem->end(); it++)
839
			if(it->second->status == RUNNING)
840
				it->second->RxDataRemote(rmp,j,el_ptr_rem);
841
	
842
		/* send back results */
843
		if(j > 0)
844
			remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
845
	}
966 846
}
967 847

  
968 848
void MeasureDispatcher::cbTxPkt(void *arg) {
......
972 852
	int i,j;
973 853
	mon_pkt_inf *pkt_info = (mon_pkt_inf *)arg;
974 854
	struct MonPacketHeader *mph = (MonPacketHeader*) pkt_info->monitoringHeader;
975
	SocketId sid = pkt_info->remote_socketID;
976
	MsgType mt = pkt_info->msgtype;
977 855
	struct timeval ts;
856
	struct SocketIdMt h_dst;
857
	h_dst.sid = pkt_info->remote_socketID;
858
	h_dst.mt = pkt_info->msgtype;
978 859

  
979
	
980
	/* something to do? */
981
	if(dispatcherList.size() == 0)
860
	if(pkt_info->remote_socketID == NULL)
982 861
		return;
983 862

  
984
	if(dispatcherList.find(sid) == dispatcherList.end())
863
	/* something to do? */
864
	if(dispatcherList.size() == 0)
985 865
		return;
986 866

  
987
	if(dispatcherList[sid]->r_pkt.find(mt) == dispatcherList[sid]->r_pkt.end()) {
867
	if(dispatcherList.find(h_dst) == dispatcherList.end())
988 868
		return;
989
	}
990 869

  
991 870
	/* yes! */
992 871

  
993
	r_loc = dispatcherList[sid]->r_pkt[mt]->r_tx_local;
994
	r_rem = dispatcherList[sid]->r_pkt[mt]->r_tx_remote;
872
	r_loc = dispatcherList[h_dst]->pkt_r_tx_local;
873
	r_rem = dispatcherList[h_dst]->pkt_r_tx_remote;
995 874

  
996 875
	/* prepare initial result vector (based on header information) */
997 876
		
998
	r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ++dispatcherList[sid]->r_pkt[mt]->tx_seq_num;
877
	r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ++(dispatcherList[h_dst]->pkt_tx_seq_num);
999 878
	r_rem[R_SIZE] = r_loc[R_SIZE] = pkt_info->bufSize;
1000 879
	r_rem[R_INITIAL_TTL] = r_loc[R_INITIAL_TTL] = initial_ttl;
1001 880
	gettimeofday(&ts,NULL);	
1002 881
	r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ts.tv_usec / 1000000.0;
1003 882
	r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] =  r_loc[R_SEND_TIME] + ts.tv_sec;
1004 883

  
1005
	if(dispatcherList[sid]->tx_pkt.find(mt) != dispatcherList[sid]->tx_pkt.end()) {
884
	if(dispatcherList[h_dst]->el_tx_pkt_local.size() > 0) {
885

  
886
		ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_tx_pkt_local);
1006 887

  
1007
		ExecutionList *el_ptr_loc = &(dispatcherList[sid]->tx_pkt[mt]->el_local);
1008
	
1009 888
		/* Call measurees in order */
1010 889
		for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++)
1011 890
			if(it->second->status == RUNNING)
1012 891
				it->second->TxPktLocal(el_ptr_loc);
1013
	
892
	}
893

  
894
	if(dispatcherList[h_dst]->el_tx_pkt_remote.size() > 0) {
1014 895
		/* And remotes */
1015
		ExecutionList *el_ptr_rem = &(dispatcherList[sid]->tx_pkt[mt]->el_remote);
896
		ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_tx_pkt_remote);
1016 897
	
1017 898
		/* Call measurees in order */
1018 899
		j = 0;
......
1022 903
	
1023 904
		/* send back results */
1024 905
		if(j > 0)
1025
			remoteResultsTx(sid, rmp, j, mt);
906
			remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
1026 907
	}
1027 908

  
1028 909
	if(mph != NULL) {
......
1047 928
	int i,j;
1048 929
	mon_data_inf *data_info = (mon_data_inf *)arg;
1049 930
	struct MonDataHeader *mdh = (MonDataHeader*) data_info->monitoringDataHeader;
1050
	SocketId sid = data_info->remote_socketID;
1051
	MsgType mt = data_info->msgtype;
1052 931
	struct timeval ts;
932
	struct SocketIdMt h_dst;
933
	h_dst.sid = data_info->remote_socketID;
934
	h_dst.mt = data_info->msgtype;
1053 935

  
1054
	/* something to do? */
1055
	if(dispatcherList.size() == 0)
936
	if(data_info->remote_socketID == NULL)
1056 937
		return;
1057 938

  
1058
	if(dispatcherList.find(sid) == dispatcherList.end())
939
	/* something to do? */
940
	if(dispatcherList.size() == 0)
1059 941
		return;
1060 942

  
1061
	if(dispatcherList[sid]->r_data.find(mt) == dispatcherList[sid]->r_data.end()) {
943
	if(dispatcherList.find(h_dst) == dispatcherList.end())
1062 944
		return;
1063
	}
1064

  
1065 945
	
1066 946
	/* yes! */
1067
	r_loc = dispatcherList[sid]->r_data[mt]->r_tx_local;
1068
	r_rem = dispatcherList[sid]->r_data[mt]->r_tx_remote;
947
	r_loc = dispatcherList[h_dst]->data_r_tx_local;
948
	r_rem = dispatcherList[h_dst]->data_r_tx_remote;
1069 949

  
1070 950
	
1071 951
	//TODO add fields
1072 952
	r_rem[R_SIZE] = r_loc[R_SIZE] = data_info->bufSize;
1073
	r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ++(dispatcherList[sid]->r_data[mt]->tx_seq_num);
953
	r_rem[R_SEQNUM] = r_loc[R_SEQNUM] = ++(dispatcherList[h_dst]->data_tx_seq_num);
1074 954
	gettimeofday(&ts,NULL);
1075 955
	r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = ts.tv_usec / 1000000.0;
1076 956
	r_rem[R_SEND_TIME] = r_loc[R_SEND_TIME] = r_loc[R_SEND_TIME] + ts.tv_sec;
1077 957

  
1078

  
1079
	if(dispatcherList[sid]->tx_data.find(mt) != dispatcherList[sid]->tx_data.end()) {
1080
		ExecutionList *el_ptr_loc = &(dispatcherList[sid]->tx_data[mt]->el_local);
1081
		
958
	if(dispatcherList[h_dst]->el_tx_data_local.size() > 0) {
959
		ExecutionList *el_ptr_loc = &(dispatcherList[h_dst]->el_tx_data_local);
1082 960

  
1083 961
		/* Call measures in order */
1084 962
		for( it = el_ptr_loc->begin(); it != el_ptr_loc->end(); it++)
1085 963
			if(it->second->status == RUNNING)
1086 964
				it->second->TxDataLocal(el_ptr_loc);
1087
	
965
	}
966

  
967
	if(dispatcherList[h_dst]->el_tx_data_remote.size() > 0) {
1088 968
		/* And remote */
1089
		ExecutionList *el_ptr_rem = &(dispatcherList[sid]->tx_data[mt]->el_remote);
969
		ExecutionList *el_ptr_rem = &(dispatcherList[h_dst]->el_tx_data_remote);
1090 970
	
1091 971
		/* Call measures in order */
1092 972
		j = 0;
......
1096 976
	
1097 977
		/* send back results */
1098 978
		if(j > 0)
1099
			remoteResultsTx(sid, rmp, j, mt);
979
			remoteResultsTx(h_dst.sid, rmp, j, h_dst.mt);
1100 980
	}
1101 981

  
1102 982
	if(mdh != NULL) {
......
1114 994
}
1115 995

  
1116 996
int MeasureDispatcher::cbHdrPkt(SocketId sid, MsgType mt) {
1117
	/* something to do? */
1118
	if(dispatcherList.size() == 0)
997
	struct SocketIdMt h_dst;
998
	h_dst.sid = sid;
999
	h_dst.mt = mt;
1000

  
1001
	if(sid == NULL)
1119 1002
		return 0;
1120 1003

  
1121
	if(dispatcherList.find(sid) == dispatcherList.end())
1004
	/* something to do? */
1005
	if(dispatcherList.size() == 0)
1122 1006
		return 0;
1123 1007

  
1124
	if(dispatcherList[sid]->tx_pkt.find(mt) == dispatcherList[sid]->tx_pkt.end() &&
1125
		dispatcherList[sid]->r_pkt.find(mt) == dispatcherList[sid]->r_pkt.end())
1008
	if(dispatcherList.find(h_dst) == dispatcherList.end())
1126 1009
		return 0;
1127 1010

  
1128 1011
	/* yes! */
......
1130 1013
}
1131 1014

  
1132 1015
int MeasureDispatcher::cbHdrData(SocketId sid, MsgType mt) {
1133
	/* something to do? */
1134
	if(dispatcherList.size() == 0)
1016
	struct SocketIdMt h_dst;
1017
	h_dst.sid = sid;
1018
	h_dst.mt = mt;
1019

  
1020
	if(sid == NULL)
1135 1021
		return 0;
1136 1022

  
1137
	if(dispatcherList.find(sid) == dispatcherList.end())
1023
	/* something to do? */
1024
	if(dispatcherList.size() == 0)
1138 1025
		return 0;
1139 1026

  
1140
	if(dispatcherList[sid]->tx_data.find(mt) == dispatcherList[sid]->tx_data.end() &&
1141
		dispatcherList[sid]->r_data.find(mt) == dispatcherList[sid]->r_data.end())
1027
	if(dispatcherList.find(h_dst) == dispatcherList.end())
1142 1028
		return 0;
1143 1029

  
1144 1030
	/* yes! return the space we need */
1145 1031
	return MON_DATA_HEADER_SIZE;
1146 1032
}
1033

  
1147 1034
int MeasureDispatcher::scheduleMeasure(MonHandler mh) {
1148 1035
	if(mm->isValidMonHandler(mh))
1149 1036
		mm->mMeasureInstances[mh]->Run();
1150 1037
	return EOK;
1151 1038
}
1039

  
1152 1040
int MeasureDispatcher::schedulePublish(MonHandler mh) {
1153 1041
	if(mm->isValidMonHandler(mh))
1154 1042
		if(mm->mMeasureInstances[mh]->rb != NULL)
1155 1043
			mm->mMeasureInstances[mh]->rb->publishResults();
1156 1044
	return EOK;
1157 1045
}
1158

  
monl/measure_dispatcher.h
44 44

  
45 45
void open_con_cb (int con_id, void *arg);
46 46

  
47
/* DestinationId Comparison function */
48
//TODO change it to a correct implemnetation
49
// struct socketIdCompare
50
// {
51
// 	bool operator()(const SocketId x, const SocketId y) const { 
52
// 		char *xptr, *yptr;
53
// 		xptr = (char *) x;
54
// 		yptr = (char *) y;
55
// 		if(memcmp(xptr, yptr, SOCKETID_SIZE) == 0)
56
// 			return true;
57
// 		return false;
58
// 	}
59
// };
60
struct socketIdCompare
47
struct SocketIdMt {
48
	SocketId sid;
49
	MsgType mt;
50
};
51

  
52
struct socketIdMtCompare
61 53
{
62
	bool operator()(const SocketId x, const SocketId y) const { 
63
		if(mlCompareSocketIDs(x,y) == 0)
54
	bool operator()(struct SocketIdMt x, struct SocketIdMt y) const {
55
		if(mlCompareSocketIDs(x.sid,y.sid) == 0 && x.mt == y.mt)
64 56
			return true;
65 57
		return false;
66 58
	}
67 59
};
68
/* DestinationId Hash function */
69
// struct socketIdHash
70
// {
71
// 	int operator()(const SocketId x) const {
72
// 		uint32_t hash_code = 0;
73
// 		uint32_t *intptr = (uint32_t*) x;
74
// 		for(int i=0; i < SOCKETID_SIZE / sizeof(uint32_t); i++)
75
// 			hash_code = hash_code + intptr[i];
76
// 		return hash_code;
77
// 	}
78
// };
79
struct socketIdHash
60

  
61
struct socketIdMtHash
80 62
{
81
	int operator()(const SocketId x) const {
82
		return mlHashSocketID(x);
63
	int operator()(struct SocketIdMt x) const {
64
		return mlHashSocketID(x.sid) + x.mt;
83 65
	}
84 66
};
85 67

  
86 68
typedef struct {
87
	ExecutionList el_local;
88
	ExecutionList el_remote;
89
	// elements in the execution list
90
	int32_t n_el_local;
91
	int32_t n_el_remote;
92
} DestinationMsgTypeData;
93

  
94
struct pkt_result{
95
	result r_rx_local[R_LAST_PKT];
96
	result r_rx_remote[R_LAST_PKT];
97
	result r_tx_local[R_LAST_PKT];
98
	result r_tx_remote[R_LAST_PKT];
99
	uint32_t tx_seq_num;
100
	//TODO store here destination specific data
101
};
102

  
103
struct data_result {
104
	result r_rx_local[R_LAST_DATA];
105
	result r_rx_remote[R_LAST_DATA];
106
	result r_tx_local[R_LAST_DATA];
107
	result r_tx_remote[R_LAST_DATA];
108
	uint32_t tx_seq_num;
109
	//TODO store here destination specific data
110
};
111

  
112

  
113
typedef std::tr1::unordered_map<MsgType,struct pkt_result *> ResultPktListMsgType;
114
typedef std::tr1::unordered_map<MsgType,struct data_result *> ResultDataListMsgType;
115
typedef std::tr1::unordered_map<MsgType,DestinationMsgTypeData*> DispatcherListMsgType;
116

  
117
typedef struct {
118
	DispatcherListMsgType rx_pkt;
119
	DispatcherListMsgType tx_pkt;
120
	DispatcherListMsgType rx_data;
121
	DispatcherListMsgType tx_data;
122

  
123
	ResultPktListMsgType r_pkt;
124
	ResultDataListMsgType r_data;
125

  
126
	char *sid;
69
	ExecutionList el_rx_pkt_local;
70
	ExecutionList el_rx_pkt_remote;
71
	ExecutionList el_tx_pkt_local;
72
	ExecutionList el_tx_pkt_remote;
73
	ExecutionList el_rx_data_local;
74
	ExecutionList el_rx_data_remote;
75
	ExecutionList el_tx_data_local;
76
	ExecutionList el_tx_data_remote;
77

  
78
	result pkt_r_rx_local[R_LAST_PKT];
79
	result pkt_r_rx_remote[R_LAST_PKT];
80
	result pkt_r_tx_local[R_LAST_PKT];
81
	result pkt_r_tx_remote[R_LAST_PKT];
82
	uint32_t pkt_tx_seq_num;
83

  
84
	result data_r_rx_local[R_LAST_DATA];
85
	result data_r_rx_remote[R_LAST_DATA];
86
	result data_r_tx_local[R_LAST_DATA];
87
	result data_r_tx_remote[R_LAST_DATA];
88
	uint32_t data_tx_seq_num;
89

  
90
	uint8_t sid[SOCKETID_SIZE];
91
	MsgType mt;
92
	SocketIdMt h_dst;
127 93

  
128
} DestinationSocketIdData;
94
	ExecutionList mids_local;
95
	ExecutionList mids_remote;
96
} DestinationSocketIdMtData;
129 97

  
130
typedef std::tr1::unordered_map<SocketId,DestinationSocketIdData*,socketIdHash,socketIdCompare> DispatcherListSocketId;
131 98

  
99
typedef std::tr1::unordered_map<struct SocketIdMt, DestinationSocketIdMtData*, socketIdMtHash, socketIdMtCompare> DispatcherListSocketIdMt;
132 100

  
133 101
class MeasureDispatcher {
134 102

  
135 103
	/* Execution lists for the message layer hooks */
136
	DispatcherListSocketId dispatcherList;
104
	DispatcherListSocketIdMt dispatcherList;
137 105
	
138
	int addMeasureToExecList(DispatcherListMsgType &dl, class MonMeasure *m, MsgType mt);
139
	int delMeasureFromExecList(DispatcherListMsgType &dl, class MonMeasure *m, MsgType mt);
106
	void addMeasureToExecLists(SocketIdMt h_dst, class MonMeasure *m);
107
	void delMeasureFromExecLists(MonMeasure *m);
108
	int stopMeasure(class MonMeasure *m);
109

  
110
	void createDestinationSocketIdMtData(struct SocketIdMt h_dst);
111
	void destroyDestinationSocketIdMtData(SocketId dst, MsgType mt);
140 112

  
141
	class MonMeasure* findDep(DispatcherListMsgType &dl, MeasurementCapabilities flags, MsgType mt, MeasurementId mid);
142
	class MonMeasure* findMeasureFromId(DestinationSocketIdData *dsid, MeasurementCapabilities flags, MeasurementId mid, MsgType mt);
143
	int checkDeps(DestinationSocketIdData *dsid, MeasurementCapabilities flags, MeasurementId mid, MsgType mt);
113
	class MonMeasure* findMeasureFromId(DestinationSocketIdMtData *dd, MeasurementCapabilities flags, MeasurementId mid);
144 114

  
145 115
	int sendCtrlMsg(SocketId dst, Buffer &buffer);
146 116
	int receiveCtrlMsg(SocketId sid, MsgType mt, char *cbuf, int length);
......
176 146
			initial_ttl = 0;
177 147
	};
178 148

  
179
	int activateMeasure(class MonMeasure *m, SocketId dst, MsgType mt,int autoload = 1);
149
	int activateMeasure(class MonMeasure *m, SocketId dst, MsgType mt, int autoload = 1);
180 150
	int deactivateMeasure(class MonMeasure *m);
181 151

  
182 152
	int oobDataTx(class MonMeasure *m, char *buf, int buf_len);
monl/mon_measure.cpp
30 30
	mh_remote = -1;
31 31
	ptrDispatcher = ptrDisp;
32 32

  
33
	dst_socketid = NULL;
34

  
35 33
	/* Initialise default values */
36 34
	int i;
37 35
	status = STOPPED;
......
92 90
	}
93 91

  
94 92
	if(dst_socketid != NULL) {
95
		if(mlSocketIDToString(dst_socketid, sid_string, sizeof(sid_string)) == 0) {
93
		if(mlSocketIDToString((SocketId) dst_socketid, sid_string, sizeof(sid_string)) == 0) {
96 94
			output_name += "-B:";
97 95
			output_name += sid_string;
98 96
		}
monl/mon_measure.h
61 61
	/* Bitmask defining the capabilities of this instance of the measure */
62 62
	MeasurementCapabilities flags;
63 63

  
64
	/* Parameter values */
65
	MonParameterValue *param_values;
66

  
67 64
	/* Pointer to the MeasurePlugin class describing this measure */
68 65
	class MeasurePlugin *measure_plugin;
69 66

  
......
72 69

  
73 70
	MonHandler mh_remote;
74 71

  
75
	SocketId dst_socketid;
72
	uint8_t dst_socketid[SOCKETID_SIZE];
76 73
	MsgType  msg_type;
77 74

  
78

  
79 75
	int used_counter;
80 76

  
81 77
	ResultBuffer *rb;
82 78

  
79
	/* Parameter values */
80
	MonParameterValue *param_values;
81

  
82

  
83 83
	void debugInit(const char *);
84 84
	void debugStop();
85 85
	std::fstream output_file;
......
128 128

  
129 129
	virtual ~MonMeasure() {
130 130
		delete[] param_values;
131
		if(dst_socketid)
132
			delete[] (char *) dst_socketid;
133 131
		if(rb != NULL)
134 132
			delete rb;
135 133
	};
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff