Revision 5f3adef4

View differences:

ALTOclient/ALTOclient.c
1 1
/*
2 2
 ============================================================================
3 3
 Name        : ALTOclient.c
4
 Authors     : Thilo Ewald
5
			   Armin Jahanpanah <jahanpanah@neclab.eu>
6
 
4
 Author      : T. Ewald <ewald@nw.neclab.eu>
5
 Version     : 243
7 6
 Proprietary : NEC Europe Ltd. PROPRIETARY INFORMATION
8 7
			   This software is supplied under the terms of a license
9 8
			   agreement or nondisclosure agreement with NEC Europe Ltd. and
......
29 28
			   OUT OF THE USE OF OR INABILITY TO USE THIS PROGRAM, EVEN IF NEC
30 29
			   Europe Ltd. HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
31 30
 Modification: THIS HEADER MAY NOT BE EXTRACTED OR MODIFIED IN ANY WAY.
31
 Description : First try of the ALTO client
32 32
 ============================================================================
33 33
 */
34 34

  
35 35
#include "ALTOclient.h"
36 36
#include "ALTOclient_impl.h"
37 37

  
38
#include <stddef.h>
39 38
#include <stdarg.h>
40 39
#include <pthread.h>
41
#include <sys/time.h>
42
#include <time.h>
43

  
44
typedef struct {
45
	int query_failure_count;
46
	int query_failure_count_total;
47
} alto_stats;
48

  
49
static alto_stats stats;
50 40

  
51 41
/*
52 42
 * 		Here the reference to the accessible DBs is set
......
57 47
static xmlDocPtr ALTO_XML_req = NULL;		// Pointer to the XML for the Request
58 48
static xmlDocPtr ALTO_XML_res = NULL;		// Pointer to the XML for the Request
59 49

  
50

  
60 51
// This is the varaiable where the ALTO server can be found
61 52
static char alto_server_url[256];
62 53

  
......
92 83

  
93 84
#define returnIf(expr, msg, retval) if(expr) { alto_errorf("%s - Condition check: '"#expr"' (%s, line: %d) -- %s\n", __FUNCTION__, __FILE__, __LINE__, msg); return retval; }
94 85

  
95
#define errorMsg(msg) alto_errorf("%s (%s, line: %d) -- %s\n", __FUNCTION__, __FILE__, __LINE__, msg);
96

  
97
typedef struct {
98
	uint64_t timer_start;	/* time of init */
99
	uint64_t timer_last;	/* time of last update */
100
	uint64_t timer_delta;	/* timespan between last two updates */
101
	uint64_t ticks;			/* microsecs since timer_start */
102
	float t;				/* seconds since timer start */
103
	float dt;				/* timer_delta in seconds */
104
} alto_timer;
105

  
106
static void alto_timer_init(alto_timer* timer)
107
{
108
	struct timeval tnow;
109
	gettimeofday(&tnow, NULL);
110
	timer->timer_start = (tnow.tv_usec + tnow.tv_sec * 1000000ull);
111
	timer->timer_last = timer->timer_start;
112
	timer->timer_delta = 0;
113
	timer->ticks = 0;
114
	timer->t = 0.0f;
115
	timer->dt = 0.0f;
116
}
117

  
118
static void alto_timer_update(alto_timer* timer)
119
{
120
	struct timeval tnow;
121
	gettimeofday(&tnow, NULL);
122
	uint64_t timer_now = (tnow.tv_usec + tnow.tv_sec * 1000000ull);
123
	timer->timer_delta = timer_now - timer->timer_last;
124
	timer->timer_last = timer_now;
125
	timer->ticks = timer_now - timer->timer_start;
126
	timer->t = timer->ticks / 1000000.0f; /* in seconds */
127
	timer->dt = timer->timer_delta / 1000000.0f; /* in seconds */
128
}
129

  
130

  
131 86
/*
132 87
 * 	Function to set the actual ALTO server for configuration
133 88
 */
......
147 102
	return (char *) alto_server_url;
148 103
}
149 104

  
105

  
106

  
107

  
108

  
109

  
110

  
111

  
150 112
/*
151 113
 * 	Func:					Convert the "Baschtl" notation into a readable
152 114
 * 							format (get IP address)
......
178 140
	return IP;
179 141
}
180 142

  
143

  
144

  
145

  
181 146
/*
182 147
 * 	Func:			Convert the "Baschtl" notation into a readable format
183 148
 * 					(get prefix)
......
201 166
    return 32;
202 167
}
203 168

  
169

  
204 170
/*
205 171
 *  Func:	Create an ALTO XML request from a given DB
206 172
 *
......
209 175
 *  ret:	XML_doc		the XML where the request is stored in
210 176
 */
211 177
xmlDocPtr alto_create_request_XML(struct alto_db_t * db, struct in_addr rc_host, int pri_rat, int sec_rat){
212
	returnIf(db == NULL, "internal db ptr is NULL!", NULL);
178
	assertCheck(db, "internal db ptr is NULL!");
213 179

  
214 180
	// Creates a new document
215 181
	// <?xml version="1.0" encoding="UTF-8"?>
216 182
	xmlDocPtr doc = NULL;       /* document pointer */
217 183
	doc = xmlNewDoc(BAD_CAST "1.0");
218 184

  
219
	returnIf(doc == NULL, "xmlNewDoc failed! Out of memory?", NULL);
185
	assertCheck(doc, "xmlNewDoc failed! Out of memory?");
220 186

  
221 187
	// Create the root node and name it with the correct name space
222 188
	// <alto xmlns='urn:ietf:params:xml:ns:p2p:alto'>
......
252 218
    	xmlNewProp(node_PRI, BAD_CAST "crit", BAD_CAST "lat");
253 219
    }
254 220

  
221

  
255 222
    // Add the additional rating criteria
256 223
	if((sec_rat & REL_PREF) == REL_PREF){
257 224
		xmlNodePtr node_SEC1 = NULL;
......
269 236
		xmlNewProp(node_SEC4, BAD_CAST "crit", BAD_CAST "lat");
270 237
	}
271 238

  
239

  
272 240
    // Now create the source of the request
273 241
    // <rc_hla><ipprefix version='4' prefix='195.37.70.39/32'/></rc_hla>
274 242
    xmlNodePtr node_RC_HLA = NULL;
......
325 293
       libCURL-based POST code
326 294
  ==================================*/
327 295

  
296
// libcurl is now obsolete!
328 297
#ifdef USE_CURL
329 298

  
330 299
// this function will be registered with curl as a handler, which
331 300
// copies the http reply to a the buffer
332
static size_t curl_copy_reply_to_buf(void *ptr,size_t size,size_t nmemb,void *stream){
301
size_t curl_copy_reply_to_buf(void *ptr,size_t size,size_t nmemb,void *stream){
333 302
    size_t realsize = size * nmemb;
334 303
    struct curl_reply_buffer_t *crb = (struct curl_reply_buffer_t *)stream;
335 304
    // error: new chunk plus trailing zero would not fit into remaining buffer
......
341 310
}
342 311
uint16_t prefixes = 0;
343 312

  
313

  
314

  
344 315
xmlDocPtr query_ALTO_server_curl(xmlDocPtr doc, char* ALTO_server_URL){
345 316
	xmlDocPtr ALTO_XML_response;
346 317

  
......
351 322
	struct curl_httppost *lastptr=NULL;
352 323

  
353 324
	curl = curl_easy_init();
354
	returnIf(curl == NULL, "Couldn't get a handle from curl_easy_init(). abort.", NULL);
325
	assertCheck(curl, "Couldn't get a handle from curl_easy_init(). abort.");
355 326

  
356
	//printf("Will send HTTP POST to %s\nwith form data:\n\n%s\n", alto_server_url, ALTO_XML_query);
327
//	printf("Will send HTTP POST to %s\nwith form data:\n\n%s\n", alto_server_url, ALTO_XML_query);
357 328

  
358 329
	// prepare the buffer to be send
359 330
	xmlChar*  doctxt;
360 331
	int       doclen;
361 332
	xmlDocDumpFormatMemoryEnc(doc,&doctxt,&doclen,"utf-8",1);
362 333

  
363
	#ifdef USE_DEBUG_OUTPUT
364
	printf(stderr, "[ALTOclientCURL] Will send HTTP POST to %s\nwith XML data:\n\n%s\n", alto_server_url, doctxt);
365
	#endif
366

  
367 334
	// URL that receives this POST
368 335
	curl_easy_setopt(curl, CURLOPT_URL, ALTO_server_URL);
369
//	curl_easy_setopt(curl, CURLOPT_TCP_NODELAY, 1);
370
//	curl_easy_setopt(curl, CURLOPT_FORBID_REUSE, 1);
371
	curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
372 336

  
373 337
	// add form data
374 338
	curl_formadd(&formpost,
375 339
				 &lastptr,
376 340
				 CURLFORM_COPYNAME, "alto_xml_request",
377 341
//				 CURLFORM_COPYCONTENTS, ALTO_XML_query,
378
				 CURLFORM_COPYCONTENTS, doctxt,		// PTRCONTENTS?
342
				 CURLFORM_COPYCONTENTS, doctxt,
379 343
				 CURLFORM_END);
380 344

  
381 345
	curl_formadd(&formpost,
......
400 364
	// then cleanup the form post chain
401 365
	curl_formfree(formpost);
402 366

  
403
	#ifdef USE_DEBUG_OUTPUT
404
	//printf("result of curl_easy_perform() is: %i\n", res);
405
	fprintf(stderr, "[ALTOclientCURL] received %i octets. the buffer is:\n\n%s\nthat's all. bye.\n", alto_rep_buf.fill, alto_rep_buf.buffer);
406
	#endif
367
//  printf("result of curl_easy_perform() is: %i\n", res);
368
//  printf("received %i octetts. the buffer is:\n\n%s\nthat's all. bye.\n",alto_rep_buf.fill,alto_rep_buf.buffer);
407 369

  
408 370

  
409 371
	// and last but nor least, transform it into an XML doc
410
	ALTO_XML_response = xmlRecoverMemory(alto_rep_buf.buffer, sizeof(alto_rep_buf.buffer));		// <- for getting XML from memory
372
	ALTO_XML_response = xmlRecoverMemory(alto_rep_buf.buffer,sizeof(alto_rep_buf.buffer));		// <- for getting XML from memory
411 373

  
412 374
	return ALTO_XML_response;
413 375
}
......
420 382

  
421 383
#define POST_BOUNDARY "---------------------------12408751047121013601852724877"
422 384

  
423
static void POST_add(char* buf, const char* name, const char* value) {
385
void POST_add(char* buf, const char* name, const char* value) {
424 386
	sprintf(buf+strlen(buf), "--"POST_BOUNDARY"\r\nContent-Disposition: form-data; name=\"%s\"\r\n\r\n%s\r\n", name, value);
425 387
}
426 388

  
427
static void POST_end(char* buf) {
389
void POST_end(char* buf) {
428 390
	sprintf(buf+strlen(buf), "--"POST_BOUNDARY"--\r\n\r\n");
429 391
}
430 392

  
431
static void* POST_send(const char* url, const char* data) {
393
void* POST_send(const char* url, const char* data) {
432 394
	int i;
433 395
	void* ctx = NULL;
434 396
	char header[] = "Connection: close\r\n";
435 397
	char contentType[] = "multipart/form-data; boundary="POST_BOUNDARY;
436 398
	char* ct = contentType;
437 399

  
438
	returnIf(url == NULL, "ALTO server URL is NULL!", NULL);
439
	returnIf(data == NULL, "POST data is NULL!", NULL);
400
	assertCheck(url, "ALTO server URL is NULL!");
401
	assertCheck(data, "POST data is NULL!");
440 402

  
441 403
	//ctx = xmlNanoHTTPMethod(url, "POST", data, &ct, NULL, strlen(data));
442 404
	for (i=0; i < 3; i++) {
......
449 411
		fprintf(stderr, "URL was '%s'.", url);
450 412
		return NULL;
451 413
	}
414
	assertCheck(ctx, "xmlNanoHTTPMethod failed! Make sure ALTO server is reachable (NAT issue?)..");
452 415

  
453 416
	free(ct);
454 417
	return ctx;
......
469 432
//	int		errorcode = 0;
470 433
//	FILE*	f = NULL;
471 434

  
472
	returnIf(doc == NULL, "xml doc ptr is NULL!", NULL);
473
	returnIf(endPoint == NULL, "ALTO server URL is NULL!", NULL);
435
	assertCheck(doc, "xml doc ptr is NULL!");
436
	assertCheck(endPoint, "ALTO server URL is NULL!");
474 437

  
475 438
	xmlNanoHTTPInit();
476 439
	xmlDocDumpFormatMemoryEnc(doc,&doctxt,&doclen,"utf-8",1);
477 440

  
478 441
	dataLen = doclen + 2048;
479 442
	data = malloc(dataLen);
480
	returnIf(data == NULL, "Couldn't allocate data buffer! Out of memory?", NULL);
443
	assertCheck(data, "Couldn't allocate data buffer! Out of memory?");
481 444
	memset(data, 0, dataLen);
482 445

  
483 446
	// build the mime multipart contents
......
571 534
	return result;
572 535
}
573 536

  
537

  
538

  
539

  
574 540
/*
575 541
 *
576 542
 * 		HERE is the magic for the internal DB management
......
585 551
	struct alto_db_t * db;
586 552
	db = malloc(sizeof(struct alto_db_t));
587 553
//	db = (alto_db_t *)malloc(sizeof(struct alto_db_element_t));
588
	returnIf(db == NULL, "Couldn't allocate internal db! Out of memory?", NULL);
554
	assertCheck(db, "Couldn't allocate internal db! Out of memory?");
589 555

  
590 556
	db->first = NULL;
591 557
	db->last = NULL;
......
626 592
	return 1;
627 593
}
628 594

  
595

  
596

  
629 597
/*
630 598
 * 	Helper function to print values of one ALTO DB element
631 599
 */
632 600
void alto_dump_element(struct alto_db_element_t * cur){
633 601
	// Sanity check
634
	if (cur == NULL) return;
602
	assertCheck(cur == NULL, "No element to print values from! ABORT");
635 603

  
636 604
	// normal case, print everything
637 605
//	fprintf(stdout, "---> Internal Data\t");
......
657 625
	alto_debugf("Dump what's in the DB \n");
658 626

  
659 627
	// Security Check
660
	if (!db) return;
628
	assertCheck(db, "Failure in DB structure! ABORT");
661 629

  
662 630
	// General Data
663 631
	alto_debugf("Number of elements in DB: %d \n", db->num_of_elements);
......
676 644
	return;
677 645
}
678 646

  
647

  
648

  
649

  
679 650
/*
680 651
 *  Adds one ALTO entry to the DB
681 652
 */
......
768 739
		return 1;
769 740
	}
770 741

  
742

  
771 743
	// normal handling, somwhere in the DB
772 744
	else {
773 745
		// update predecessor
......
787 759
	return -1;
788 760
}
789 761

  
762

  
790 763
struct in_addr compute_subnet(struct in_addr host, int prefix){
791 764
	struct in_addr subn;
792 765
	uint32_t match_host = host.s_addr;
......
802 775
	return subn;
803 776
}
804 777

  
778

  
779

  
805 780
/*
806 781
 * 	Search in an ALTO DB for the match of an host
807 782
 *
......
829 804
	return 0;
830 805
}
831 806

  
807

  
808

  
809

  
810

  
811

  
812

  
813

  
814

  
832 815
int ask_helper_func(struct in_addr subnet, ALTO_DB_T * db){
833 816
	ALTO_DB_ELEMENT_T * cur = db->first;
834 817
	while(cur != NULL){
......
842 825
	return 0;
843 826
}
844 827

  
828

  
829

  
845 830
int get_alto_rating(ALTO_DB_ELEMENT_T * element, ALTO_DB_T * db){
846 831

  
847 832
	int mask = element->host_mask;
......
861 846
	return 0;
862 847
}
863 848

  
849

  
850

  
864 851
int get_alto_subnet_mask(ALTO_DB_ELEMENT_T * element, ALTO_DB_T * db){
865 852
	int mask = element->host_mask;
866 853
	struct in_addr subnet = compute_subnet(element->host, mask);
......
876 863
	return 0;
877 864
}
878 865

  
866

  
867

  
868

  
869

  
870

  
871

  
872

  
879 873
/*
880 874
 * 	Here the matching between the requested IPs and the delivered
881 875
 * 	list will be done
......
906 900
	return 1;
907 901
}
908 902

  
903

  
904

  
905

  
906

  
907

  
909 908
int alto_parse_from_file(altoDbPtr db, char *file_name){
910 909
	alto_debugf("%s: Read hosts from file (%s) and store it in the Request-DB\n", __FUNCTION__, file_name);
911 910

  
......
1014 1013
	return 1;
1015 1014
}
1016 1015

  
1016

  
1017

  
1017 1018
/*
1018 1019
 * 	Converts a given alto_list_t structure into the internal DB structure
1019 1020
 *
......
1050 1051
	return 1;
1051 1052
}
1052 1053

  
1054

  
1055

  
1056

  
1057

  
1053 1058
int alto_write_to_file(altoDbPtr db, char *file_name){
1054 1059
	alto_debugf("%s: Write hosts to file (%s.out) \n", __FUNCTION__, file_name);
1055 1060

  
......
1083 1088
	return count;
1084 1089
}
1085 1090

  
1091

  
1092

  
1093

  
1086 1094
/*
1087 1095
 * 	Start & Innitialize the ALTO client
1088 1096
 */
......
1096 1104
	// prepare the XML environment
1097 1105
	LIBXML_TEST_VERSION;
1098 1106

  
1099
#ifdef USE_CURL
1100
	// prepare CURL
1101
	// ---------------------------------------------------------------------------------
1102
	// XXX: NOTE:
1103
	// "This function is NOT THREAD SAFE. You must not call it when any other thread 
1104
	// in the program (i.e. a thread sharing the same memory) is running. This doesn't 
1105
	// just mean no other thread that is using libcurl. Because curl_global_init() calls 
1106
	// functions of other libraries that are similarly thread unsafe, it could conflict 
1107
	// with any other thread that uses these other libraries."
1108
	// ---------------------------------------------------------------------------------
1109

  
1110
	curl_global_init(CURL_GLOBAL_ALL);
1111
#endif
1112

  
1113 1107
	// and Initialize the XMLs
1114 1108
    ALTO_XML_req = NULL;
1115 1109
    ALTO_XML_res = NULL;
1116 1110

  
1117
	// init query failure counters
1118
	stats.query_failure_count = 0;
1119
    stats.query_failure_count_total = 0;
1111

  
1120 1112
}
1121 1113

  
1122 1114

  
......
1135 1127
    if (ALTO_XML_res) { xmlFreeDoc(ALTO_XML_res); ALTO_XML_res = NULL; }
1136 1128

  
1137 1129
	xmlCleanupParser();
1138

  
1139
#ifdef USE_CURL
1140
	// shutdown  libCurl
1141
	// XXX: NOTE: same thread un-safety warning applies!
1142
	curl_global_cleanup();
1143
#endif
1144 1130
}
1145 1131

  
1132

  
1133

  
1134

  
1135

  
1136

  
1146 1137
/*
1147 1138
 * 	Function:	gets for a list in a txt file the correct rating
1148 1139
 *
......
1218 1209
  ==================================*/
1219 1210

  
1220 1211
static int queryState = ALTO_QUERY_READY;
1221
static alto_timer queryTimer;
1222 1212

  
1223 1213
static pthread_t threadId;
1224 1214
static pthread_attr_t attr;
......
1231 1221
	int sec_rat;
1232 1222
} ALTO_ThreadArgs_t;
1233 1223

  
1234
static ALTO_ThreadArgs_t threadArgs;
1224
ALTO_ThreadArgs_t threadArgs;
1235 1225

  
1236 1226
void* alto_query_thread_func(void* thread_args)
1237 1227
{
......
1240 1230

  
1241 1231
	alto_debugf("alto_query_thread_func\n");
1242 1232

  
1243
	// *** this will block at some point ***
1244
	do_ALTO_update(args->rc_host, args->pri_rat, args->sec_rat);	// BLOCK
1245

  
1246
	// *** at this point we got the results from the ALTO server ***
1247

  
1248
    // reset counter of consecutive connection failures
1249
    stats.query_failure_count = 0;
1233
	// this will block at some point
1234
	do_ALTO_update(args->rc_host, args->pri_rat, args->sec_rat);
1250 1235

  
1251 1236
	// write values back
1252 1237
	for(count = 0; count < args->num; count++){
......
1256 1241
	// signal that query is ready
1257 1242
	queryState = ALTO_QUERY_READY;
1258 1243

  
1259
//	alto_timer_update(&queryTimer);
1260
//	alto_debugf("Query took %.2f seconds.\n", queryTimer.t);
1261

  
1262 1244
	return thread_args;
1263 1245
}
1264 1246

  
......
1266 1248
	return queryState;
1267 1249
}
1268 1250

  
1269
int ALTO_stats(int stat_id) {
1270
	if (stat_id == ALTO_STAT_FAILURE_COUNT) return stats.query_failure_count;
1271
	else if (stat_id == ALTO_STAT_FAILURE_COUNT_TOTAL) return  stats.query_failure_count_total;
1272
	return 0;
1273
}
1274

  
1275 1251
int ALTO_query_exec(ALTO_GUIDANCE_T * list, int num, struct in_addr rc_host, int pri_rat, int sec_rat){
1252
	alto_debugf("ALTO_query_exec\n");
1253

  
1254
	// Sanity checks (list)
1276 1255
	returnIf(list == NULL, "Can't access the list!", 0);
1277
	returnIf(num < 0, "<0 elements?", 0);
1278 1256

  
1279
	int res = ALTO_QUERY_EXEC_OK;
1257
	// Sanity checks (num of elements)
1258
	returnIf(num < 0, "<0 elements?", 0);
1280 1259

  
1281 1260
	// set new state
1282 1261
	if (queryState == ALTO_QUERY_INPROGRESS) {
1283
		alto_timer_update(&queryTimer);
1284
		if (queryTimer.t < ALTO_TIMEOUT) {
1285
			alto_debugf("*** WARNING: Calling ALTO_query_exec while query is still in progress! Aborting..\n");
1286
			return ALTO_QUERY_EXEC_INPROGRESS;
1287
		}
1288
		alto_debugf("*** NOTE: Previous ALTO_query_exec timed out (> %d sec.), starting new query..\n", ALTO_TIMEOUT);
1289

  
1290
		res = ALTO_QUERY_EXEC_TIMEOUT;
1291

  
1292
		// count connection failures
1293
		stats.query_failure_count++;
1294
		stats.query_failure_count_total++;
1295
		alto_debugf("total count of ALTO server query connection failures so far: %d\n", stats.query_failure_count_total);
1262
		alto_debugf("*** WARNING: Calling ALTO_query_exec while query is still in progress! Race condition?!\n");
1263
		return 0;
1296 1264
	}
1297 1265
	queryState = ALTO_QUERY_INPROGRESS;
1298
	alto_timer_init(&queryTimer);
1299 1266

  
1300 1267
	// first purge existing DB entries
1301 1268
	alto_purge_db(ALTO_DB_req);
......
1305 1272

  
1306 1273
	//ALTO_XML_req = alto_create_request_XML(ALTO_DB_req, rc_host, pri_rat, sec_rat);
1307 1274

  
1308
	// *** start async query thread ***
1309 1275
	threadArgs.list = list;
1310 1276
	threadArgs.num = num;
1311 1277
	pthread_attr_init(&attr);
......
1313 1279
	if (pthread_create(&threadId, &attr, alto_query_thread_func, &threadArgs) != 0) {
1314 1280
		fprintf(stderr,"[ALTOclient] pthread_create failed!\n");
1315 1281
		queryState = ALTO_QUERY_READY;
1316
		return ALTO_QUERY_EXEC_THREAD_FAIL;
1282
		return 0;
1317 1283
	}
1318 1284

  
1319 1285
	// This should be it
1320
	return res;
1286
	return 1;
1321 1287
}
1322 1288

  
1289

  
1290

  
1323 1291
/*
1324 1292
 * Function:	With this call the internal request to update the DB is triggered.
1325 1293
 * 				This should be done on a regual basis to keep the local ALTO-DB
1326 1294
 * 				up2date
1327 1295
 */
1296

  
1297

  
1328 1298
void do_ALTO_update(struct in_addr rc_host, int pri_rat, int sec_rat){
1329
	if (!ALTO_DB_req) {
1330
		errorMsg("ALTO_DB_req is NULL!");
1331
		return;
1332
	}
1299
	assertCheck(ALTO_DB_req, "ALTO_DB_req is NULL!");
1333 1300

  
1334 1301
	// Step 2: create an XML from the DB entries
1335 1302
	ALTO_XML_req = alto_create_request_XML(ALTO_DB_req, rc_host, pri_rat, sec_rat);
1336
	if (!ALTO_XML_req) {
1337
		errorMsg("alto_create_request_XML failed!");
1338
		return;
1339
	}
1340 1303

  
1341 1304
#ifndef USE_LOCAL_REPLY_XML
1342 1305
	// Step2a: send POST request to ALTO server
......
1360 1323

  
1361 1324
		xmlFreeDoc(ALTO_XML_res);
1362 1325
		ALTO_XML_res = NULL;
1363
	} else {
1364
		alto_debugf("** WARNING: Invalid response from ALTO server! (%s)\n", __FUNCTION__);
1365 1326
	}
1366 1327

  
1367 1328
	// free xml data
ALTOclient/ALTOclient.h
42 42
#define ALTO_QUERY_READY		0
43 43
#define ALTO_QUERY_INPROGRESS	1
44 44

  
45
/** Maximum server response time in seconds */ 
46
#define ALTO_TIMEOUT	20
47

  
48
/**
49
 * Return codes of ALTO_query_exec.
50
 * ALTO_QUERY_EXEC_TIMEOUT means the last query timed out, but the new query started fine. 
51
 */
52
#define ALTO_QUERY_EXEC_OK			0
53
#define ALTO_QUERY_EXEC_INPROGRESS	1
54
#define ALTO_QUERY_EXEC_THREAD_FAIL	2
55
#define ALTO_QUERY_EXEC_TIMEOUT		3
56

  
57
/** statistics IDs for ALTO_stats */
58
#define ALTO_STAT_FAILURE_COUNT			0
59
#define ALTO_STAT_FAILURE_COUNT_TOTAL	1
60

  
61 45
/**
62 46
 * This is the struct of one element for the internal interface. Make lists out of it to interact with the client.
63 47
 */
......
158 142
int get_ALTO_guidance_for_list(ALTO_GUIDANCE_T * list, int num, struct in_addr rc_host, int pri_rat, int sec_rat);
159 143

  
160 144
/**
161
 *	Asynchronous/threaded ALTO query. Return codes are ALTO_QUERY_EXEC_* (see defines above).
162
 *  ALTO_QUERY_EXEC_TIMEOUT means last query timed out, but the new query started fine.
145
 *	Asynchronous/threaded ALTO query.
163 146
 *	@see get_ALTO_guidance_for_list
164 147
 */
165 148
int ALTO_query_exec(ALTO_GUIDANCE_T * list, int num, struct in_addr rc_host, int pri_rat, int sec_rat);
......
170 153
 */
171 154
int ALTO_query_state();
172 155

  
173
/**
174
 *  Returns statistics value for the given ALTO_STAT_* ID. 
175
 *  @return requested statistics value.
176
 */
177
int ALTO_stats(int stats_id);
178

  
179 156
#endif /* ALTOCLIENT_H */
ALTOclient/Makefile
5 5
LDFLAGS += -L$(XMLLIB)
6 6
LDLIBS += -lxml2 -lpthread
7 7

  
8
ifdef CURL
9
LDLIBS += -lcurl
10
CPPFLAGS += -DUSE_CURL
11
endif
8
# libcurl is now obsolete
9
#LDLIBS += -lcurl
12 10

  
13 11
all: main lib
14 12

  
Makefile.am
1 1
AUTOMAKE_OPTIONS = foreign no-dist no-installinfo
2 2
ACLOCAL_AMFLAGS = -I m4
3
SUBDIRS = . common dclog rep monl  
3
SUBDIRS = . common dclog ml rep monl tests ALTOclient
4 4

  
5 5
#include aminclude.am
6 6

  
configure.ac
12 12
AC_PROG_RANLIB
13 13
AC_LANG_C
14 14

  
15
AC_CHECK_HEADER(netdb.h)
16
if test "$ac_cv_header_netdb_h" == no
17
then
18
	AC_MSG_ERROR([netdb.h is missing, unable to continue])
19
	exit -1
20
fi
15 21

  
16 22
if test "$ac_cv_header_sys_types_h" == no
17 23
then
dclog/dclog.c
44 44
#include <sys/stat.h>
45 45
#include <errno.h>
46 46
#include <fcntl.h>
47
//#include <netdb.h>
47
#include <netdb.h>
48 48
#include <stdarg.h>
49 49
#include <stdio.h>
50 50
#include <stdlib.h>
......
54 54
#ifndef WIN32
55 55
#include <sys/socket.h>
56 56
#include <netinet/in.h>
57

  
58
#else
59

  
60
static struct tm *localtime_r(const time_t *clock, struct tm *result) {
61
    struct tm *res = localtime(clock);
62
    *result = *res;
63
  return result;
64
}
65 57
#endif
66 58

  
67 59
// dclog headers
......
761 753
  
762 754
  // If the user-specified level is higher than the logging level for
763 755
  // this DCLog object, return success
764
  if (lev > dclog->lev) {
756
  if (lev > dclog->lev)
765 757
    return 1;
766
  }
767 758
  
768 759
  // If we are using the unique filename by day of the month feature,
769 760
  // DCLOG_FEAT_UNIQUE, see if the day of the month has changed
dclog/log.c
38 38
        DCLogSetHeader( dclog, 1 );
39 39
        DCLogSetPrintLevel( dclog, 1 );
40 40

  
41
#if !WIN32 && !MAC_OS
42 41
	logstream = open_memstream(&logbuffer, &logbuffer_size);
43 42
	if (!logstream) {
44 43
		fprintf(stderr, "Unable to initialize logger, exiting");
45 44
		exit(-1);
46 45
	}
47
#else
48
        logbuffer_size=1000;
49
	logbuffer = (char *) malloc(logbuffer_size);
50
#endif
51 46
        initialized = 1;
52 47
}
53 48

  
......
65 60

  
66 61
	if (!initialized) return;
67 62

  
63
	rewind(logstream);
68 64

  
69 65
  	va_start( str_args, fmt );
70
#if !WIN32 && !MAC_OS
71
	rewind(logstream);
72 66
  	if (vfprintf( logstream, fmt, str_args ) < 0) return;
67
  	va_end( str_args );
68

  
73 69
	char zero = 0;
74 70
	if (fwrite(&zero, 1, 1, logstream) != 1) return;
75 71
	fflush(logstream);
76
#else
77
        if (vsnprintf( logbuffer, logbuffer_size, fmt, str_args) < 0) return;
78
        logbuffer[logbuffer_size - 1] = '\0';
79
#endif
80
  	va_end( str_args );
81 72

  
82
//fprintf(stderr, "X.do logger lev:%d, msg %s\n", lev, logbuffer);
83 73
	DCLogWrite(dclog, lev , logbuffer);
84
        fflush(dclog->fp);
85 74
}
86 75

  
include/ALTOclient.h
42 42
#define ALTO_QUERY_READY		0
43 43
#define ALTO_QUERY_INPROGRESS	1
44 44

  
45
/** Maximum server response time in seconds */ 
46
#define ALTO_TIMEOUT	20
47

  
48
/**
49
 * Return codes of ALTO_query_exec.
50
 * ALTO_QUERY_EXEC_TIMEOUT means the last query timed out, but the new query started fine. 
51
 */
52
#define ALTO_QUERY_EXEC_OK			0
53
#define ALTO_QUERY_EXEC_INPROGRESS	1
54
#define ALTO_QUERY_EXEC_THREAD_FAIL	2
55
#define ALTO_QUERY_EXEC_TIMEOUT		3
56

  
57
/** statistics IDs for ALTO_stats */
58
#define ALTO_STAT_FAILURE_COUNT			0
59
#define ALTO_STAT_FAILURE_COUNT_TOTAL	1
60

  
61 45
/**
62 46
 * This is the struct of one element for the internal interface. Make lists out of it to interact with the client.
63 47
 */
......
158 142
int get_ALTO_guidance_for_list(ALTO_GUIDANCE_T * list, int num, struct in_addr rc_host, int pri_rat, int sec_rat);
159 143

  
160 144
/**
161
 *	Asynchronous/threaded ALTO query. Return codes are ALTO_QUERY_EXEC_* (see defines above).
162
 *  ALTO_QUERY_EXEC_TIMEOUT means last query timed out, but the new query started fine.
145
 *	Asynchronous/threaded ALTO query.
163 146
 *	@see get_ALTO_guidance_for_list
164 147
 */
165 148
int ALTO_query_exec(ALTO_GUIDANCE_T * list, int num, struct in_addr rc_host, int pri_rat, int sec_rat);
......
170 153
 */
171 154
int ALTO_query_state();
172 155

  
173
/**
174
 *  Returns statistics value for the given ALTO_STAT_* ID. 
175
 *  @return requested statistics value.
176
 */
177
int ALTO_stats(int stats_id);
178

  
179 156
#endif /* ALTOCLIENT_H */
include/grapes.h
62 62
#include <sys/types.h>
63 63
#include <stdbool.h>
64 64
#include <stdlib.h>
65
#ifdef WIN32
66
#include <winsock2.h>
67
#endif
68 65

  
69 66
#ifndef byte
70 67
/** 8-bit unsigned type */
include/mon.h
273 273
/* a remote error occured while activating the measure */
274 274
RFAILED,
275 275
/* you called monActivateMeasure with wrong paramters */
276
MS_ERROR,
276
ERROR,
277 277
/* the measure is beeing set up */
278 278
INITIALISING,
279 279
/* the measure is beeing stopped */
ml/Makefile.am
5 5
INCLUDES = -I$(top_srcdir)/include/  #-I$(top_srcdir)/dclog
6 6
noinst_LIBRARIES = libml.a
7 7

  
8
libml_a_SOURCES = BUGS.txt ml.c ml_log.c util/stun.c \
9
	util/udpSocket.c util/rateLimiter.c util/queueManagement.c
8
libml_a_SOURCES = BUGS.txt ml.c util/stun.c \
9
	util/udpSocket.c util/rateControl.c
10 10
# transmissionHandler.c
11 11

  
12 12
# testMessaginglayer: echoServer.c ml.h ml.c transmissionHandler.h transmissionHandler.c util/stun.h util/stun.c util/udpSocket.h util/udpSocket.c
13
#        ${COMPILER} -o echoServer.o echoServer.c ml.h ml.c transmissionHandler.h transmissionHandler.c util/stun.h util/stun.c util/udpSocket.h util/udpSocket.c ${EVENT} ${MATH} ${GDB}
13
#        ${COMPILER} -o echoServer.o echoServer.c ml.h ml.c transmissionHandler.h transmissionHandler.c util/stun.h util/stun.c util/udpSocket.h util/udpSocket.c util/rateControl.c ${EVENT} ${MATH} ${GDB}
14 14

  
15 15
#libml_a_LIBADD = $(top_builddir)/dclog/libdclog.a
ml/include/ml.h
51 51
#include <stdint.h>
52 52
#include <sys/time.h>
53 53

  
54

  
54 55
/**
55 56
 * @brief The size of a socketID
56 57
 */
......
261 262
int mlInit(bool recv_data_cb,struct timeval timeout_value,const int port,const char *ipaddr,const int stun_port,const char *stun_ipaddr,receive_localsocketID_cb local_socketID_cb,void *arg);
262 263

  
263 264
/**
264
  * Configure the verbosity of messages
265
  * @param log_level [0-4] the lower the less messages are printed out
265
  * Configure the parameters for output rate control.
266
  * These values may also be set while packets are being transmitted.
267
  * @param bucketsize The size of the bucket in kbytes
268
  * @param drainrate The amount of kbytes draining in a second. If drainrate is <=0, then rateControl is completely disabled (all packets are passed).
266 269
*/
267
void mlSetVerbosity(int log_level);
270
void mlSetThrottle(int bucketsize, int drainrate);
268 271

  
269 272
/**
270 273
 * @brief Register a received packet callback.
......
514 517
 */
515 518
int mlGetPathMTU(int ConnectionId);
516 519

  
517

  
518

  
519
/**
520
  * Configure the parameters for output rate control.
521
  * @param bucketsize The size of the bucket in kbytes
522
  * @param drainrate The amount of kbytes draining in a second. If drainrate is <=0, then rateControl is completely disabled (all packets are passed).
523
  * @param maxQueueSize In kbytes. Max data stored while limiting the output rate. If 0 packets limitted by drainrate are dropped.
524
  * @param maxQueueSizeRTX In kbytes. Max data waiting for the retransmission if needed.
525
  * @param maxTimeToHold. Time for which sent packets are stored in RTX queue in seconds.
526
*/
527
void mlSetRateLimiterParams(int bucketsize, int drainrate, int maxQueueSize, int maxQueueSizeRTX, double maxTimeToHold);
528

  
529

  
530 520
#ifdef __cplusplus
531 521
}
532 522
#endif
ml/ml.c
32 32
 *     THIS HEADER MAY NOT BE EXTRACTED OR MODIFIED IN ANY WAY.
33 33
 */
34 34

  
35
#include <arpa/inet.h>
36
#ifndef WIN32
37
#include <netinet/in.h>
38
#include <sys/socket.h>
39
#endif
40
#include <fcntl.h>
41
#include <event2/event.h>
35 42
#include <stdlib.h>
36 43
#include <unistd.h>
37 44
#include <stdio.h>
......
39 46
#include <stdint.h>
40 47
#include <string.h>
41 48
#include <sys/types.h>
49
#include <arpa/inet.h>
50
#include <netdb.h>
51
#include <errno.h>
42 52
#include <time.h>
43 53
#include <math.h>
44 54
#include <assert.h>
45
#include <errno.h>
46

  
47
#ifndef WIN32
48
#include <arpa/inet.h>
49
#include <netdb.h>
50
#include <netinet/in.h>
51
#include <sys/socket.h>
52
#include <fcntl.h>
53
#else
54

  
55
#include <winsock2.h>
56
#include <ws2tcpip.h>
57
#endif
58 55

  
59 56
#include "util/udpSocket.h"
60 57
#include "util/stun.h"
61 58
#include "transmissionHandler.h"
62
#include "util/rateLimiter.h"
63
#include "util/queueManagement.h"
64 59

  
65 60
#define LOG_MODULE "[ml] "
66 61
#include "ml_log.h"
......
112 107
 */
113 108
#define RECV_TIMEOUT_DEFAULT { 2, 0 }
114 109

  
115
#ifdef RTX
116
/*
117
 * default timeout value for a packet reception
118
 */
119
#define PKT_RECV_TIMEOUT_DEFAULT { 0, 50000 } // 50 ms
120

  
121
/*
122
 * default timeout value for a packet reception
123
 */
124
#define LAST_PKT_RECV_TIMEOUT_DEFAULT { 1, 700000 }
125

  
126
/*
127
 * default fraction of RECV_TIMEOUT_DEFAULT for a last packet(s) reception timeout
128
 */
129
#define LAST_PKT_RECV_TIMEOUT_FRACTION 0.7
130

  
131
#endif
132

  
133

  
134 110
/*
135 111
 * global variables
136 112
 */
137

  
138 113
/*
139 114
 * define a buffer of pointers to connect structures
140 115
 */
......
208 183
/*
209 184
 * helper function to get rid of a warning
210 185
 */
211
#ifndef WIN32
212 186
int min(int a, int b) {
213 187
	if (a > b) return b;
214 188
	return a;
215 189
}
216
#endif
217

  
218
#ifdef RTX
219
//*********Counters**********
220

  
221
struct Counters {
222
	unsigned int receivedCompleteMsgCounter;
223
	unsigned int receivedIncompleteMsgCounter;
224
	unsigned int receivedDataPktCounter;
225
	unsigned int receivedRTXDataPktCounter;
226
	unsigned int receivedNACK1PktCounter;
227
	unsigned int receivedNACKMorePktCounter;
228
	unsigned int sentDataPktCounter;
229
	unsigned int sentRTXDataPktCtr;
230
	unsigned int sentNACK1PktCounter;
231
	unsigned int sentNACKMorePktCounter;
232
} counters;
233

  
234
extern unsigned int sentRTXDataPktCounter;
235

  
236
/*
237
 * receive timeout for a packet
238
 */
239
static struct timeval pkt_recv_timeout = PKT_RECV_TIMEOUT_DEFAULT;
240

  
241

  
242
static struct timeval last_pkt_recv_timeout = LAST_PKT_RECV_TIMEOUT_DEFAULT;
243

  
244
void mlShowCounters() {
245
	counters.sentRTXDataPktCtr = sentRTXDataPktCounter;
246
	fprintf(stderr, "\nreceivedCompleteMsgCounter: %d\nreceivedIncompleteMsgCounter: %d\nreceivedDataPktCounter: %d\nreceivedRTXDataPktCounter: %d\nreceivedNACK1PktCounter: %d\nreceivedNACKMorePktCounter: %d\nsentDataPktCounter: %d\nsentRTXDataPktCtr: %d\nsentNACK1PktCounter: %d\nsentNACKMorePktCounter: %d\n", counters.receivedCompleteMsgCounter, counters.receivedIncompleteMsgCounter, counters.receivedDataPktCounter, counters.receivedRTXDataPktCounter, counters.receivedNACK1PktCounter, counters.receivedNACKMorePktCounter, counters.sentDataPktCounter, counters.sentRTXDataPktCtr, counters.sentNACK1PktCounter, counters.sentNACKMorePktCounter);
247
	return;
248
}
249

  
250
void recv_nack_msg(struct msg_header *msg_h, char *msgbuf, int msg_size)
251
{
252
	struct nack_msg *nackmsg;
253
	
254
	msgbuf += msg_h->len_mon_data_hdr;
255
	msg_size -= msg_h->len_mon_data_hdr;
256
	nackmsg = (struct nack_msg*) msgbuf;
257
	
258
	unsigned int gapSize = nackmsg->offsetTo - nackmsg->offsetFrom;
259
	//if (gapSize == 1349) counters.receivedNACK1PktCounter++;
260
	//else counters.receivedNACKMorePktCounter++;
261

  
262
	rtxPacketsFromTo(nackmsg->con_id, nackmsg->msg_seq_num, nackmsg->offsetFrom, nackmsg->offsetTo);	
263
}
264

  
265
void pkt_recv_timeout_cb(int fd, short event, void *arg){
266
	int recv_id = (long) arg;
267
	debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
268

  
269
	//check if message still exists	
270
	if (recvdatabuf[recv_id] == NULL) return;
271

  
272
	//check if gap was filled in the meantime
273
	if (recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom == recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo) {
274
		recvdatabuf[recv_id]->firstGap++;
275
		return;	
276
	}
277

  
278
	struct nack_msg nackmsg;
279
	nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
280
	nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
281
	nackmsg.offsetFrom = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom;
282
	nackmsg.offsetTo = recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetTo;
283
	recvdatabuf[recv_id]->firstGap++;
284

  
285
	unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
286
	//if (gapSize == 1349) counters.sentNACK1PktCounter++;
287
	//else counters.sentNACKMorePktCounter++;
288

  
289
	//fprintf(stderr,"Sending NACK <- msg_seq_num: %d. monitoringDataHeaderLen: %d. offsetFrom: %d offsetTo: %d \n", nackmsg.msg_seq_num, recvdatabuf[recv_id]->monitoringDataHeaderLen, nackmsg.offsetFrom, nackmsg.offsetTo);
290

  
291
	send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, (char *) &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));	
292
}
293

  
294
void last_pkt_recv_timeout_cb(int fd, short event, void *arg){
295
	int recv_id = (long) arg;
296
	debug("ML: recv_timeout_cb called. Timeout for id:%d\n",recv_id);
297

  
298
	if (recvdatabuf[recv_id] == NULL) {
299
		//fprintf(stderr,"Called last_pkt_recv_timeout_cb but there is no slot\n");
300
		return;
301
	}
302

  
303
	//fprintf(stderr,"Starting last_pkt_recv_timeout_cb for msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);
304

  
305
	if (recvdatabuf[recv_id]->expectedOffset == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen) return;
306

  
307
	struct nack_msg nackmsg;
308
	nackmsg.con_id = recvdatabuf[recv_id]->txConnectionID;
309
	nackmsg.msg_seq_num = recvdatabuf[recv_id]->seqnr;
310
	nackmsg.offsetFrom = recvdatabuf[recv_id]->expectedOffset;
311
	nackmsg.offsetTo = recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen;
312

  
313
	unsigned int gapSize = nackmsg.offsetTo - nackmsg.offsetFrom;
314
	//if (gapSize == 1349) counters.sentNACK1PktCounter++;
315
	//else counters.sentNACKMorePktCounter++;	
316

  
317
	//fprintf(stderr,"last_pkt - Sending NACK <- msg_seq_num: %d. monitoringDataHeaderLen: %d. offsetFrom: %d offsetTo: %d \n", nackmsg.msg_seq_num, recvdatabuf[recv_id]->monitoringDataHeaderLen, nackmsg.offsetFrom, nackmsg.offsetTo);
318

  
319
	send_msg(recvdatabuf[recv_id]->connectionID, ML_NACK_MSG, &nackmsg, sizeof(struct nack_msg), true, &(connectbuf[recvdatabuf[recv_id]->connectionID]->defaultSendParams));	
320
}
321

  
322
#endif
323

  
324

  
325 190

  
326 191
/*
327 192
 * convert a socketID to a string. It uses a static buffer, so either strdup is needed, or the string will get lost!
......
437 302
			msg_h.offset = htonl(offset);
438 303
			msg_h.msg_length = htonl(truncable ? pkt_len : msg_len);
439 304

  
305
			//monitoring layer hook
306
			if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
307
				mon_pkt_inf pkt_info;
308

  
309
				memset(h_pkt,0,MON_PKT_HEADER_SPACE);
310

  
311
				pkt_info.remote_socketID = &(connectbuf[con_id]->external_socketID);
312
				pkt_info.buffer = msg + offset;
313
				pkt_info.bufSize = pkt_len;
314
				pkt_info.msgtype = msg_type;
315
				pkt_info.dataID = connectbuf[con_id]->seqnr;
316
				pkt_info.offset = offset;
317
				pkt_info.datasize = msg_len;
318
				pkt_info.monitoringHeaderLen = iov[1].iov_len;
319
				pkt_info.monitoringHeader = iov[1].iov_base;
320
				pkt_info.ttl = -1;
321
				memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
322

  
323
				(get_Send_pkt_inf_cb) ((void *) &pkt_info);
324
			}
440 325

  
441 326
			debug("ML: sending packet to %s with rconID:%d lconID:%d\n", conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id));
442
			int priority = 0; 
443
			if ((msg_type == ML_CON_MSG)
444
#ifdef RTX
445
 || (msg_type == ML_NACK_MSG)
446
#endif
447
) priority = HP;
448
			//fprintf(stderr,"*******************************ML.C: Sending packet: msg_h.offset: %d msg_h.msg_seq_num: %d\n",ntohl(msg_h.offset),ntohl(msg_h.msg_seq_num));
449
			switch(queueOrSendPacket(socketfd, iov, 4, &udpgen.udpaddr,priority)) {
327
			switch(sendPacket(socketfd, iov, 4, &udpgen.udpaddr)) {
450 328
				case MSGLEN:
451 329
					info("ML: sending message failed, reducing MTU from %d to %d (to:%s conID:%d lconID:%d msgsize:%d offset:%d)\n", connectbuf[con_id]->pmtusize, pmtu_decrement(connectbuf[con_id]->pmtusize), conid_to_string(con_id), ntohl(msg_h.remote_con_id), ntohl(msg_h.local_con_id), msg_len, offset);
452 330
					// TODO: pmtu decremented here, but not in the "truncable" packet. That is currently resent without changing the claimed pmtu. Might need to be changed.
......
466 344
					offset = msg_len; // exit the while
467 345
					break;
468 346
				case OK:
469
#ifdef RTX
470
					if (msg_type < 127) counters.sentDataPktCounter++;
471
#endif
472 347
					//update
473 348
					offset += pkt_len;
474 349
					//transmit data header only in the first packet
......
477 352
			}
478 353
		} while(offset != msg_len && !truncable);
479 354
	} while(retry);
480
	//fprintf(stderr, "sentDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentDataPktCounter);
481
	//fprintf(stderr, "sentRTXDataPktCounter after msg_seq_num = %d: %d\n", msg_h.msg_seq_num, counters.sentRTXDataPktCtr);
482 355
}
483 356

  
484 357
void pmtu_timeout_cb(int fd, short event, void *arg);
485 358

  
486
int sendPacket(const int udpSocket, struct iovec *iov, int len, struct sockaddr_in *socketaddr) {
487
	//monitoring layer hook
488
	if(get_Send_pkt_inf_cb != NULL && iov[1].iov_len) {
489
		mon_pkt_inf pkt_info;	
490

  
491
		struct msg_header *msg_h  = (struct msg_header *) iov[0].iov_base;
492

  
493
		memset(iov[1].iov_base,0,iov[1].iov_len);
494

  
495
		pkt_info.remote_socketID = &(connectbuf[ntohl(msg_h->local_con_id)]->external_socketID);
496
		pkt_info.buffer = iov[3].iov_base;
497
		pkt_info.bufSize = iov[3].iov_len;
498
		pkt_info.msgtype = msg_h->msg_type;
499
		pkt_info.dataID = ntohl(msg_h->msg_seq_num);
500
		pkt_info.offset = ntohl(msg_h->offset);
501
		pkt_info.datasize = ntohl(msg_h->msg_length);
502
		pkt_info.monitoringHeaderLen = iov[1].iov_len;
503
		pkt_info.monitoringHeader = iov[1].iov_base;
504
		pkt_info.ttl = -1;
505
		memset(&(pkt_info.arrival_time),0,sizeof(struct timeval));
506

  
507
		(get_Send_pkt_inf_cb) ((void *) &pkt_info);
508
	}
509

  
510
 	//struct msg_header *msg_h;
511
    //msg_h = (struct msg_header *) iov[0].iov_base;        
512

  
513
	//fprintf(stderr,"*** Sending packet - msgSeqNum: %d offset: %d\n",ntohl(msg_h->msg_seq_num),ntohl(msg_h->offset));
514

  
515
	return sendPacketFinal(udpSocket, iov, len, socketaddr);
516
}
517

  
518 359
void reschedule_conn_msg(int con_id)
519 360
{
520 361
	if (connectbuf[con_id]->timeout_event) {
......
549 390
	msg_header->pmtu_size = connectbuf[con_id]->pmtusize;
550 391

  
551 392
	memcpy(&(msg_header->sock_id), loc_socketID, sizeof(socket_ID));
552
  {
553
                        char buf[SOCKETID_STRING_SIZE];
554
                        mlSocketIDToString(&((struct conn_msg*)connectbuf[con_id]->ctrl_msg_buf)->sock_id,buf,sizeof(buf));
555
                        debug("Local socket_address sent in INVITE: %s, sizeof msg %ld\n", buf, sizeof(struct conn_msg));
556
   }
393

  
557 394
	send_msg(con_id, ML_CON_MSG, connectbuf[con_id]->ctrl_msg_buf, buf_size, true, &(connectbuf[con_id]->defaultSendParams));
558 395
}
559 396

  
......
667 504
				connectbuf[free_con_id]->connection_head = connectbuf[free_con_id]->connection_last = NULL;
668 505
				connectbuf[free_con_id]->starttime = time(NULL);
669 506
				memcpy(&(connectbuf[free_con_id]->external_socketID), &(con_msg->sock_id), sizeof(socket_ID));
670
		//Workaround to support reuse of socketID
671
				connectbuf[free_con_id]->external_socketID.internal_addr.udpaddr.sin_family=AF_INET;
672
				connectbuf[free_con_id]->external_socketID.external_addr.udpaddr.sin_family=AF_INET;
673 507
				connectbuf[free_con_id]->pmtusize = con_msg->pmtu_size;	// bootstrap pmtu from the other's size. Not strictly needed, but a good hint
674 508
				connectbuf[free_con_id]->timeout_event = NULL;
675 509
				connectbuf[free_con_id]->external_connectionID = msg_h->local_con_id;
......
791 625
		NAT_traversal = true;
792 626
		// callback to the upper layer indicating that the socketID is now
793 627
		// ready to use
794
		{
795
                	char buf[SOCKETID_STRING_SIZE];
796
                	mlSocketIDToString(&local_socketID,buf,sizeof(buf));
797
 			debug("received local socket_address: %s\n", buf);
798
		}
799 628
		(receive_SocketID_cb) (&local_socketID, 0);
800 629
	}
801 630
}
......
860 689
			&(connectbuf[recvdatabuf[recv_id]->connectionID]->external_socketID);
861 690
		rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
862 691

  
863
#ifdef RTX
864
		counters.receivedIncompleteMsgCounter++;
865
		//mlShowCounters();
866
		//fprintf(stderr,"******Cleaning slot for inclomplete msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);		
867
#endif
868
 		//(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->msgtype, &rParams);
692
// 		(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen,
693
// 			recvdatabuf[recv_id]->msgtype, &rParams);
869 694

  
870 695
		//clean up
871 696
		if (recvdatabuf[recv_id]->timeout_event) {
......
891 716
		return;
892 717
	}
893 718

  
894
#ifdef RTX
895
	counters.receivedDataPktCounter++;
896
#endif	
897 719
	// check if a recv_data exist and enter data
898 720
	for (recv_id = 0; recv_id < RECVDATABUFSIZE; recv_id++)
899 721
		if (recvdatabuf[recv_id] != NULL) {
......
916 738
		recvdatabuf[recv_id]->bufsize = msg_h->msg_length + msg_h->len_mon_data_hdr;
917 739
		recvdatabuf[recv_id]->recvbuf = (char *) malloc(recvdatabuf[recv_id]->bufsize);
918 740
		recvdatabuf[recv_id]->arrivedBytes = 0;	//count this without the Mon headers
919
#ifdef RTX
920
		recvdatabuf[recv_id]->txConnectionID = msg_h->local_con_id;
921
		recvdatabuf[recv_id]->expectedOffset = 0;
922
		recvdatabuf[recv_id]->gapCounter = 0;
923
		recvdatabuf[recv_id]->firstGap = 0;
924
		recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
925
#endif
926

  
927 741
		/*
928 742
		* read the timeout data and set it
929 743
		*/
......
942 756

  
943 757
	//if first packet extract mon data header and advance pointer
944 758
	if (msg_h->offset == 0) {
945
		//fprintf(stderr,"Hoooooray!! We have first packet of some message!!\n");
946 759
		memcpy(recvdatabuf[recv_id]->recvbuf, msgbuf, msg_h->len_mon_data_hdr);
947 760
		msgbuf += msg_h->len_mon_data_hdr;
948 761
		bufsize -= msg_h->len_mon_data_hdr;
......
953 766
	// increment fragmentnr
954 767
	recvdatabuf[recv_id]->recvFragments++;
955 768
	// increment the arrivedBytes
956
	recvdatabuf[recv_id]->arrivedBytes += bufsize; 
957

  
958
	//fprintf(stderr,"Arrived bytes: %d Offset: %d Expected offset: %d\n",recvdatabuf[recv_id]->arrivedBytes/1349,msg_h->offset/1349,recvdatabuf[recv_id]->expectedOffset/1349);
769
	recvdatabuf[recv_id]->arrivedBytes += bufsize;
959 770

  
960 771
	// enter the data into the buffer
961 772
	memcpy(recvdatabuf[recv_id]->recvbuf + msg_h->len_mon_data_hdr + msg_h->offset, msgbuf, bufsize);
962
#ifdef RTX
963
	// detecting a new gap	
964
	if (msg_h->offset > recvdatabuf[recv_id]->expectedOffset) {
965
		recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetFrom = recvdatabuf[recv_id]->expectedOffset;
966
		recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->gapCounter].offsetTo = msg_h->offset;
967
		if (recvdatabuf[recv_id]->gapCounter < RTX_MAX_GAPS - 1) recvdatabuf[recv_id]->gapCounter++;
968
		evtimer_add(event_new(base, -1, EV_TIMEOUT, &pkt_recv_timeout_cb, (void *) (long)recv_id), &pkt_recv_timeout);
969
	}
970
	
971
	//filling the gap by delayed packets
972
	if (msg_h->offset < recvdatabuf[recv_id]->expectedOffset){
973
		counters.receivedRTXDataPktCounter++;
974
		//skip retransmitted packets
975
		if (recvdatabuf[recv_id]->firstGap < recvdatabuf[recv_id]->gapCounter && msg_h->offset >= recvdatabuf[recv_id]->gapArray[recvdatabuf[recv_id]->firstGap].offsetFrom) {
976
			int i;
977
			//fprintf(stderr,"firstGap: %d	gapCounter: %d\n", recvdatabuf[recv_id]->firstGap, recvdatabuf[recv_id]->gapCounter);
978
			for (i = recvdatabuf[recv_id]->firstGap; i < recvdatabuf[recv_id]->gapCounter; i++){
979
				if (msg_h->offset == recvdatabuf[recv_id]->gapArray[i].offsetFrom) {
980
					recvdatabuf[recv_id]->gapArray[i].offsetFrom += bufsize;
981
					break;
982
				}
983
				if (msg_h->offset == (recvdatabuf[recv_id]->gapArray[i].offsetTo - bufsize)) {
984
					recvdatabuf[recv_id]->gapArray[i].offsetTo -= bufsize;
985
					break;
986
				}
987
			}
988
		} else {//fprintf(stderr,"Skipping retransmitted packets in filling the gap.\n"); 
989
			//counters.receivedRTXDataPktCounter++;
990
			}
991
	}
992

  
993
	//updating the expectedOffset	
994
	if (msg_h->offset >= recvdatabuf[recv_id]->expectedOffset) recvdatabuf[recv_id]->expectedOffset = msg_h->offset + bufsize;
995
#endif
996 773

  
997 774
	//TODO very basic checkif all fragments arrived: has to be reviewed
998 775
	if(recvdatabuf[recv_id]->arrivedBytes == recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen)
......
1045 822
				debug("ML: received message from conID:%d, %s\n",recvdatabuf[recv_id]->connectionID,str);
1046 823
				rParams.firstPacketArrived = recvdatabuf[recv_id]->firstPacketArrived;
1047 824

  
1048
#ifdef RTX
1049
				counters.receivedCompleteMsgCounter++;
1050
				//mlShowCounters();
1051
#endif
1052

  
1053 825
				(receive_data_callback) (recvdatabuf[recv_id]->recvbuf + recvdatabuf[recv_id]->monitoringDataHeaderLen, recvdatabuf[recv_id]->bufsize - recvdatabuf[recv_id]->monitoringDataHeaderLen,
1054 826
					recvdatabuf[recv_id]->msgtype, (void *) &rParams);
1055 827
			} else {
1056 828
			    warn("ML: callback not initialized for this message type: %d!\n",msg_h->msg_type);
1057 829
			}
1058
			
830

  
1059 831
			//clean up
1060 832
			if (recvdatabuf[recv_id]->timeout_event) {
1061
				debug("ML: freeing timeout for %d\n",recv_id);
833
				debug("ML: freeing timeout for %d",recv_id);
1062 834
				event_del(recvdatabuf[recv_id]->timeout_event);
1063 835
				event_free(recvdatabuf[recv_id]->timeout_event);
1064 836
				recvdatabuf[recv_id]->timeout_event = NULL;
1065 837
			} else {
1066 838
				debug("ML: received in 1 packet\n",recv_id);
1067 839
			}
1068
#ifdef RTX
1069
			if (recvdatabuf[recv_id]->last_pkt_timeout_event) {
1070
				debug("ML: freeing last packet timeout for %d",recv_id);
1071
				event_del(recvdatabuf[recv_id]->last_pkt_timeout_event);
1072
				event_free(recvdatabuf[recv_id]->last_pkt_timeout_event);
1073
				recvdatabuf[recv_id]->last_pkt_timeout_event = NULL;
1074
			}
1075
			//fprintf(stderr,"******Cleaning slot for clomplete msg_seq_num: %d\n", recvdatabuf[recv_id]->seqnr);	
1076
#endif
1077 840
			free(recvdatabuf[recv_id]->recvbuf);
1078 841
			free(recvdatabuf[recv_id]);
1079 842
			recvdatabuf[recv_id] = NULL;
......
1083 846
				//TODO make timeout at least a DEFINE
1084 847
				recvdatabuf[recv_id]->timeout_event = event_new(base, -1, EV_TIMEOUT, &recv_timeout_cb, (void *) (long)recv_id);
1085 848
				evtimer_add(recvdatabuf[recv_id]->timeout_event, &recv_timeout);
1086
#ifdef RTX
1087
				recvdatabuf[recv_id]->last_pkt_timeout_event = event_new(base, -1, EV_TIMEOUT, &last_pkt_recv_timeout_cb, (void *) (long)recv_id);
1088
				evtimer_add(recvdatabuf[recv_id]->last_pkt_timeout_event, &last_pkt_recv_timeout);
1089
#endif
1090 849
			}
1091 850
		}
1092 851
	}
......
1138 897
	}
1139 898

  
1140 899
	//error in PMTU discovery?
1141
	if (connectbuf[con_id]->pmtusize == P_ERROR) {
900
	if (connectbuf[con_id]->pmtusize == ERROR) {
1142 901
		if (connectbuf[con_id]->internal_connect == true) {
1143 902
			//as of now we tried directly connecting, now let's try trough the NAT
1144 903
			connectbuf[con_id]->internal_connect = false;
......
1191 950
	case BELOWDSL:
1192 951
		return MIN;
1193 952
	case MIN:
1194
		return P_ERROR;
953
		return ERROR;
1195 954
	default:
1196 955
		warn("ML: strange pmtu size encountered:%d, changing to some safe value:%d\n", pmtusize, MIN);
1197 956
		return MIN;
......
1252 1011

  
1253 1012
//    connectbuf[connectionID]->pmtutrysize = new_pmtusize;
1254 1013

  
1255
    if (new_pmtusize == P_ERROR) {
1014
    if (new_pmtusize == ERROR) {
1256 1015
		error("ML:  Could not create connection with connectionID %i !\n",
1257 1016
			connectionID);
1258 1017

  
......
1367 1126
	unsigned short stun_bind_response = 0x0101;
1368 1127
	unsigned short * msgspot = (unsigned short *) msgbuf;
1369 1128
	if (*msgspot == stun_bind_response) {
1370
		debug("ML: recv_pkg: parse stun message called on %d bytes\n", recvSize);
1129
		debug("ML: recv_pkg: parse stun message called\n");
1371 1130
		recv_stun_msg(msgbuf, recvSize);
1372 1131
		return;
1373 1132
	}
......
1429 1188
			debug("ML: received conn pkg\n");
1430 1189
			recv_conn_msg(msg_h, bufptr, msg_size, &recv_addr);
1431 1190
			break;
1432
#ifdef RTX
1433
		case ML_NACK_MSG:
1434
			debug("ML: received nack pkg\n");
1435
			recv_nack_msg(msg_h, bufptr, msg_size);
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff