Statistics
| Branch: | Revision:

ml / util / queueManagement.c @ 6575ae37

History | View | Annotate | Download (6.16 KB)

1
/*
2
 *          
3
 *
4
 *
5
 * 
6
 *        Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

    
10

    
11
#include "../ml_all.h"
12

    
13
FILE *file = NULL;
14

    
15
PacketQueue TXqueue;
16
PacketQueue RTXqueue;
17

    
18
unsigned int sentRTXDataPktCounter = 0;
19

    
20
int TXmaxSize = 6000*1500;                        //bytes
21
int RTXmaxSize = 6000*1500;                        //bytes
22

    
23
struct timeval maxTimeToHold = {5,0};
24

    
25
PacketContainer* createPacketContainer(const int uSoc,struct iovec *ioVector,int iovlen,struct sockaddr_in *sockAddress, unsigned char prior) {
26
        
27
        PacketContainer *packet = malloc(sizeof(PacketContainer));
28
        packet->udpSocket = uSoc;
29
        packet->iovlen = iovlen;
30
        packet->next = NULL;
31
        packet->pktLen = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len;
32

    
33
        packet->priority = prior;
34

    
35
         int i;
36
        packet->iov = malloc(sizeof(struct iovec) * iovlen);
37
        for (i=0; i<iovlen; i++){
38
                packet->iov[i].iov_len = ioVector[i].iov_len;
39
                packet->iov[i].iov_base = malloc(packet->iov[i].iov_len);
40
                memcpy(packet->iov[i].iov_base, ioVector[i].iov_base, packet->iov[i].iov_len);
41
        }
42

    
43
        packet->socketaddr = malloc(sizeof(struct sockaddr_in));
44
        memcpy(packet->socketaddr, sockAddress, sizeof(struct sockaddr_in));
45

    
46
        int packet_len = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len;
47
        return packet;
48
}
49

    
50
void destroyPacketContainer(PacketContainer* pktContainer){
51

    
52
        if (pktContainer != NULL){
53
                int i;
54
                for (i=0; i < pktContainer->iovlen; i++) free(pktContainer->iov[i].iov_base);
55
                free(pktContainer->iov);
56
                free(pktContainer->socketaddr);
57
                free(pktContainer);
58
        }
59
}
60

    
61
int addPacketTXqueue(PacketContainer *packet) {
62
        if ((TXqueue.size + packet->pktLen) > TXmaxSize) {
63
                fprintf(stderr,"[queueManagement::addPacketTXqueue] -- Sorry! Max size for TX queue. Packet will be discarded. \n");
64
                destroyPacketContainer(packet);
65
                return THROTTLE;
66
        }
67
        TXqueue.size += packet->pktLen;        
68

    
69
        if (TXqueue.head == NULL) {                        //adding first element
70
                TXqueue.head = packet;
71
                TXqueue.tail = packet;                
72
        } else {                                        //adding at the end of the queue
73
                TXqueue.tail->next = packet;
74
                TXqueue.tail = packet;
75
        }
76
        
77
        return OK;
78
}
79

    
80
PacketContainer* takePacketToSend() {                        //returns pointer to packet or NULL if queue is empty
81

    
82
        if (TXqueue.head != NULL) {
83
                PacketContainer *packet = TXqueue.head;
84
                
85
                TXqueue.size -= packet->pktLen;
86
                TXqueue.head = TXqueue.head->next;
87
                packet->next = NULL;
88
                if (TXqueue.head == NULL) TXqueue.tail = NULL;                                        
89
                return packet;
90
        }
91
        else return NULL;        
92
}
93

    
94
#ifdef RTX
95
void addPacketRTXqueue(PacketContainer *packet) {
96
        //removing old packets - because of maxTimeToHold
97
        struct timeval now, age;
98
        gettimeofday(&now, NULL);
99
 
100
        PacketContainer *tmp = RTXqueue.head;
101
        while (tmp != NULL) {
102
                age.tv_sec = now.tv_sec - tmp->timeStamp.tv_sec;
103
                age.tv_usec = now.tv_usec - tmp->timeStamp.tv_usec;
104

    
105
                if (age.tv_sec > maxTimeToHold.tv_sec) {
106

    
107
                        tmp = tmp->next;
108
                        removeOldestPacket();
109
                }
110
                else break;
111
        }
112

    
113
        while ((RTXqueue.size + packet->pktLen) > RTXmaxSize) {
114
                removeOldestPacket();
115
        }
116

    
117
        //adding timeStamp
118
        gettimeofday(&(packet->timeStamp), NULL);
119

    
120
        //finally - adding packet
121
        RTXqueue.size += packet->pktLen;
122
        packet->next = NULL;
123

    
124
        if (RTXqueue.head == NULL) {                        //adding first element
125
                RTXqueue.head = packet;
126
                RTXqueue.tail = packet;
127
                return;        
128
        } else {                                        //adding at the end of the queue
129
                RTXqueue.tail->next = packet;
130
                RTXqueue.tail = packet;
131
                return;
132
        }
133
}
134

    
135
int removeOldestPacket() {                        //return 0 if success, else (queue empty) return 1
136
        if (RTXqueue.head != NULL) {
137
                RTXqueue.size -= RTXqueue.head->pktLen;
138
                PacketContainer *pointer = RTXqueue.head;                
139
                RTXqueue.head = RTXqueue.head->next;
140
                destroyPacketContainer(pointer);
141
                return 0;
142
        }
143
        return 1;        
144
}
145
#endif
146

    
147
void setQueuesParams (int TXsize, int RTXsize, double maxTTHold) { //in bytes, bytes, sexonds
148
        TXmaxSize = TXsize;
149
        RTXmaxSize = RTXsize;
150
        maxTimeToHold.tv_sec = (int)floor(maxTTHold);
151
        maxTimeToHold.tv_usec = (int)(1000000.0 * fmod(maxTTHold, 1.0));
152
}
153

    
154
int isQueueEmpty() {
155
        
156
        if (TXqueue.head == NULL) return 1;
157
        return 0;
158
}
159

    
160
int getFirstPacketSize() {
161

    
162
        if (TXqueue.head != NULL) return TXqueue.head->pktLen;
163
        return 0;
164
}
165

    
166
#ifdef RTX
167
PacketContainer* searchPacketInRTX(int connID, int msgSeqNum, int offset) {
168
        
169
        connID = ntohl(connID);
170
        msgSeqNum = ntohl(msgSeqNum);
171
        offset = ntohl(offset);
172

    
173
        //fprintf(stderr,"***************Searching for packet... connID: %d msgSeqNum: %d offset: %d\n",connID,ntohl(msgSeqNum),ntohl(offset));
174

    
175
        PacketContainer *tmp = RTXqueue.head;
176
        PacketContainer *prev = NULL;
177

    
178
        while (tmp != NULL) {
179
                struct msg_header *msg_h;
180
                
181
                msg_h = (struct msg_header *) tmp->iov[0].iov_base;
182

    
183
                //fprintf(stderr,"^^^^^^^^^Searching^^^^^^^^^^^^ connID: %d msgSeqNum: %d offset: %d\n",connID,ntohl(msgSeqNum),offset);
184
                //fprintf(stderr,"^^^^^^^^^In queue^^^^^^^^^^^^ connID: %d msgSeqNum: %d offset: %d\n",msg_h->local_con_id,ntohl(msg_h->msg_seq_num),ntohl(msg_h->offset));
185

    
186
                if ((msg_h->local_con_id == connID) && (msg_h->msg_seq_num == msgSeqNum) && (msg_h->offset == offset)) {
187
                //fprintf(stderr,"*****************Packet found: ConID: %d  MsgseqNum: %d Offset: %d \n", ntohl(connID),ntohl(msgSeqNum),ntohl(offset));
188
                        if (tmp == RTXqueue.head) RTXqueue.head = tmp->next;
189
                        else if (tmp == RTXqueue.tail) { RTXqueue.tail = prev; prev->next == NULL; }
190
                        else prev->next = tmp->next;
191
                        RTXqueue.size -= tmp->pktLen;
192
                        return tmp;
193
                }
194
                prev = tmp;
195
                tmp = tmp->next;
196
        }
197

    
198
return NULL;
199
}
200

    
201
int rtxPacketsFromTo(int connID, int msgSeqNum, int offsetFrom, int offsetTo) {
202
        int offset = offsetFrom;
203
        //fprintf(stderr,"\t\t\t\t Retransmission request From: %d To: %d\n",offsetFrom, offsetTo);
204

    
205
        while (offset < offsetTo) {
206
                PacketContainer *packetToRTX = searchPacketInRTX(connID,msgSeqNum,offset);
207
                if (packetToRTX == NULL) return 1;
208

    
209
                //sending packet
210
                //fprintf(stderr,"\t\t\t\t\t Retransmitting packet: %d of msg_seq_num %d.\n",offset/1349,msgSeqNum);
211
                sendPacket(packetToRTX->udpSocket, packetToRTX->iov, 4, packetToRTX->socketaddr);
212
                sentRTXDataPktCounter++;
213
                offset += packetToRTX->iov[3].iov_len;
214
                destroyPacketContainer(packetToRTX);
215
        }
216
        return 0;
217
}
218
#endif