Statistics
| Branch: | Revision:

ml / util / queueManagement.c @ 10bf846f

History | View | Annotate | Download (6.38 KB)

1
/*
2
 *          
3
 *
4
 *
5
 * 
6
 *        Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

    
10

    
11
#include <stdio.h>
12
#include <stdlib.h>
13
#include <math.h>
14

    
15
#include <unistd.h>
16
#include <string.h>
17
#ifndef WIN32
18
#include <sys/uio.h>
19
#include <netinet/in.h>
20
#endif
21

    
22
#include "queueManagement.h"
23
#include "udpSocket.h"
24

    
25
#include "../transmissionHandler.h"
26

    
27
FILE *file = NULL;
28

    
29
PacketQueue TXqueue;
30
PacketQueue RTXqueue;
31

    
32
unsigned int sentRTXDataPktCounter = 0;
33

    
34
int TXmaxSize = 6000*1500;                        //bytes
35
int RTXmaxSize = 6000*1500;                        //bytes
36

    
37
struct timeval maxTimeToHold = {5,0};
38

    
39
PacketContainer* createPacketContainer(const int uSoc,struct iovec *ioVector,int iovlen,struct sockaddr_in *sockAddress, unsigned char prior) {
40
        
41
        PacketContainer *packet = malloc(sizeof(PacketContainer));
42
        packet->udpSocket = uSoc;
43
        packet->iovlen = iovlen;
44
        packet->next = NULL;
45
        packet->pktLen = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len;
46

    
47
        packet->priority = prior;
48

    
49
         int i;
50
        packet->iov = malloc(sizeof(struct iovec) * iovlen);
51
        for (i=0; i<iovlen; i++){
52
                packet->iov[i].iov_len = ioVector[i].iov_len;
53
                packet->iov[i].iov_base = malloc(packet->iov[i].iov_len);
54
                memcpy(packet->iov[i].iov_base, ioVector[i].iov_base, packet->iov[i].iov_len);
55
        }
56

    
57
        packet->socketaddr = malloc(sizeof(struct sockaddr_in));
58
        memcpy(packet->socketaddr, sockAddress, sizeof(struct sockaddr_in));
59

    
60
        int packet_len = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len;
61
        return packet;
62
}
63

    
64
void destroyPacketContainer(PacketContainer* pktContainer){
65

    
66
        int i;
67
        for (i=0; i < pktContainer->iovlen; i++) free(pktContainer->iov[i].iov_base);
68
        free(pktContainer->iov);
69
        free(pktContainer->socketaddr);
70
        free(pktContainer);
71
}
72

    
73
int addPacketTXqueue(PacketContainer *packet) {
74
        if ((TXqueue.size + packet->pktLen) > TXmaxSize) {
75
                fprintf(stderr,"[queueManagement::addPacketTXqueue] -- Sorry! Max size for TX queue. Packet will be discarded. \n");
76
                destroyPacketContainer(packet);
77
                return THROTTLE;
78
        }
79
        TXqueue.size += packet->pktLen;        
80

    
81
        if (TXqueue.head == NULL) {                        //adding first element
82
                TXqueue.head = packet;
83
                TXqueue.tail = packet;                
84
        } else {                                        //adding at the end of the queue
85
                TXqueue.tail->next = packet;
86
                TXqueue.tail = packet;
87
        }
88
        
89
        return OK;
90
}
91

    
92
PacketContainer* takePacketToSend() {                        //returns pointer to packet or NULL if queue is empty
93

    
94
        if (TXqueue.head != NULL) {
95
                PacketContainer *packet = TXqueue.head;
96
                
97
                TXqueue.size -= packet->pktLen;
98
                TXqueue.head = TXqueue.head->next;
99
                packet->next = NULL;
100
                                        
101
                return packet;
102
        }
103
        else return NULL;        
104
}
105

    
106

    
107
void addPacketRTXqueue(PacketContainer *packet) {
108
        //removing old packets - because of maxTimeToHold
109
        struct timeval now, age;
110
        gettimeofday(&now, NULL);
111
 
112
        PacketContainer *tmp = RTXqueue.head;
113
        while (tmp != NULL) {
114
                age.tv_sec = now.tv_sec - tmp->timeStamp.tv_sec;
115
                age.tv_usec = now.tv_usec - tmp->timeStamp.tv_usec;
116

    
117
                if (age.tv_sec > maxTimeToHold.tv_sec) {
118

    
119
                        tmp = tmp->next;
120
                        removeOldestPacket();
121
                }
122
                else break;
123
        }
124

    
125
        while ((RTXqueue.size + packet->pktLen) > RTXmaxSize) {
126
                removeOldestPacket();
127
        }
128

    
129
        //adding timeStamp
130
        gettimeofday(&(packet->timeStamp), NULL);
131

    
132
        //finally - adding packet
133
        RTXqueue.size += packet->pktLen;
134
        packet->next = NULL;
135

    
136
        if (RTXqueue.head == NULL) {                        //adding first element
137
                RTXqueue.head = packet;
138
                RTXqueue.tail = packet;
139
                return;        
140
        } else {                                        //adding at the end of the queue
141
                RTXqueue.tail->next = packet;
142
                RTXqueue.tail = packet;
143
                return;
144
        }
145
}
146

    
147
int removeOldestPacket() {                        //return 0 if success, else (queue empty) return 1
148
        if (RTXqueue.head != NULL) {
149
                RTXqueue.size -= RTXqueue.head->pktLen;
150
                PacketContainer *pointer = RTXqueue.head;                
151
                RTXqueue.head = RTXqueue.head->next;
152
                destroyPacketContainer(pointer);
153
                return 0;
154
        }
155
        return 1;        
156
}
157

    
158
void setQueuesParams (int TXsize, int RTXsize, double maxTTHold) {
159
        TXmaxSize = TXsize * 1024;
160
        RTXmaxSize = RTXsize * 1024;
161
        maxTimeToHold.tv_sec = (int)floor(maxTTHold);
162
        maxTimeToHold.tv_usec = (int)(1000000.0 * fmod(maxTTHold, 1.0));
163
}
164

    
165
int isQueueEmpty() {
166
        
167
        if (TXqueue.head == NULL) return 1;
168
        return 0;
169
}
170

    
171
int getFirstPacketSize() {
172

    
173
        if (TXqueue.head != NULL) return TXqueue.head->pktLen;
174
        return 0;
175
}
176

    
177
PacketContainer* searchPacketInRTX(int connID, int msgSeqNum, int offset) {
178
        
179
        connID = ntohl(connID);
180
        msgSeqNum = ntohl(msgSeqNum);
181
        offset = ntohl(offset);
182

    
183
        //fprintf(stderr,"***************Searching for packet... connID: %d msgSeqNum: %d offset: %d\n",connID,ntohl(msgSeqNum),ntohl(offset));
184

    
185
        PacketContainer *tmp = RTXqueue.head;
186
        PacketContainer *prev = NULL;
187

    
188
        while (tmp != NULL) {
189
                struct msg_header *msg_h;
190
                
191
                msg_h = (struct msg_header *) tmp->iov[0].iov_base;
192

    
193
                //fprintf(stderr,"^^^^^^^^^Searching^^^^^^^^^^^^ connID: %d msgSeqNum: %d offset: %d\n",connID,ntohl(msgSeqNum),offset);
194
                //fprintf(stderr,"^^^^^^^^^In queue^^^^^^^^^^^^ connID: %d msgSeqNum: %d offset: %d\n",msg_h->local_con_id,ntohl(msg_h->msg_seq_num),ntohl(msg_h->offset));
195

    
196
                if ((msg_h->local_con_id == connID) && (msg_h->msg_seq_num == msgSeqNum) && (msg_h->offset == offset)) {
197
                //fprintf(stderr,"*****************Packet found: ConID: %d  MsgseqNum: %d Offset: %d \n", ntohl(connID),ntohl(msgSeqNum),ntohl(offset));
198
                        if (tmp == RTXqueue.head) RTXqueue.head = tmp->next;
199
                        else if (tmp == RTXqueue.tail) { RTXqueue.tail = prev; prev->next == NULL; }
200
                        else prev->next = tmp->next;
201
                        RTXqueue.size -= tmp->pktLen;
202
                        return tmp;
203
                }
204
                prev = tmp;
205
                tmp = tmp->next;
206
        }
207

    
208
return NULL;
209
}
210

    
211
int rtxPacketsFromTo(int connID, int msgSeqNum, int offsetFrom, int offsetTo) {
212
        int offset = offsetFrom;
213
        //fprintf(stderr,"\t\t\t\t Retransmission request From: %d To: %d\n",offsetFrom, offsetTo);
214

    
215
        while (offset < offsetTo) {
216
                PacketContainer *packetToRTX = searchPacketInRTX(connID,msgSeqNum,offset);
217
                if (packetToRTX == NULL) return 1;
218

    
219
                //sending packet
220
                //fprintf(stderr,"\t\t\t\t\t Retransmitting packet: %d of msg_seq_num %d.\n",offset/1349,msgSeqNum);
221
                sendPacket(packetToRTX->udpSocket, packetToRTX->iov, 4, packetToRTX->socketaddr);
222
                sentRTXDataPktCounter++;
223
                //queueOrSendPacket(packetToRTX->udpSocket,packetToRTX->iov,packetToRTX->pktLen,packetToRTX->socketaddr,HP);
224
                offset += packetToRTX->iov[3].iov_len;                  //????????????????????
225
        }
226
        return 0;
227
}
228