Statistics
| Branch: | Revision:

napa-baselibs / ml / util / queueManagement.c @ d1048fe2

History | View | Annotate | Download (6.38 KB)

1
/*
2
 *          
3
 *
4
 *
5
 * 
6
 *        Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

    
10

    
11
#include <stdio.h>
12
#include <stdlib.h>
13
#include <math.h>
14

    
15
#include <unistd.h>
16
#include <string.h>
17
#ifndef WIN32
18
#include <sys/uio.h>
19
#include <netinet/in.h>
20
#endif
21

    
22
#include "queueManagement.h"
23
#include "udpSocket.h"
24

    
25
#include "../transmissionHandler.h"
26

    
27
FILE *file = NULL;
28

    
29
PacketQueue TXqueue;
30
PacketQueue RTXqueue;
31

    
32
unsigned int sentRTXDataPktCounter = 0;
33

    
34
int TXmaxSize = 6000*1500;                        //bytes
35
int RTXmaxSize = 6000*1500;                        //bytes
36

    
37
struct timeval maxTimeToHold = {5,0};
38

    
39
PacketContainer* createPacketContainer(const int uSoc,struct iovec *ioVector,int iovlen,struct sockaddr_in *sockAddress, unsigned char prior) {
40
        
41
        PacketContainer *packet = malloc(sizeof(PacketContainer));
42
        packet->udpSocket = uSoc;
43
        packet->iovlen = iovlen;
44
        packet->next = NULL;
45
        packet->pktLen = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len;
46

    
47
        packet->priority = prior;
48

    
49
         int i;
50
        packet->iov = malloc(sizeof(struct iovec) * iovlen);
51
        for (i=0; i<iovlen; i++){
52
                packet->iov[i].iov_len = ioVector[i].iov_len;
53
                packet->iov[i].iov_base = malloc(packet->iov[i].iov_len);
54
                memcpy(packet->iov[i].iov_base, ioVector[i].iov_base, packet->iov[i].iov_len);
55
        }
56

    
57
        packet->socketaddr = malloc(sizeof(struct sockaddr_in));
58
        memcpy(packet->socketaddr, sockAddress, sizeof(struct sockaddr_in));
59

    
60
        int packet_len = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len;
61
        return packet;
62
}
63

    
64
void destroyPacketContainer(PacketContainer* pktContainer){
65

    
66
        if (pktContainer != NULL){
67
                int i;
68
                for (i=0; i < pktContainer->iovlen; i++) free(pktContainer->iov[i].iov_base);
69
                free(pktContainer->iov);
70
                free(pktContainer->socketaddr);
71
                free(pktContainer);
72
        }
73
}
74

    
75
int addPacketTXqueue(PacketContainer *packet) {
76
        if ((TXqueue.size + packet->pktLen) > TXmaxSize) {
77
                fprintf(stderr,"[queueManagement::addPacketTXqueue] -- Sorry! Max size for TX queue. Packet will be discarded. \n");
78
                destroyPacketContainer(packet);
79
                return THROTTLE;
80
        }
81
        TXqueue.size += packet->pktLen;        
82

    
83
        if (TXqueue.head == NULL) {                        //adding first element
84
                TXqueue.head = packet;
85
                TXqueue.tail = packet;                
86
        } else {                                        //adding at the end of the queue
87
                TXqueue.tail->next = packet;
88
                TXqueue.tail = packet;
89
        }
90
        
91
        return OK;
92
}
93

    
94
PacketContainer* takePacketToSend() {                        //returns pointer to packet or NULL if queue is empty
95

    
96
        if (TXqueue.head != NULL) {
97
                PacketContainer *packet = TXqueue.head;
98
                
99
                TXqueue.size -= packet->pktLen;
100
                TXqueue.head = TXqueue.head->next;
101
                packet->next = NULL;
102
                if (TXqueue.head == NULL) TXqueue.tail = NULL;                                        
103
                return packet;
104
        }
105
        else return NULL;        
106
}
107

    
108
#ifdef RTX
109
void addPacketRTXqueue(PacketContainer *packet) {
110
        //removing old packets - because of maxTimeToHold
111
        struct timeval now, age;
112
        gettimeofday(&now, NULL);
113
 
114
        PacketContainer *tmp = RTXqueue.head;
115
        while (tmp != NULL) {
116
                age.tv_sec = now.tv_sec - tmp->timeStamp.tv_sec;
117
                age.tv_usec = now.tv_usec - tmp->timeStamp.tv_usec;
118

    
119
                if (age.tv_sec > maxTimeToHold.tv_sec) {
120

    
121
                        tmp = tmp->next;
122
                        removeOldestPacket();
123
                }
124
                else break;
125
        }
126

    
127
        while ((RTXqueue.size + packet->pktLen) > RTXmaxSize) {
128
                removeOldestPacket();
129
        }
130

    
131
        //adding timeStamp
132
        gettimeofday(&(packet->timeStamp), NULL);
133

    
134
        //finally - adding packet
135
        RTXqueue.size += packet->pktLen;
136
        packet->next = NULL;
137

    
138
        if (RTXqueue.head == NULL) {                        //adding first element
139
                RTXqueue.head = packet;
140
                RTXqueue.tail = packet;
141
                return;        
142
        } else {                                        //adding at the end of the queue
143
                RTXqueue.tail->next = packet;
144
                RTXqueue.tail = packet;
145
                return;
146
        }
147
}
148

    
149
int removeOldestPacket() {                        //return 0 if success, else (queue empty) return 1
150
        if (RTXqueue.head != NULL) {
151
                RTXqueue.size -= RTXqueue.head->pktLen;
152
                PacketContainer *pointer = RTXqueue.head;                
153
                RTXqueue.head = RTXqueue.head->next;
154
                destroyPacketContainer(pointer);
155
                return 0;
156
        }
157
        return 1;        
158
}
159
#endif
160

    
161
void setQueuesParams (int TXsize, int RTXsize, double maxTTHold) { //in bytes, bytes, sexonds
162
        TXmaxSize = TXsize;
163
        RTXmaxSize = RTXsize;
164
        maxTimeToHold.tv_sec = (int)floor(maxTTHold);
165
        maxTimeToHold.tv_usec = (int)(1000000.0 * fmod(maxTTHold, 1.0));
166
}
167

    
168
int isQueueEmpty() {
169
        
170
        if (TXqueue.head == NULL) return 1;
171
        return 0;
172
}
173

    
174
int getFirstPacketSize() {
175

    
176
        if (TXqueue.head != NULL) return TXqueue.head->pktLen;
177
        return 0;
178
}
179

    
180
#ifdef RTX
181
PacketContainer* searchPacketInRTX(int connID, int msgSeqNum, int offset) {
182
        
183
        connID = ntohl(connID);
184
        msgSeqNum = ntohl(msgSeqNum);
185
        offset = ntohl(offset);
186

    
187
        //fprintf(stderr,"***************Searching for packet... connID: %d msgSeqNum: %d offset: %d\n",connID,ntohl(msgSeqNum),ntohl(offset));
188

    
189
        PacketContainer *tmp = RTXqueue.head;
190
        PacketContainer *prev = NULL;
191

    
192
        while (tmp != NULL) {
193
                struct msg_header *msg_h;
194
                
195
                msg_h = (struct msg_header *) tmp->iov[0].iov_base;
196

    
197
                //fprintf(stderr,"^^^^^^^^^Searching^^^^^^^^^^^^ connID: %d msgSeqNum: %d offset: %d\n",connID,ntohl(msgSeqNum),offset);
198
                //fprintf(stderr,"^^^^^^^^^In queue^^^^^^^^^^^^ connID: %d msgSeqNum: %d offset: %d\n",msg_h->local_con_id,ntohl(msg_h->msg_seq_num),ntohl(msg_h->offset));
199

    
200
                if ((msg_h->local_con_id == connID) && (msg_h->msg_seq_num == msgSeqNum) && (msg_h->offset == offset)) {
201
                //fprintf(stderr,"*****************Packet found: ConID: %d  MsgseqNum: %d Offset: %d \n", ntohl(connID),ntohl(msgSeqNum),ntohl(offset));
202
                        if (tmp == RTXqueue.head) RTXqueue.head = tmp->next;
203
                        else if (tmp == RTXqueue.tail) { RTXqueue.tail = prev; prev->next == NULL; }
204
                        else prev->next = tmp->next;
205
                        RTXqueue.size -= tmp->pktLen;
206
                        return tmp;
207
                }
208
                prev = tmp;
209
                tmp = tmp->next;
210
        }
211

    
212
return NULL;
213
}
214

    
215
int rtxPacketsFromTo(int connID, int msgSeqNum, int offsetFrom, int offsetTo) {
216
        int offset = offsetFrom;
217
        //fprintf(stderr,"\t\t\t\t Retransmission request From: %d To: %d\n",offsetFrom, offsetTo);
218

    
219
        while (offset < offsetTo) {
220
                PacketContainer *packetToRTX = searchPacketInRTX(connID,msgSeqNum,offset);
221
                if (packetToRTX == NULL) return 1;
222

    
223
                //sending packet
224
                //fprintf(stderr,"\t\t\t\t\t Retransmitting packet: %d of msg_seq_num %d.\n",offset/1349,msgSeqNum);
225
                sendPacket(packetToRTX->udpSocket, packetToRTX->iov, 4, packetToRTX->socketaddr);
226
                sentRTXDataPktCounter++;
227
                offset += packetToRTX->iov[3].iov_len;
228
                destroyPacketContainer(packetToRTX);
229
        }
230
        return 0;
231
}
232
#endif