Statistics
| Branch: | Revision:

chunker-player / chunker_player / tcp_chunk_puller.c @ e11386c0

History | View | Annotate | Download (2.6 KB)

1
#include <stdlib.h>
2
#include <limits.h>
3
#include <stdarg.h>
4
#include <string.h>
5
#include <stdio.h>
6
#include <stdint.h>
7
#include <memory.h>
8
#include <sys/types.h>
9
#include <sys/time.h>
10
#include <sys/socket.h>
11
#include <netinet/in.h>
12
#include <unistd.h>
13

    
14
#define TCP_BUF_SIZE 65536
15

    
16
static int listen_port = 0;
17
static int accept_fd = -1;
18
static int socket_fd = -1;
19
static int isRunning = 0;
20
static int isReceving = 0;
21
static char listen_path[256];
22
static pthread_t AcceptThread;
23
static pthread_t RecvThread;
24

    
25
static void* RecvThreadProc(void* params);
26
static void* AcceptThreadProc(void* params);
27

    
28
int initChunkPuller(const int port)
29
{
30
        struct sockaddr_in servaddr;
31
        int r;
32
        int fd;
33
  
34
        accept_fd = socket(AF_INET, SOCK_STREAM, 0);
35
        if (fd < 0) {
36
                return -1;
37
        }
38
        servaddr.sin_family = AF_INET;
39
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
40
        servaddr.sin_port = htons(port);
41
        r = bind(accept_fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
42
        
43
        printf("listening on port %$d\n", port);
44
        
45
        listen(accept_fd, 10);
46
        
47
        if(pthread_create( &AcceptThread, NULL, &AcceptThreadProc, NULL) != 0)
48
        {
49
                fprintf(stderr,"TCP-INPUT-MODULE: could not start accepting thread!!\n");
50
                return NULL;
51
        }
52
        
53
        return accept_fd;
54
}
55

    
56
static void* AcceptThreadProc(void* params)
57
{
58
        struct sockaddr_storage their_addr;
59
    socklen_t addr_size;
60
    int fd = -1;
61
    
62
    isRunning = 1;
63
    
64
    while(isRunning)
65
    {
66
                printf("trying to accept connection...\n");
67
                fd = accept(accept_fd, (struct sockaddr *)&their_addr, &addr_size);
68
                printf("connection requested!!!\n");
69
                if(socket_fd == -1)
70
                {
71
                        socket_fd = fd;
72
                        isReceving = 1;
73
                }
74
                else
75
                {
76
                        isReceving = 0;
77
                        pthread_join(RecvThread, NULL);
78
                        pthread_detach(RecvThread);
79
                        socket_fd = fd;
80
                }
81
                if(pthread_create( &RecvThread, NULL, &RecvThreadProc, NULL) != 0)
82
                {
83
                        fprintf(stderr,"TCP-INPUT-MODULE: could not start receveing thread!!\n");
84
                        return NULL;
85
                }
86
        }
87
        
88
        return NULL;
89
}
90

    
91
static void* RecvThreadProc(void* params)
92
{
93
        int ret = -1;
94
        int fragment_size = 0;
95
        uint8_t* buffer = (uint8_t*) malloc(TCP_BUF_SIZE);
96
        while(isReceving)
97
        {
98
                ret=recv(socket_fd, &fragment_size, sizeof(uint32_t), 0);
99
                if(ret <= sizeof(uint32_t))
100
                        continue;
101
                
102
                ret=recv(socket_fd, buffer, fragment_size, 0);
103
                while(ret <= fragment_size)
104
                        ret+=recv(socket_fd, buffer+ret, fragment_size-ret, 0);
105
                
106
                if(enqueueBlock(buffer, fragment_size))
107
                        fprintf(stderr, "TCP-INPUT-MODULE: could not enqueue a received chunk!! \n");
108
        }
109
        free(buffer);
110
        
111
        return NULL;
112
}
113

    
114
void finalizeChunkPuller()
115
{
116
        isRunning = 0;
117
        pthread_join(AcceptThread, NULL);
118
        pthread_detach(AcceptThread);
119
        
120
        if(socket_fd > 0)
121
                close(socket_fd);
122

    
123
        close(accept_fd);
124
}