Statistics
| Branch: | Revision:

chunker-player / chunker_player / tcp_chunk_puller.c @ 9f4cebf3

History | View | Annotate | Download (3.14 KB)

1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

    
8
#include <stdlib.h>
9
#include <limits.h>
10
#include <stdarg.h>
11
#include <string.h>
12
#include <stdio.h>
13
#include <stdint.h>
14
#include <memory.h>
15
#include <sys/types.h>
16
#include <sys/time.h>
17
#include <sys/socket.h>
18
#include <netinet/in.h>
19
#include <unistd.h>
20

    
21
#define TCP_BUF_SIZE 65536*16
22

    
23
static int listen_port = 0;
24
static int accept_fd = -1;
25
static int socket_fd = -1;
26
static int isRunning = 0;
27
static int isReceving = 0;
28
static char listen_path[256];
29
static pthread_t AcceptThread;
30
static pthread_t RecvThread;
31

    
32
static void* RecvThreadProc(void* params);
33
static void* AcceptThreadProc(void* params);
34

    
35
int initChunkPuller(const int port)
36
{
37
        struct sockaddr_in servaddr;
38
        int r;
39
        int fd;
40
  
41
        accept_fd = socket(AF_INET, SOCK_STREAM, 0);
42
        if (accept_fd < 0) {
43
                perror("cannot create socket!\n");
44
                return -1;
45
        }
46
        servaddr.sin_family = AF_INET;
47
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
48
        servaddr.sin_port = htons(port);
49
        r = bind(accept_fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
50
        
51
        printf("listening on port %$d\n", port);
52
        
53
        if(pthread_create( &AcceptThread, NULL, &AcceptThreadProc, NULL) != 0)
54
        {
55
                fprintf(stderr,"TCP-INPUT-MODULE: could not start accepting thread!!\n");
56
                return NULL;
57
        }
58
        
59
        return accept_fd;
60
}
61

    
62
static void* AcceptThreadProc(void* params)
63
{
64
        struct sockaddr_storage their_addr;
65
    socklen_t addr_size;
66
    int fd = -1;
67
    
68
    isRunning = 1;
69

    
70
    listen(accept_fd, 10);
71
    
72
    while(isRunning)
73
    {
74
                printf("trying to accept connection...\n");
75
                fd = accept(accept_fd, (struct sockaddr *)&their_addr, &addr_size);
76
                printf("connection requested!!!\n");
77
                if(socket_fd == -1)
78
                {
79
                        socket_fd = fd;
80
                        isReceving = 1;
81
                }
82
                else
83
                {
84
                        isReceving = 0;
85
                        pthread_join(RecvThread, NULL);
86
                        pthread_detach(RecvThread);
87
                        socket_fd = fd;
88
                }
89
                if(pthread_create( &RecvThread, NULL, &RecvThreadProc, NULL) != 0)
90
                {
91
                        fprintf(stderr,"TCP-INPUT-MODULE: could not start receveing thread!!\n");
92
                        return NULL;
93
                }
94
        }
95
        
96
        return NULL;
97
}
98

    
99
static void* RecvThreadProc(void* params)
100
{
101
        int ret = -1;
102
        uint32_t fragment_size = 0;
103
        uint8_t* buffer = (uint8_t*) malloc(TCP_BUF_SIZE);
104

    
105
        fprintf(stderr,"TCP-INPUT-MODULE: receive thread created\n");
106

    
107
        while(isReceving) {
108
                ret=recv(socket_fd, &fragment_size, sizeof(uint32_t), 0);
109
                fragment_size = ntohl(fragment_size);
110

    
111
fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes. Fragment size: %d\n", ret, fragment_size);
112
                if(ret <= 0) {
113
                        break;
114
                }
115

    
116
                ret=recv(socket_fd, buffer, fragment_size, 0);
117
fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes.\n", ret);
118
                while(ret < fragment_size)
119
                        ret+=recv(socket_fd, buffer+ret, fragment_size-ret, 0);
120
                
121
                if(enqueueBlock(buffer, fragment_size))
122
                        fprintf(stderr, "TCP-INPUT-MODULE: could not enqueue a received chunk!! \n");
123
        }
124
        free(buffer);
125
        close(socket_fd);
126
        socket_fd = -1;
127

    
128
        return NULL;
129
}
130

    
131
void finalizeChunkPuller()
132
{
133
        isRunning = 0;
134
        pthread_join(AcceptThread, NULL);
135
        pthread_detach(AcceptThread);
136
        
137
        if(socket_fd > 0)
138
                close(socket_fd);
139

    
140
        close(accept_fd);
141
}