Statistics
| Branch: | Revision:

napa-baselibs / ml / util / queueManagement.c @ 612e856d

History | View | Annotate | Download (4.51 KB)

1
/*
2
 *          
3
 *
4
 *
5
 * 
6
 *        Agnieszka Witczak & Szymon Kuc
7
 *     
8
 */
9

    
10

    
11
#include <stdio.h>
12
#include <stdlib.h>
13
#include <math.h>
14

    
15
#include <unistd.h>
16
#include <string.h>
17
#include <sys/uio.h>
18
#include <netinet/in.h>
19

    
20
#include "queueManagement.h"
21
#include "udpSocket.h"
22

    
23
        FILE *file = NULL;
24

    
25
PacketQueue TXqueue;
26
PacketQueue RTXqueue;
27

    
28
int TXmaxSize = 6000*1500;                        //bytes
29
int RTXmaxSize = 6000*1500;                        //bytes
30

    
31
struct timeval maxTimeToHold = {5,0};
32

    
33
PacketContainer* createPacketContainer(const int uSoc,struct iovec *ioVector,int iovlen,struct sockaddr_in *sockAddress, unsigned char prior) {
34
        
35
        PacketContainer *packet = malloc(sizeof(PacketContainer));
36
        packet->udpSocket = uSoc;
37
        packet->iovlen = iovlen;
38
        packet->next = NULL;
39
        packet->pktLen = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len;
40

    
41
        packet->priority = prior;
42

    
43
         int i;
44
        packet->iov = malloc(sizeof(struct iovec) * iovlen);
45
        for (i=0; i<iovlen; i++){
46
                packet->iov[i].iov_len = ioVector[i].iov_len;
47
                packet->iov[i].iov_base = malloc(packet->iov[i].iov_len);
48
                memcpy(packet->iov[i].iov_base, ioVector[i].iov_base, packet->iov[i].iov_len);
49
        }
50

    
51
        packet->socketaddr = malloc(sizeof(struct sockaddr_in));
52
        memcpy(packet->socketaddr, sockAddress, sizeof(struct sockaddr_in));
53

    
54
        int packet_len = ioVector[0].iov_len + ioVector[1].iov_len + ioVector[2].iov_len + ioVector[3].iov_len;
55
        return packet;
56
}
57

    
58
void destroyPacketContainer(PacketContainer* pktContainer){
59

    
60
        int i;
61
        for (i=0; i < pktContainer->iovlen; i++) free(pktContainer->iov[i].iov_base);
62
        free(pktContainer->iov);
63
        free(pktContainer->socketaddr);
64
        free(pktContainer);
65
}
66

    
67
int addPacketTXqueue(PacketContainer *packet) {
68
        if ((TXqueue.size + packet->pktLen) > TXmaxSize) {
69
                fprintf(stderr,"[queueManagement::addPacketTXqueue] -- Sorry! Max size for TX queue. Packet will be discarded. \n");
70
                destroyPacketContainer(packet);
71
                return THROTTLE;
72
        }
73
        TXqueue.size += packet->pktLen;        
74

    
75
        if (TXqueue.head == NULL) {                        //adding first element
76
                TXqueue.head = packet;
77
                TXqueue.tail = packet;                
78
        } else {                                        //adding at the end of the queue
79
                TXqueue.tail->next = packet;
80
                TXqueue.tail = packet;
81
        }
82
        
83
        //logQueueOccupancy();
84
        return OK;
85
}
86

    
87
PacketContainer* takePacketToSend() {                        //returns pointer to packet or NULL if queue is empty
88
        if (TXqueue.head != NULL) {
89
                PacketContainer *packet = TXqueue.head;
90
                
91
                TXqueue.size -= packet->pktLen;
92
                TXqueue.head = TXqueue.head->next;
93
                packet->next = NULL;
94
                                        
95
                return packet;
96
        }
97
        else return NULL;        
98
}
99

    
100

    
101
void addPacketRTXqueue(PacketContainer *packet) {
102
        //removing old packets - because of maxTimeToHold
103
        struct timeval now, age;
104
        gettimeofday(&now, NULL);
105
 
106
        PacketContainer *tmp = RTXqueue.head;
107
        while (tmp != NULL) {
108
                age.tv_sec = now.tv_sec - tmp->timeStamp.tv_sec;
109
                age.tv_usec = now.tv_usec - tmp->timeStamp.tv_usec;
110

    
111
                if (age.tv_sec > maxTimeToHold.tv_sec) {
112

    
113
                        tmp = tmp->next;
114
                        removeOldestPacket();
115
                }
116
                else break;
117
        }
118

    
119

    
120
        if ((RTXqueue.size + packet->pktLen) > RTXmaxSize) {
121
                removeOldestPacket();
122
        }
123

    
124
        //adding timeStamp
125
        gettimeofday(&(packet->timeStamp), NULL);
126

    
127
        //finally - adding packet
128
        RTXqueue.size += packet->pktLen;
129

    
130
        if (RTXqueue.head == NULL) {                        //adding first element
131
                RTXqueue.head = packet;
132
                RTXqueue.tail = packet;
133
                return;        
134
        } else {                                        //adding at the end of the queue
135
                RTXqueue.tail->next = packet;
136
                RTXqueue.tail = packet;
137
                return;
138
        }
139
}
140

    
141
int removeOldestPacket() {                        //return 0 if success, else (queue empty) return 1
142
        if (RTXqueue.head != NULL) {
143
                RTXqueue.size -= RTXqueue.head->pktLen;
144
                PacketContainer *pointer = RTXqueue.head;                
145
                RTXqueue.head = RTXqueue.head->next;
146
                destroyPacketContainer(pointer);
147
                return 0;
148
        }
149
        return 1;        
150
}
151

    
152
void setQueuesParams (int TXsize, int RTXsize, double maxTTHold) {
153
        TXmaxSize = TXsize;
154
        RTXmaxSize = RTXsize;
155
        maxTimeToHold.tv_sec = (int)floor(maxTTHold);
156
        maxTimeToHold.tv_usec = (int)(1000000.0 * fmod(maxTTHold, 1.0));
157
}
158

    
159
int isQueueEmpty() {
160
        
161
        if (TXqueue.head == NULL) return 1;
162
        return 0;
163
}
164

    
165
int getFirstPacketSize() {
166

    
167
        if (TXqueue.head != NULL) return TXqueue.head->pktLen;
168
        return 0;
169
}
170

    
171
void logQueueOccupancy() {
172
        struct timeval now;
173
        gettimeofday(&now, NULL);
174
        double time = now.tv_sec + ((double) now.tv_usec)/1000000;
175
        file = fopen("./queue.txt", "a+");
176
        //char *str;
177
        //sprintf(str,"%f",time); 
178
        //fputs(str, p);
179
        fprintf(file,"%f \t %d \n",time, TXqueue.size);        
180
        fprintf(stderr,"[queueManagement::logQueueOccupancy] -- Time: %f \t queueOccupancy: %d \n", time, TXqueue.size);
181
        fclose(file);
182
}