Statistics
| Branch: | Revision:

chunker-player / chunker_streamer / chunk_pusher.c @ 8c04de3a

History | View | Annotate | Download (7.25 KB)

1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

    
8
#include <stdlib.h>
9
#include <string.h>
10
#include <sys/time.h>
11
#include <sys/socket.h>
12
#include <netinet/in.h>
13
#include <unistd.h>
14

    
15
#include "external_chunk_transcoding.h"
16
#include "chunker_streamer.h"
17

    
18
//#define DEBUG_PUSHER
19

    
20

    
21
int pushChunkHttp(ExternalChunk *echunk, char *url);
22
int pushChunkTcp(ExternalChunk *echunk);
23
int sendViaCurl(Chunk gchunk, int buffer_size, char *url);
24
int sendViaTcp(Chunk gchunk, uint32_t buffer_size);
25

    
26

    
27
extern ChunkerMetadata *cmeta;
28
static long long int counter = 0;
29
static int tcp_fd = -1;
30
static bool tcp_fd_connected = false;
31
static char* peer_ip;
32
static int peer_port;
33
static bool exit_on_connect_failure = false;
34
static bool connect_on_data = true;
35
#ifdef MSG_NOSIGNAL
36
static bool exit_on_send_error = false;
37
#else
38
static bool exit_on_send_error = true;
39
#endif
40

    
41
void initTCPPush(char* ip, int port)
42
{
43
        peer_ip = strdup(ip);
44
        peer_port = port;
45

    
46
        if(tcp_fd == -1)
47
        {
48
                tcp_fd=socket(AF_INET, SOCK_STREAM, 0);
49
        }
50
        if (!tcp_fd_connected) {
51
                struct sockaddr_in address;
52
                address.sin_family = AF_INET; 
53
                address.sin_addr.s_addr = inet_addr(peer_ip);
54
                address.sin_port = htons(peer_port);
55
         
56
                int result = connect(tcp_fd, (struct sockaddr *)&address, sizeof(struct sockaddr_in));
57
                if(result == -1){
58
                        fprintf(stderr, "TCP OUTPUT MODULE: could not connect to the peer!\n");
59
                        if (exit_on_connect_failure) {
60
                                exit(1);
61
                        }
62
                        tcp_fd_connected = false;
63
                } else {
64
                        tcp_fd_connected = true;
65
                }
66
        }
67
}
68

    
69
void finalizeTCPChunkPusher()
70
{
71
        if(tcp_fd > 0)
72
        {
73
                close(tcp_fd);
74
                tcp_fd = -1;
75
        }
76
}
77

    
78
int pushChunkHttp(ExternalChunk *echunk, char *url) {
79

    
80
        Chunk gchunk;
81
        void *grapes_chunk_attributes_block = NULL;
82
        int ret = STREAMER_FAIL_RETURN;
83
        //we need to pack 5 int32s + 2 timeval structs + 1 double
84
        static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
85
        
86
        //update the chunk len here because here we know the external chunk header size
87
        echunk->len = echunk->payload_len + ExternalChunk_header_size;
88

    
89
        /* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
90
        if(        (grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
91
                struct timeval now;
92

    
93
                /* then fill-up a proper GRAPES chunk */
94
                gchunk.size = echunk->payload_len;
95
                /* then fill the timestamp */
96
                gettimeofday(&now, NULL);
97
                gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
98

    
99
                //decide how to create the chunk ID
100
                if(cmeta->cid == 0) {
101
                        gchunk.id = echunk->seq;
102
#ifdef DEBUG_PUSHER
103
                        fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
104
#endif
105
                }
106
                else if(cmeta->cid == 1) {
107
                        gchunk.id = gchunk.timestamp; //its ID is its start time
108
#ifdef DEBUG_PUSHER
109
                        fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
110
#endif
111
                }
112
                else if(cmeta->cid == 2) {
113
                        //its ID is offset by actual time in seconds
114
                        gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
115
#ifdef DEBUG_PUSHER
116
                        fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
117
#endif
118
                }
119
                gchunk.attributes = grapes_chunk_attributes_block;
120
                gchunk.attributes_size = ExternalChunk_header_size;
121
                gchunk.data = echunk->data;
122

    
123
#ifdef NHIO
124
                write_chunk(&gchunk);
125
#else
126
                /* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
127
                ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, url);
128
                //~ if(ChunkerStreamerTestMode)
129
                        //~ ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, "http://localhost:5557/externalplayer");
130
#endif
131

    
132
                free(grapes_chunk_attributes_block);
133
                return ret;
134
        }
135
        return ret;
136
}
137

    
138
int pushChunkTcp(ExternalChunk *echunk)
139
{
140
        Chunk gchunk;
141
        void *grapes_chunk_attributes_block = NULL;
142
        int ret = STREAMER_FAIL_RETURN;
143
        //we need to pack 5 int32s + 2 timeval structs + 1 double
144
        static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
145
        
146
        //try to connect if not connected
147
        if (connect_on_data && !tcp_fd_connected) {
148
                initTCPPush(peer_ip, peer_port);
149
                if (!tcp_fd_connected) {
150
                        return ret;
151
                }
152
        }
153

    
154
        //update the chunk len here because here we know the external chunk header size
155
        echunk->len = echunk->payload_len + ExternalChunk_header_size;
156

    
157
        /* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
158
        if(        (grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
159
                struct timeval now;
160

    
161
                /* then fill-up a proper GRAPES chunk */
162
                gchunk.size = echunk->payload_len;
163
                /* then fill the timestamp */
164
                gettimeofday(&now, NULL);
165
                gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
166

    
167
                //decide how to create the chunk ID
168
                if(cmeta->cid == 0) {
169
                        gchunk.id = echunk->seq;
170
#ifdef DEBUG_PUSHER
171
                        fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
172
#endif
173
                }
174
                else if(cmeta->cid == 1) {
175
                        gchunk.id = gchunk.timestamp; //its ID is its start time
176
#ifdef DEBUG_PUSHER
177
                        fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
178
#endif
179
                }
180
                else if(cmeta->cid == 2) {
181
                        //its ID is offset by actual time in seconds
182
                        gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
183
#ifdef DEBUG_PUSHER
184
                        fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
185
#endif
186
                }
187
                gchunk.attributes = grapes_chunk_attributes_block;
188
                gchunk.attributes_size = ExternalChunk_header_size;
189
                gchunk.data = echunk->data;
190

    
191
#ifdef NHIO
192
                write_chunk(&gchunk);
193
#else
194
                /* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
195
                ret = sendViaTcp(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size);
196
#endif
197

    
198
                free(grapes_chunk_attributes_block);
199
                return ret;
200
        }
201
        return ret;
202
}
203

    
204
int sendViaTcp(Chunk gchunk, uint32_t buffer_size)
205
{
206
        uint8_t *buffer=NULL;
207

    
208
        int ret = STREAMER_FAIL_RETURN;
209
        
210
        if(!(tcp_fd > 0))
211
        {
212
                fprintf(stderr, "TCP IO-MODULE: trying to send data to a not connected socket!!!\n");
213
                return ret;
214
        }
215

    
216
        if( (buffer = malloc(4 + buffer_size)) != NULL) {
217
                /* encode the GRAPES chunk into network bytes */
218
                encodeChunk(&gchunk, buffer + 4, buffer_size);
219
                *(uint32_t*)buffer = htonl(buffer_size);
220
                
221
                int ret = send(tcp_fd, buffer, 4 + buffer_size, exit_on_send_error ? 0 : MSG_NOSIGNAL); //TODO: better handling of exit_on_send_error
222
                //fprintf(stderr, "TCP IO-MODULE: sending %d bytes, %d sent\n", buffer_size, ret);
223
                if (ret < 0) {
224
                        if (errno != EAGAIN && errno != EWOULDBLOCK) {
225
                                fprintf(stderr, "TCP IO-MODULE: closing connection\n");
226
                                close(tcp_fd);
227
                                tcp_fd = -1;
228
                                tcp_fd_connected = false;
229
                        }
230
                        return ret;
231
                }
232
                int tmp;
233
                while(ret != buffer_size)
234
                {
235
                        tmp = send(tcp_fd, buffer+ret, 4 + buffer_size - ret, exit_on_send_error ? 0 : MSG_NOSIGNAL);
236
                        if(tmp > 0)
237
                                ret += tmp;
238
                        else
239
                                break;
240
                }
241
                
242
                free(buffer);
243
        }
244
        return ret;
245
}