Statistics
| Branch: | Revision:

chunker-player / chunker_streamer / chunk_pusher.c @ e773088f

History | View | Annotate | Download (7.43 KB)

1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

    
8
#include <stdlib.h>
9
#include <string.h>
10
#include <sys/time.h>
11
#include <sys/socket.h>
12
#include <netinet/in.h>
13
#include <unistd.h>
14

    
15
#include "external_chunk_transcoding.h"
16
#include "chunker_streamer.h"
17

    
18
//#define DEBUG_PUSHER
19

    
20

    
21
int pushChunkHttp(ExternalChunk *echunk, char *url);
22
int pushChunkTcp(ExternalChunk *echunk);
23
int sendViaCurl(Chunk gchunk, int buffer_size, char *url);
24
int sendViaTcp(Chunk gchunk, uint32_t buffer_size);
25

    
26

    
27
extern ChunkerMetadata *cmeta;
28
static long long int counter = 0;
29
static int tcp_fd = -1;
30
static bool tcp_fd_connected = false;
31
static char* peer_ip;
32
static int peer_port;
33
static bool exit_on_connect_failure = false;
34
static bool connect_on_data = true;
35
static bool exit_on_send_error = false;        //TODO: handle this on Mac
36

    
37
void initTCPPush(char* ip, int port)
38
{
39
        peer_ip = strdup(ip);
40
        peer_port = port;
41

    
42
        if(tcp_fd == -1)
43
        {
44
                tcp_fd=socket(AF_INET, SOCK_STREAM, 0);
45
        }
46
        if (!tcp_fd_connected) {
47
                struct sockaddr_in address;
48
                address.sin_family = AF_INET; 
49
                address.sin_addr.s_addr = inet_addr(peer_ip);
50
                address.sin_port = htons(peer_port);
51
         
52
                int result = connect(tcp_fd, (struct sockaddr *)&address, sizeof(struct sockaddr_in));
53
                if(result == -1){
54
                        fprintf(stderr, "TCP OUTPUT MODULE: could not connect to the peer!\n");
55
                        if (exit_on_connect_failure) {
56
                                exit(1);
57
                        }
58
                        tcp_fd_connected = false;
59
                } else {
60
                        tcp_fd_connected = true;
61
                }
62
        }
63
}
64

    
65
void finalizeTCPChunkPusher()
66
{
67
        if(tcp_fd > 0)
68
        {
69
                close(tcp_fd);
70
                tcp_fd = -1;
71
        }
72
}
73

    
74
int pushChunkHttp(ExternalChunk *echunk, char *url) {
75

    
76
        Chunk gchunk;
77
        void *grapes_chunk_attributes_block = NULL;
78
        int ret = STREAMER_FAIL_RETURN;
79
        //we need to pack 5 int32s + 2 timeval structs + 1 double
80
        static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
81
        
82
        //update the chunk len here because here we know the external chunk header size
83
        echunk->len = echunk->payload_len + ExternalChunk_header_size;
84

    
85
        /* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
86
        if(        (grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
87
                struct timeval now;
88

    
89
                /* then fill-up a proper GRAPES chunk */
90
                gchunk.size = echunk->payload_len;
91
                /* then fill the timestamp */
92
                gettimeofday(&now, NULL);
93
                gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
94

    
95
                //decide how to create the chunk ID
96
                if(cmeta->cid == 0) {
97
                        gchunk.id = echunk->seq;
98
#ifdef DEBUG_PUSHER
99
                        fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
100
#endif
101
                }
102
                else if(cmeta->cid == 1) {
103
                        gchunk.id = gchunk.timestamp; //its ID is its start time
104
#ifdef DEBUG_PUSHER
105
                        fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
106
#endif
107
                }
108
                else if(cmeta->cid == 2) {
109
                        //its ID is offset by actual time in seconds
110
                        gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
111
#ifdef DEBUG_PUSHER
112
                        fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
113
#endif
114
                }
115
                gchunk.attributes = grapes_chunk_attributes_block;
116
                gchunk.attributes_size = ExternalChunk_header_size;
117
                gchunk.data = echunk->data;
118

    
119
#ifdef NHIO
120
                write_chunk(&gchunk);
121
#else
122
                /* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
123
                ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, url);
124
                //~ if(ChunkerStreamerTestMode)
125
                        //~ ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, "http://localhost:5557/externalplayer");
126
#endif
127

    
128
                free(grapes_chunk_attributes_block);
129
                return ret;
130
        }
131
        return ret;
132
}
133

    
134
int pushChunkTcp(ExternalChunk *echunk)
135
{
136
        Chunk gchunk;
137
        void *grapes_chunk_attributes_block = NULL;
138
        int ret = STREAMER_FAIL_RETURN;
139
        //we need to pack 5 int32s + 2 timeval structs + 1 double
140
        static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
141
        
142
        //try to connect if not connected
143
        if (connect_on_data && !tcp_fd_connected) {
144
                initTCPPush(peer_ip, peer_port);
145
                if (!tcp_fd_connected) {
146
                        return ret;
147
                }
148
        }
149

    
150
        //update the chunk len here because here we know the external chunk header size
151
        echunk->len = echunk->payload_len + ExternalChunk_header_size;
152

    
153
        /* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
154
        if(        (grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
155
                struct timeval now;
156

    
157
                /* then fill-up a proper GRAPES chunk */
158
                gchunk.size = echunk->payload_len;
159
                /* then fill the timestamp */
160
                gettimeofday(&now, NULL);
161
                gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
162

    
163
                //decide how to create the chunk ID
164
                if(cmeta->cid == 0) {
165
                        gchunk.id = echunk->seq;
166
#ifdef DEBUG_PUSHER
167
                        fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
168
#endif
169
                }
170
                else if(cmeta->cid == 1) {
171
                        gchunk.id = gchunk.timestamp; //its ID is its start time
172
#ifdef DEBUG_PUSHER
173
                        fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
174
#endif
175
                }
176
                else if(cmeta->cid == 2) {
177
                        //its ID is offset by actual time in seconds
178
                        gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
179
#ifdef DEBUG_PUSHER
180
                        fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
181
#endif
182
                }
183
                gchunk.attributes = grapes_chunk_attributes_block;
184
                gchunk.attributes_size = ExternalChunk_header_size;
185
                gchunk.data = echunk->data;
186

    
187
#ifdef NHIO
188
                write_chunk(&gchunk);
189
#else
190
                /* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
191
                ret = sendViaTcp(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size);
192
#endif
193

    
194
                free(grapes_chunk_attributes_block);
195
                return ret;
196
        }
197
        return ret;
198
}
199

    
200
int sendViaTcp(Chunk gchunk, uint32_t buffer_size)
201
{
202
        uint8_t *buffer=NULL;
203

    
204
        int ret = STREAMER_FAIL_RETURN;
205
        
206
        if(!(tcp_fd > 0))
207
        {
208
                fprintf(stderr, "TCP IO-MODULE: trying to send data to a not connected socket!!!\n");
209
                return ret;
210
        }
211

    
212
        if( (buffer = malloc(4 + buffer_size)) != NULL) {
213
                /* encode the GRAPES chunk into network bytes */
214
                encodeChunk(&gchunk, buffer + 4, buffer_size);
215
                *(uint32_t*)buffer = htonl(buffer_size);
216

    
217
#ifdef MSG_NOSIGNAL
218
                int ret = send(tcp_fd, buffer, 4 + buffer_size, exit_on_send_error ? 0 : MSG_NOSIGNAL); //TODO: better handling of exit_on_send_error
219
#else
220
                int ret = send(tcp_fd, buffer, 4 + buffer_size, 0); //TODO: better handling of exit_on_send_error
221
#endif
222
                //fprintf(stderr, "TCP IO-MODULE: sending %d bytes, %d sent\n", buffer_size, ret);
223
                if (ret < 0) {
224
                        if (errno != EAGAIN && errno != EWOULDBLOCK) {
225
                                fprintf(stderr, "TCP IO-MODULE: closing connection\n");
226
                                close(tcp_fd);
227
                                tcp_fd = -1;
228
                                tcp_fd_connected = false;
229
                        }
230
                        return ret;
231
                }
232
                int tmp;
233
                while(ret != buffer_size)
234
                {
235
#ifdef MSG_NOSIGNAL
236
                        tmp = send(tcp_fd, buffer+ret, 4 + buffer_size - ret, exit_on_send_error ? 0 : MSG_NOSIGNAL);
237
#else
238
                        tmp = send(tcp_fd, buffer+ret, 4 + buffer_size - ret, 0);
239
#endif
240
                        if(tmp > 0)
241
                                ret += tmp;
242
                        else
243
                                break;
244
                }
245
                
246
                free(buffer);
247
        }
248
        return ret;
249
}