Statistics
| Branch: | Revision:

chunker-player / chunker_player / tcp_chunk_puller.c @ 56920663

History | View | Annotate | Download (3.45 KB)

1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

    
8
#include <stdlib.h>
9
#include <limits.h>
10
#include <stdarg.h>
11
#include <string.h>
12
#include <stdio.h>
13
#include <stdint.h>
14
#include <memory.h>
15
#include <sys/types.h>
16
#include <sys/time.h>
17
#ifndef _WIN32
18
#include <sys/socket.h>
19
#include <netinet/in.h>
20
#else
21
#include <winsock2.h>
22
#include <ws2tcpip.h>
23
#endif
24
#include <unistd.h>
25
#include <pthread.h>
26

    
27
#define TCP_BUF_SIZE 65536*16
28

    
29
static int accept_fd = -1;
30
static int socket_fd = -1;
31
static int isRunning = 0;
32
static int isReceving = 0;
33
static pthread_t AcceptThread;
34
static pthread_t RecvThread;
35

    
36
static void* RecvThreadProc(void* params);
37
static void* AcceptThreadProc(void* params);
38

    
39
int initChunkPuller(const int port)
40
{
41
        struct sockaddr_in servaddr;
42
        int r;
43
        int fd;
44

    
45
#ifdef _WIN32
46
        {
47
                WORD wVersionRequested;
48
                WSADATA wsaData;
49
                int err;
50

    
51
                wVersionRequested = MAKEWORD(2, 2);
52
                err = WSAStartup(wVersionRequested, &wsaData);
53
                if (err != 0) {
54
                        fprintf(stderr, "WSAStartup failed with error: %d\n", err);
55
                        return NULL;
56
                }
57
        }
58
#endif
59
  
60
        accept_fd = socket(AF_INET, SOCK_STREAM, 0);
61
        if (accept_fd < 0) {
62
                perror("cannot create socket!\n");
63
                return -1;
64
        }
65
        servaddr.sin_family = AF_INET;
66
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
67
        servaddr.sin_port = htons(port);
68
        r = bind(accept_fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
69
        
70
        fprintf(stderr,"listening on port %d\n", port);
71
        
72
        if(pthread_create( &AcceptThread, NULL, &AcceptThreadProc, NULL) != 0)
73
        {
74
                fprintf(stderr,"TCP-INPUT-MODULE: could not start accepting thread!!\n");
75
                return -1;
76
        }
77
        
78
        return accept_fd;
79
}
80

    
81
static void* AcceptThreadProc(void* params)
82
{
83
        struct sockaddr_storage their_addr;
84
    socklen_t addr_size;
85
    int fd = -1;
86
    
87
    isRunning = 1;
88

    
89
    listen(accept_fd, 10);
90
    
91
    while(isRunning)
92
    {
93
                printf("trying to accept connection...\n");
94
                fd = accept(accept_fd, (struct sockaddr *)&their_addr, &addr_size);
95
                printf("connection requested!!!\n");
96
                if(socket_fd == -1)
97
                {
98
                        socket_fd = fd;
99
                        isReceving = 1;
100
                }
101
                else
102
                {
103
                        isReceving = 0;
104
                        pthread_join(RecvThread, NULL);
105
                        pthread_detach(RecvThread);
106
                        socket_fd = fd;
107
                }
108
                if(pthread_create( &RecvThread, NULL, &RecvThreadProc, NULL) != 0)
109
                {
110
                        fprintf(stderr,"TCP-INPUT-MODULE: could not start receveing thread!!\n");
111
                        return NULL;
112
                }
113
        }
114
        
115
        return NULL;
116
}
117

    
118
static void* RecvThreadProc(void* params)
119
{
120
        int ret = -1;
121
        uint32_t fragment_size = 0;
122
        uint8_t* buffer = (uint8_t*) malloc(TCP_BUF_SIZE);
123

    
124
        fprintf(stderr,"TCP-INPUT-MODULE: receive thread created\n");
125

    
126
        while(isReceving) {
127
                ret=recv(socket_fd, &fragment_size, sizeof(uint32_t), 0);
128
                fragment_size = ntohl(fragment_size);
129

    
130
fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes. Fragment size: %d\n", ret, fragment_size);
131
                if(ret <= 0) {
132
                        break;
133
                }
134

    
135
                ret=recv(socket_fd, buffer, fragment_size, 0);
136
fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes.\n", ret);
137
                while(ret < fragment_size)
138
                        ret+=recv(socket_fd, buffer+ret, fragment_size-ret, 0);
139
                
140
                if(enqueueBlock(buffer, fragment_size))
141
                        fprintf(stderr, "TCP-INPUT-MODULE: could not enqueue a received chunk!! \n");
142
        }
143
        free(buffer);
144
        close(socket_fd);
145
        socket_fd = -1;
146

    
147
        return NULL;
148
}
149

    
150
void finalizeChunkPuller()
151
{
152
        isRunning = 0;
153
        pthread_join(AcceptThread, NULL);
154
        pthread_detach(AcceptThread);
155
        
156
        if(socket_fd > 0)
157
                close(socket_fd);
158

    
159
        close(accept_fd);
160
}