Statistics
| Branch: | Revision:

chunker-player / chunker_player / tcp_chunk_puller.c @ 15edce59

History | View | Annotate | Download (3.1 KB)

1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

    
8
#include <stdlib.h>
9
#include <limits.h>
10
#include <stdarg.h>
11
#include <string.h>
12
#include <stdio.h>
13
#include <stdint.h>
14
#include <memory.h>
15
#include <sys/types.h>
16
#include <sys/time.h>
17
#include <sys/socket.h>
18
#include <netinet/in.h>
19
#include <unistd.h>
20

    
21
#define TCP_BUF_SIZE 65536*16
22

    
23
static int listen_port = 0;
24
static int accept_fd = -1;
25
static int socket_fd = -1;
26
static int isRunning = 0;
27
static int isReceving = 0;
28
static char listen_path[256];
29
static pthread_t AcceptThread;
30
static pthread_t RecvThread;
31

    
32
static void* RecvThreadProc(void* params);
33
static void* AcceptThreadProc(void* params);
34

    
35
int initChunkPuller(const int port)
36
{
37
        struct sockaddr_in servaddr;
38
        int r;
39
        int fd;
40
  
41
        accept_fd = socket(AF_INET, SOCK_STREAM, 0);
42
        if (fd < 0) {
43
                return -1;
44
        }
45
        servaddr.sin_family = AF_INET;
46
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
47
        servaddr.sin_port = htons(port);
48
        r = bind(accept_fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
49
        
50
        printf("listening on port %$d\n", port);
51
        
52
        if(pthread_create( &AcceptThread, NULL, &AcceptThreadProc, NULL) != 0)
53
        {
54
                fprintf(stderr,"TCP-INPUT-MODULE: could not start accepting thread!!\n");
55
                return NULL;
56
        }
57
        
58
        return accept_fd;
59
}
60

    
61
static void* AcceptThreadProc(void* params)
62
{
63
        struct sockaddr_storage their_addr;
64
    socklen_t addr_size;
65
    int fd = -1;
66
    
67
    isRunning = 1;
68

    
69
    listen(accept_fd, 10);
70
    
71
    while(isRunning)
72
    {
73
                printf("trying to accept connection...\n");
74
                fd = accept(accept_fd, (struct sockaddr *)&their_addr, &addr_size);
75
                printf("connection requested!!!\n");
76
                if(socket_fd == -1)
77
                {
78
                        socket_fd = fd;
79
                        isReceving = 1;
80
                }
81
                else
82
                {
83
                        isReceving = 0;
84
                        pthread_join(RecvThread, NULL);
85
                        pthread_detach(RecvThread);
86
                        socket_fd = fd;
87
                }
88
                if(pthread_create( &RecvThread, NULL, &RecvThreadProc, NULL) != 0)
89
                {
90
                        fprintf(stderr,"TCP-INPUT-MODULE: could not start receveing thread!!\n");
91
                        return NULL;
92
                }
93
        }
94
        
95
        return NULL;
96
}
97

    
98
static void* RecvThreadProc(void* params)
99
{
100
        int ret = -1;
101
        uint32_t fragment_size = 0;
102
        uint8_t* buffer = (uint8_t*) malloc(TCP_BUF_SIZE);
103

    
104
        fprintf(stderr,"TCP-INPUT-MODULE: receive thread created\n");
105

    
106
        while(isReceving) {
107
                ret=recv(socket_fd, &fragment_size, sizeof(uint32_t), 0);
108
                fragment_size = ntohl(fragment_size);
109

    
110
fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes. Fragment size: %d\n", ret, fragment_size);
111
                if(ret <= 0) {
112
                        break;
113
                }
114

    
115
                ret=recv(socket_fd, buffer, fragment_size, 0);
116
fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes.\n", ret);
117
                while(ret < fragment_size)
118
                        ret+=recv(socket_fd, buffer+ret, fragment_size-ret, 0);
119
                
120
                if(enqueueBlock(buffer, fragment_size))
121
                        fprintf(stderr, "TCP-INPUT-MODULE: could not enqueue a received chunk!! \n");
122
        }
123
        free(buffer);
124
        close(socket_fd);
125
        socket_fd = -1;
126

    
127
        return NULL;
128
}
129

    
130
void finalizeChunkPuller()
131
{
132
        isRunning = 0;
133
        pthread_join(AcceptThread, NULL);
134
        pthread_detach(AcceptThread);
135
        
136
        if(socket_fd > 0)
137
                close(socket_fd);
138

    
139
        close(accept_fd);
140
}