Statistics
| Branch: | Revision:

chunker-player / chunker_player / tcp_chunk_puller.c @ c30f0667

History | View | Annotate | Download (3.58 KB)

1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

    
8
#include <stdlib.h>
9
#include <limits.h>
10
#include <stdarg.h>
11
#include <string.h>
12
#include <stdio.h>
13
#include <stdint.h>
14
#include <memory.h>
15
#include <sys/types.h>
16
#include <sys/time.h>
17
#ifndef _WIN32
18
#include <sys/socket.h>
19
#include <netinet/in.h>
20
#else
21
#include <winsock2.h>
22
#include <ws2tcpip.h>
23
#endif
24
#include <unistd.h>
25
#include <pthread.h>
26

    
27
#define TCP_BUF_SIZE 65536*16
28

    
29
static int accept_fd = -1;
30
static int socket_fd = -1;
31
static int isRunning = 0;
32
static int isReceving = 0;
33
static pthread_t AcceptThread;
34
static pthread_t RecvThread;
35

    
36
static void* RecvThreadProc(void* params);
37
static void* AcceptThreadProc(void* params);
38

    
39
int initChunkPuller(const int port)
40
{
41
        struct sockaddr_in servaddr;
42
        int r;
43

    
44
#ifdef _WIN32
45
        {
46
                WORD wVersionRequested;
47
                WSADATA wsaData;
48
                int err;
49

    
50
                wVersionRequested = MAKEWORD(2, 2);
51
                err = WSAStartup(wVersionRequested, &wsaData);
52
                if (err != 0) {
53
                        fprintf(stderr, "WSAStartup failed with error: %d\n", err);
54
                        return -1;
55
                }
56
        }
57
#endif
58
  
59
        accept_fd = socket(AF_INET, SOCK_STREAM, 0);
60
        if (accept_fd < 0) {
61
                perror("cannot create socket!\n");
62
                return -1;
63
        }
64
        servaddr.sin_family = AF_INET;
65
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
66
        servaddr.sin_port = htons(port);
67
        r = bind(accept_fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
68
        
69
        fprintf(stderr,"listening on port %d\n", port);
70
        
71
        if(pthread_create( &AcceptThread, NULL, &AcceptThreadProc, NULL) != 0)
72
        {
73
                fprintf(stderr,"TCP-INPUT-MODULE: could not start accepting thread!!\n");
74
                return -1;
75
        }
76
        
77
        return accept_fd;
78
}
79

    
80
static void* AcceptThreadProc(void* params)
81
{
82
    int fd = -1;
83
    
84
    isRunning = 1;
85

    
86
    listen(accept_fd, 10);
87
    
88
    while(isRunning)
89
    {
90
                printf("TCP-INPUT-MODULE: waiting for connection...\n");
91
                fd = accept(accept_fd, NULL, NULL);
92
                if (fd < 0) {
93
                        perror("TCP-INPUT-MODULE: accept error");
94
                        continue;
95
                }
96
                printf("TCP-INPUT-MODULE: accept: fd =%d\n", fd);
97
                if(socket_fd == -1)
98
                {
99
                        socket_fd = fd;
100
                        isReceving = 1;
101
                }
102
                else
103
                {
104
                        isReceving = 0;
105
                        printf("TCP-INPUT-MODULE: waiting for receive thread to terminate...\n");
106
                        pthread_join(RecvThread, NULL);
107
                        pthread_detach(RecvThread);
108
                        printf("TCP-INPUT-MODULE: receive thread terminated\n");
109
                        socket_fd = fd;
110
                }
111
                if(pthread_create( &RecvThread, NULL, &RecvThreadProc, NULL) != 0)
112
                {
113
                        fprintf(stderr,"TCP-INPUT-MODULE: could not start receveing thread!!\n");
114
                        return NULL;
115
                }
116
        }
117
        
118
        return NULL;
119
}
120

    
121
static void* RecvThreadProc(void* params)
122
{
123
        int ret = -1;
124
        uint32_t fragment_size = 0;
125
        uint8_t* buffer = (uint8_t*) malloc(TCP_BUF_SIZE);
126

    
127
        fprintf(stderr,"TCP-INPUT-MODULE: receive thread created\n");
128

    
129
        while(isReceving) {
130
                ret=recv(socket_fd, &fragment_size, sizeof(uint32_t), 0);
131
                fragment_size = ntohl(fragment_size);
132

    
133
fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes. Fragment size: %d\n", ret, fragment_size);
134
                if(ret <= 0) {
135
                        break;
136
                }
137

    
138
                ret=recv(socket_fd, buffer, fragment_size, 0);
139
fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes.\n", ret);
140
                while(ret < fragment_size)
141
                        ret+=recv(socket_fd, buffer+ret, fragment_size-ret, 0);
142
                
143
                if(enqueueBlock(buffer, fragment_size))
144
                        fprintf(stderr, "TCP-INPUT-MODULE: could not enqueue a received chunk!! \n");
145
        }
146
        free(buffer);
147
        close(socket_fd);
148
        socket_fd = -1;
149

    
150
        return NULL;
151
}
152

    
153
void finalizeChunkPuller()
154
{
155
        isRunning = 0;
156
        pthread_join(AcceptThread, NULL);
157
        pthread_detach(AcceptThread);
158
        
159
        if(socket_fd > 0)
160
                close(socket_fd);
161

    
162
        close(accept_fd);
163
}