ml / util / queueManagement.c @ 6575ae37
History | View | Annotate | Download (6.16 KB)
1 |
/*
|
---|---|
2 |
*
|
3 |
*
|
4 |
*
|
5 |
*
|
6 |
* Agnieszka Witczak & Szymon Kuc
|
7 |
*
|
8 |
*/
|
9 |
|
10 |
|
11 |
#include "../ml_all.h" |
12 |
|
13 |
FILE *file = NULL;
|
14 |
|
15 |
PacketQueue TXqueue; |
16 |
PacketQueue RTXqueue; |
17 |
|
18 |
unsigned int sentRTXDataPktCounter = 0; |
19 |
|
20 |
int TXmaxSize = 6000*1500; //bytes |
21 |
int RTXmaxSize = 6000*1500; //bytes |
22 |
|
23 |
struct timeval maxTimeToHold = {5,0}; |
24 |
|
25 |
PacketContainer* createPacketContainer(const int uSoc,struct iovec *ioVector,int iovlen,struct sockaddr_in *sockAddress, unsigned char prior) { |
26 |
|
27 |
PacketContainer *packet = malloc(sizeof(PacketContainer));
|
28 |
packet->udpSocket = uSoc; |
29 |
packet->iovlen = iovlen; |
30 |
packet->next = NULL;
|
31 |
packet->pktLen = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len; |
32 |
|
33 |
packet->priority = prior; |
34 |
|
35 |
int i;
|
36 |
packet->iov = malloc(sizeof(struct iovec) * iovlen); |
37 |
for (i=0; i<iovlen; i++){ |
38 |
packet->iov[i].iov_len = ioVector[i].iov_len; |
39 |
packet->iov[i].iov_base = malloc(packet->iov[i].iov_len); |
40 |
memcpy(packet->iov[i].iov_base, ioVector[i].iov_base, packet->iov[i].iov_len); |
41 |
} |
42 |
|
43 |
packet->socketaddr = malloc(sizeof(struct sockaddr_in)); |
44 |
memcpy(packet->socketaddr, sockAddress, sizeof(struct sockaddr_in)); |
45 |
|
46 |
int packet_len = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len; |
47 |
return packet;
|
48 |
} |
49 |
|
50 |
void destroyPacketContainer(PacketContainer* pktContainer){
|
51 |
|
52 |
if (pktContainer != NULL){ |
53 |
int i;
|
54 |
for (i=0; i < pktContainer->iovlen; i++) free(pktContainer->iov[i].iov_base); |
55 |
free(pktContainer->iov); |
56 |
free(pktContainer->socketaddr); |
57 |
free(pktContainer); |
58 |
} |
59 |
} |
60 |
|
61 |
int addPacketTXqueue(PacketContainer *packet) {
|
62 |
if ((TXqueue.size + packet->pktLen) > TXmaxSize) {
|
63 |
fprintf(stderr,"[queueManagement::addPacketTXqueue] -- Sorry! Max size for TX queue. Packet will be discarded. \n");
|
64 |
destroyPacketContainer(packet); |
65 |
return THROTTLE;
|
66 |
} |
67 |
TXqueue.size += packet->pktLen; |
68 |
|
69 |
if (TXqueue.head == NULL) { //adding first element |
70 |
TXqueue.head = packet; |
71 |
TXqueue.tail = packet; |
72 |
} else { //adding at the end of the queue |
73 |
TXqueue.tail->next = packet; |
74 |
TXqueue.tail = packet; |
75 |
} |
76 |
|
77 |
return OK;
|
78 |
} |
79 |
|
80 |
PacketContainer* takePacketToSend() { //returns pointer to packet or NULL if queue is empty
|
81 |
|
82 |
if (TXqueue.head != NULL) { |
83 |
PacketContainer *packet = TXqueue.head; |
84 |
|
85 |
TXqueue.size -= packet->pktLen; |
86 |
TXqueue.head = TXqueue.head->next; |
87 |
packet->next = NULL;
|
88 |
if (TXqueue.head == NULL) TXqueue.tail = NULL; |
89 |
return packet;
|
90 |
} |
91 |
else return NULL; |
92 |
} |
93 |
|
94 |
#ifdef RTX
|
95 |
void addPacketRTXqueue(PacketContainer *packet) {
|
96 |
//removing old packets - because of maxTimeToHold
|
97 |
struct timeval now, age;
|
98 |
gettimeofday(&now, NULL);
|
99 |
|
100 |
PacketContainer *tmp = RTXqueue.head; |
101 |
while (tmp != NULL) { |
102 |
age.tv_sec = now.tv_sec - tmp->timeStamp.tv_sec; |
103 |
age.tv_usec = now.tv_usec - tmp->timeStamp.tv_usec; |
104 |
|
105 |
if (age.tv_sec > maxTimeToHold.tv_sec) {
|
106 |
|
107 |
tmp = tmp->next; |
108 |
removeOldestPacket(); |
109 |
} |
110 |
else break; |
111 |
} |
112 |
|
113 |
while ((RTXqueue.size + packet->pktLen) > RTXmaxSize) {
|
114 |
removeOldestPacket(); |
115 |
} |
116 |
|
117 |
//adding timeStamp
|
118 |
gettimeofday(&(packet->timeStamp), NULL);
|
119 |
|
120 |
//finally - adding packet
|
121 |
RTXqueue.size += packet->pktLen; |
122 |
packet->next = NULL;
|
123 |
|
124 |
if (RTXqueue.head == NULL) { //adding first element |
125 |
RTXqueue.head = packet; |
126 |
RTXqueue.tail = packet; |
127 |
return;
|
128 |
} else { //adding at the end of the queue |
129 |
RTXqueue.tail->next = packet; |
130 |
RTXqueue.tail = packet; |
131 |
return;
|
132 |
} |
133 |
} |
134 |
|
135 |
int removeOldestPacket() { //return 0 if success, else (queue empty) return 1 |
136 |
if (RTXqueue.head != NULL) { |
137 |
RTXqueue.size -= RTXqueue.head->pktLen; |
138 |
PacketContainer *pointer = RTXqueue.head; |
139 |
RTXqueue.head = RTXqueue.head->next; |
140 |
destroyPacketContainer(pointer); |
141 |
return 0; |
142 |
} |
143 |
return 1; |
144 |
} |
145 |
#endif
|
146 |
|
147 |
void setQueuesParams (int TXsize, int RTXsize, double maxTTHold) { //in bytes, bytes, sexonds |
148 |
TXmaxSize = TXsize; |
149 |
RTXmaxSize = RTXsize; |
150 |
maxTimeToHold.tv_sec = (int)floor(maxTTHold);
|
151 |
maxTimeToHold.tv_usec = (int)(1000000.0 * fmod(maxTTHold, 1.0)); |
152 |
} |
153 |
|
154 |
int isQueueEmpty() {
|
155 |
|
156 |
if (TXqueue.head == NULL) return 1; |
157 |
return 0; |
158 |
} |
159 |
|
160 |
int getFirstPacketSize() {
|
161 |
|
162 |
if (TXqueue.head != NULL) return TXqueue.head->pktLen; |
163 |
return 0; |
164 |
} |
165 |
|
166 |
#ifdef RTX
|
167 |
PacketContainer* searchPacketInRTX(int connID, int msgSeqNum, int offset) { |
168 |
|
169 |
connID = ntohl(connID); |
170 |
msgSeqNum = ntohl(msgSeqNum); |
171 |
offset = ntohl(offset); |
172 |
|
173 |
//fprintf(stderr,"***************Searching for packet... connID: %d msgSeqNum: %d offset: %d\n",connID,ntohl(msgSeqNum),ntohl(offset));
|
174 |
|
175 |
PacketContainer *tmp = RTXqueue.head; |
176 |
PacketContainer *prev = NULL;
|
177 |
|
178 |
while (tmp != NULL) { |
179 |
struct msg_header *msg_h;
|
180 |
|
181 |
msg_h = (struct msg_header *) tmp->iov[0].iov_base; |
182 |
|
183 |
//fprintf(stderr,"^^^^^^^^^Searching^^^^^^^^^^^^ connID: %d msgSeqNum: %d offset: %d\n",connID,ntohl(msgSeqNum),offset);
|
184 |
//fprintf(stderr,"^^^^^^^^^In queue^^^^^^^^^^^^ connID: %d msgSeqNum: %d offset: %d\n",msg_h->local_con_id,ntohl(msg_h->msg_seq_num),ntohl(msg_h->offset));
|
185 |
|
186 |
if ((msg_h->local_con_id == connID) && (msg_h->msg_seq_num == msgSeqNum) && (msg_h->offset == offset)) {
|
187 |
//fprintf(stderr,"*****************Packet found: ConID: %d MsgseqNum: %d Offset: %d \n", ntohl(connID),ntohl(msgSeqNum),ntohl(offset));
|
188 |
if (tmp == RTXqueue.head) RTXqueue.head = tmp->next;
|
189 |
else if (tmp == RTXqueue.tail) { RTXqueue.tail = prev; prev->next == NULL; } |
190 |
else prev->next = tmp->next;
|
191 |
RTXqueue.size -= tmp->pktLen; |
192 |
return tmp;
|
193 |
} |
194 |
prev = tmp; |
195 |
tmp = tmp->next; |
196 |
} |
197 |
|
198 |
return NULL; |
199 |
} |
200 |
|
201 |
int rtxPacketsFromTo(int connID, int msgSeqNum, int offsetFrom, int offsetTo) { |
202 |
int offset = offsetFrom;
|
203 |
//fprintf(stderr,"\t\t\t\t Retransmission request From: %d To: %d\n",offsetFrom, offsetTo);
|
204 |
|
205 |
while (offset < offsetTo) {
|
206 |
PacketContainer *packetToRTX = searchPacketInRTX(connID,msgSeqNum,offset); |
207 |
if (packetToRTX == NULL) return 1; |
208 |
|
209 |
//sending packet
|
210 |
//fprintf(stderr,"\t\t\t\t\t Retransmitting packet: %d of msg_seq_num %d.\n",offset/1349,msgSeqNum);
|
211 |
sendPacket(packetToRTX->udpSocket, packetToRTX->iov, 4, packetToRTX->socketaddr);
|
212 |
sentRTXDataPktCounter++; |
213 |
offset += packetToRTX->iov[3].iov_len;
|
214 |
destroyPacketContainer(packetToRTX); |
215 |
} |
216 |
return 0; |
217 |
} |
218 |
#endif
|