Statistics
| Branch: | Revision:

chunker-player / chunker_player / tcp_chunk_puller.c @ b4797b9a

History | View | Annotate | Download (4.1 KB)

1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

    
8
#include <stdlib.h>
9
#include <limits.h>
10
#include <stdarg.h>
11
#include <string.h>
12
#include <stdio.h>
13
#include <stdint.h>
14
#include <memory.h>
15
#include <sys/types.h>
16
#include <sys/time.h>
17
#ifndef _WIN32
18
#include <sys/socket.h>
19
#include <netinet/in.h>
20
#else
21
#include <winsock2.h>
22
#include <ws2tcpip.h>
23
#endif
24
#include <unistd.h>
25
#include <pthread.h>
26

    
27
//handle threads through SDL
28
#include <SDL.h>
29
#include <SDL_thread.h>
30

    
31
#define TCP_BUF_SIZE 65536*16
32

    
33
static int accept_fd = -1;
34
static int socket_fd = -1;
35
static int isRunning = 0;
36
static int isReceving = 0;
37
static SDL_Thread *AcceptThread;
38
static SDL_Thread *RecvThread;
39

    
40
static int RecvThreadProc(void* params);
41
static int AcceptThreadProc(void* params);
42

    
43
int initChunkPuller(const int port)
44
{
45
        struct sockaddr_in servaddr;
46
        int r;
47

    
48
#ifdef _WIN32
49
        {
50
                WORD wVersionRequested;
51
                WSADATA wsaData;
52
                int err;
53

    
54
                wVersionRequested = MAKEWORD(2, 2);
55
                err = WSAStartup(wVersionRequested, &wsaData);
56
                if (err != 0) {
57
                        fprintf(stderr, "WSAStartup failed with error: %d\n", err);
58
                        return -1;
59
                }
60
        }
61
#endif
62
  
63
        accept_fd = socket(AF_INET, SOCK_STREAM, 0);
64
        if (accept_fd < 0) {
65
                perror("cannot create socket!\n");
66
                return -1;
67
        }
68
        servaddr.sin_family = AF_INET;
69
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
70
        servaddr.sin_port = htons(port);
71
        r = bind(accept_fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
72
        
73
        fprintf(stderr,"listening on port %d\n", port);
74
        
75
        if((AcceptThread = SDL_CreateThread(&AcceptThreadProc, NULL)) == 0)
76
        {
77
                fprintf(stderr,"TCP-INPUT-MODULE: could not start accepting thread!!\n");
78
                return -1;
79
        }
80
        
81
        return accept_fd;
82
}
83

    
84
static int AcceptThreadProc(void* params)
85
{
86
    int fd = -1;
87
    
88
    isRunning = 1;
89

    
90
    listen(accept_fd, 10);
91
    
92
    while(isRunning)
93
    {
94
                printf("TCP-INPUT-MODULE: waiting for connection...\n");
95
                fd = accept(accept_fd, NULL, NULL);
96
                if (fd < 0) {
97
                        perror("TCP-INPUT-MODULE: accept error");
98
                        continue;
99
                }
100
                printf("TCP-INPUT-MODULE: accept: fd =%d\n", fd);
101
                if(socket_fd == -1)
102
                {
103
                        socket_fd = fd;
104
                        isReceving = 1;
105
                }
106
                else
107
                {
108
                        isReceving = 0;
109
                        printf("TCP-INPUT-MODULE: waiting for receive thread to terminate...\n");
110
                        SDL_WaitThread(RecvThread, NULL);
111
                        printf("TCP-INPUT-MODULE: receive thread terminated\n");
112
                        socket_fd = fd;
113
                }
114
                if((RecvThread = SDL_CreateThread(&RecvThreadProc, NULL)) == 0)
115
                {
116
                        fprintf(stderr,"TCP-INPUT-MODULE: could not start receveing thread!!\n");
117
                        return NULL;
118
                }
119
        }
120
        
121
        return NULL;
122
}
123

    
124
static int RecvThreadProc(void* params)
125
{
126
        int ret = -1;
127
        uint32_t fragment_size = 0;
128
        uint8_t* buffer = (uint8_t*) malloc(TCP_BUF_SIZE);
129

    
130
        fprintf(stderr,"TCP-INPUT-MODULE: receive thread created\n");
131

    
132
        while(isReceving) {
133
                int b;
134

    
135
                ret=recv(socket_fd, &fragment_size, sizeof(uint32_t), 0);
136
                fragment_size = ntohl(fragment_size);
137

    
138
                if(ret < 0) {
139
                        perror("TCP-INPUT-MODULE: recv error:");
140
                        break;
141
                } else if (ret == 0) {
142
                        fprintf(stderr, "TCP-INPUT-MODULE: connection closed\n");
143
                }
144
                fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes. Fragment size: %u\n", ret, fragment_size);
145

    
146
                if (fragment_size > TCP_BUF_SIZE) {
147
                        fprintf(stderr, "TCP-INPUT-MODULE: buffer too small or some corruption, closing connection ... "); //TODO, handle this better
148
                        break;
149
                } else if (fragment_size == 0) {        //strange, but valid
150
                        continue;
151
                }
152

    
153
                b = 0;
154
                while(b < fragment_size) {
155
                        ret = recv(socket_fd, buffer + b, fragment_size - b, 0);
156
                        if (ret <= 0) {
157
                                break;
158
                        }
159
                        b += ret;
160
                }
161
                if (ret <= 0) {
162
                        fprintf(stderr, "TCP-INPUT-MODULE: error or close during chunk receive, closing connection ...");
163
                }
164
                fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes.\n", ret);
165
                
166
                if(enqueueBlock(buffer, fragment_size))
167
                        fprintf(stderr, "TCP-INPUT-MODULE: could not enqueue a received chunk!! \n");
168
        }
169
        free(buffer);
170
        close(socket_fd);
171
        socket_fd = -1;
172

    
173
        return NULL;
174
}
175

    
176
void finalizeChunkPuller()
177
{
178
        isRunning = 0;
179
        SDL_WaitThread(AcceptThread, NULL);
180
        
181
        if(socket_fd > 0)
182
                close(socket_fd);
183

    
184
        close(accept_fd);
185
}