Statistics
| Branch: | Revision:

chunker-player / chunker_player / tcp_chunk_puller.c @ 90702d45

History | View | Annotate | Download (3.34 KB)

1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

    
8
#include <stdlib.h>
9
#include <limits.h>
10
#include <stdarg.h>
11
#include <string.h>
12
#include <stdio.h>
13
#include <stdint.h>
14
#include <memory.h>
15
#include <sys/types.h>
16
#include <sys/time.h>
17
#ifndef _WIN32
18
#include <sys/socket.h>
19
#include <netinet/in.h>
20
#else
21
#include <winsock2.h>
22
#include <ws2tcpip.h>
23
#endif
24
#include <unistd.h>
25
#include <pthread.h>
26

    
27
#define TCP_BUF_SIZE 65536*16
28

    
29
static int accept_fd = -1;
30
static int socket_fd = -1;
31
static int isRunning = 0;
32
static int isReceving = 0;
33
static pthread_t AcceptThread;
34
static pthread_t RecvThread;
35

    
36
static void* RecvThreadProc(void* params);
37
static void* AcceptThreadProc(void* params);
38

    
39
int initChunkPuller(const int port)
40
{
41
        struct sockaddr_in servaddr;
42
        int r;
43

    
44
#ifdef _WIN32
45
        {
46
                WORD wVersionRequested;
47
                WSADATA wsaData;
48
                int err;
49

    
50
                wVersionRequested = MAKEWORD(2, 2);
51
                err = WSAStartup(wVersionRequested, &wsaData);
52
                if (err != 0) {
53
                        fprintf(stderr, "WSAStartup failed with error: %d\n", err);
54
                        return -1;
55
                }
56
        }
57
#endif
58
  
59
        accept_fd = socket(AF_INET, SOCK_STREAM, 0);
60
        if (accept_fd < 0) {
61
                perror("cannot create socket!\n");
62
                return -1;
63
        }
64
        servaddr.sin_family = AF_INET;
65
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
66
        servaddr.sin_port = htons(port);
67
        r = bind(accept_fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
68
        
69
        fprintf(stderr,"listening on port %d\n", port);
70
        
71
        if(pthread_create( &AcceptThread, NULL, &AcceptThreadProc, NULL) != 0)
72
        {
73
                fprintf(stderr,"TCP-INPUT-MODULE: could not start accepting thread!!\n");
74
                return -1;
75
        }
76
        
77
        return accept_fd;
78
}
79

    
80
static void* AcceptThreadProc(void* params)
81
{
82
    int fd = -1;
83
    
84
    isRunning = 1;
85

    
86
    listen(accept_fd, 10);
87
    
88
    while(isRunning)
89
    {
90
                printf("trying to accept connection...\n");
91
                fd = accept(accept_fd, NULL, NULL);
92
                printf("connection requested!!!\n");
93
                if(socket_fd == -1)
94
                {
95
                        socket_fd = fd;
96
                        isReceving = 1;
97
                }
98
                else
99
                {
100
                        isReceving = 0;
101
                        pthread_join(RecvThread, NULL);
102
                        pthread_detach(RecvThread);
103
                        socket_fd = fd;
104
                }
105
                if(pthread_create( &RecvThread, NULL, &RecvThreadProc, NULL) != 0)
106
                {
107
                        fprintf(stderr,"TCP-INPUT-MODULE: could not start receveing thread!!\n");
108
                        return NULL;
109
                }
110
        }
111
        
112
        return NULL;
113
}
114

    
115
static void* RecvThreadProc(void* params)
116
{
117
        int ret = -1;
118
        uint32_t fragment_size = 0;
119
        uint8_t* buffer = (uint8_t*) malloc(TCP_BUF_SIZE);
120

    
121
        fprintf(stderr,"TCP-INPUT-MODULE: receive thread created\n");
122

    
123
        while(isReceving) {
124
                ret=recv(socket_fd, &fragment_size, sizeof(uint32_t), 0);
125
                fragment_size = ntohl(fragment_size);
126

    
127
fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes. Fragment size: %d\n", ret, fragment_size);
128
                if(ret <= 0) {
129
                        break;
130
                }
131

    
132
                ret=recv(socket_fd, buffer, fragment_size, 0);
133
fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes.\n", ret);
134
                while(ret < fragment_size)
135
                        ret+=recv(socket_fd, buffer+ret, fragment_size-ret, 0);
136
                
137
                if(enqueueBlock(buffer, fragment_size))
138
                        fprintf(stderr, "TCP-INPUT-MODULE: could not enqueue a received chunk!! \n");
139
        }
140
        free(buffer);
141
        close(socket_fd);
142
        socket_fd = -1;
143

    
144
        return NULL;
145
}
146

    
147
void finalizeChunkPuller()
148
{
149
        isRunning = 0;
150
        pthread_join(AcceptThread, NULL);
151
        pthread_detach(AcceptThread);
152
        
153
        if(socket_fd > 0)
154
                close(socket_fd);
155

    
156
        close(accept_fd);
157
}