Statistics
| Branch: | Revision:

chunker-player / chunker_streamer / chunk_pusher.c @ 14ab542a

History | View | Annotate | Download (7.7 KB)

1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  Copyright (c) 2010-2011 Csaba Kiraly
4
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
5
 *
6
 *  This is free software; see lgpl-2.1.txt
7
 */
8

    
9
#include <stdlib.h>
10
#include <string.h>
11
#include <sys/time.h>
12
#include <sys/socket.h>
13
#include <netinet/in.h>
14
#include <unistd.h>
15

    
16
#include "external_chunk_transcoding.h"
17
#include "chunker_streamer.h"
18

    
19
#include "chunk_pusher.h"
20

    
21
//#define DEBUG_PUSHER
22

    
23

    
24
extern ChunkerMetadata *cmeta;
25

    
26
struct output {
27
    char* peer_ip;
28
    int peer_port;
29
    int tcp_fd;
30
    bool tcp_fd_connected;
31
    long long int counter;
32
};
33
static bool exit_on_connect_failure = false;
34
static bool connect_on_data = true;
35
static bool exit_on_send_error = false;        //TODO: handle this on Mac
36

    
37

    
38
void connectTCP(struct output *o)
39
{
40
        if(o->tcp_fd == -1)
41
        {
42
                o->tcp_fd=socket(AF_INET, SOCK_STREAM, 0);
43
        }
44
        if (!o->tcp_fd_connected) {
45
                struct sockaddr_in address;
46
                address.sin_family = AF_INET; 
47
                address.sin_addr.s_addr = inet_addr(o->peer_ip);
48
                address.sin_port = htons(o->peer_port);
49
         
50
                int result = connect(o->tcp_fd, (struct sockaddr *)&address, sizeof(struct sockaddr_in));
51
                if(result == -1){
52
                        fprintf(stderr, "TCP OUTPUT MODULE: could not connect to the peer %s:%d!\n", o->peer_ip, o->peer_port);
53
                        if (exit_on_connect_failure) {
54
                                exit(1);
55
                        }
56
                        o->tcp_fd_connected = false;
57
                } else {
58
                        o->tcp_fd_connected = true;
59
                }
60
        }
61
}
62

    
63
struct output *initTCPPush(char* ip, int port)
64
{
65

    
66
        struct output *o = malloc(sizeof(struct output));
67
        if (!o) {
68
                fprintf(stderr, "PUSHER:memory alloc errir\n");
69
                return NULL;
70
        }
71

    
72
        o->peer_ip = strdup(ip);
73
        o->peer_port = port;
74
        o->tcp_fd = -1;
75
        o->tcp_fd_connected = false;
76
        o->counter = 0;
77

    
78
        connectTCP(o);
79

    
80
        return o;
81
}
82

    
83
void finalizeTCPChunkPusher(struct output *o)
84
{
85
        if(o->tcp_fd > 0)
86
        {
87
                close(o->tcp_fd);
88
                o->tcp_fd = -1;
89
        }
90
}
91

    
92
int pushChunkHttp(struct output *o, ExternalChunk *echunk, char *url) {
93

    
94
        Chunk gchunk;
95
        void *grapes_chunk_attributes_block = NULL;
96
        int ret = STREAMER_FAIL_RETURN;
97
        //we need to pack 5 int32s + 2 timeval structs + 1 double
98
        static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
99
        
100
        //update the chunk len here because here we know the external chunk header size
101
        echunk->len = echunk->payload_len + ExternalChunk_header_size;
102

    
103
        /* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
104
        if(        (grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
105
                struct timeval now;
106

    
107
                /* then fill-up a proper GRAPES chunk */
108
                gchunk.size = echunk->payload_len;
109
                /* then fill the timestamp */
110
                gettimeofday(&now, NULL);
111
                gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
112

    
113
                //decide how to create the chunk ID
114
                if(cmeta->cid == 0) {
115
                        gchunk.id = echunk->seq;
116
#ifdef DEBUG_PUSHER
117
                        fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
118
#endif
119
                }
120
                else if(cmeta->cid == 1) {
121
                        gchunk.id = gchunk.timestamp; //its ID is its start time
122
#ifdef DEBUG_PUSHER
123
                        fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
124
#endif
125
                }
126
                else if(cmeta->cid == 2) {
127
                        //its ID is offset by actual time in seconds
128
                        gchunk.id = ++o->counter + cmeta->base_chunkid_sequence_offset;
129
#ifdef DEBUG_PUSHER
130
                        fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
131
#endif
132
                }
133
                gchunk.attributes = grapes_chunk_attributes_block;
134
                gchunk.attributes_size = ExternalChunk_header_size;
135
                gchunk.data = echunk->data;
136

    
137
#ifdef NHIO
138
                write_chunk(&gchunk);
139
#else
140
                /* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
141
                ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, url);
142
                //~ if(ChunkerStreamerTestMode)
143
                        //~ ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, "http://localhost:5557/externalplayer");
144
#endif
145

    
146
                free(grapes_chunk_attributes_block);
147
                return ret;
148
        }
149
        return ret;
150
}
151

    
152
int pushChunkTcp(struct output *o, ExternalChunk *echunk)
153
{
154
        Chunk gchunk;
155
        void *grapes_chunk_attributes_block = NULL;
156
        int ret = STREAMER_FAIL_RETURN;
157
        //we need to pack 5 int32s + 2 timeval structs + 1 double
158
        static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
159
        
160
        //try to connect if not connected
161
        if (connect_on_data && !o->tcp_fd_connected) {
162
                connectTCP(o);
163
                if (!o->tcp_fd_connected) {
164
                        return ret;
165
                }
166
        }
167

    
168
        //update the chunk len here because here we know the external chunk header size
169
        echunk->len = echunk->payload_len + ExternalChunk_header_size;
170

    
171
        /* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
172
        if(        (grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
173
                struct timeval now;
174

    
175
                /* then fill-up a proper GRAPES chunk */
176
                gchunk.size = echunk->payload_len;
177
                /* then fill the timestamp */
178
                gettimeofday(&now, NULL);
179
                gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
180

    
181
                //decide how to create the chunk ID
182
                if(cmeta->cid == 0) {
183
                        gchunk.id = echunk->seq;
184
#ifdef DEBUG_PUSHER
185
                        fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
186
#endif
187
                }
188
                else if(cmeta->cid == 1) {
189
                        gchunk.id = gchunk.timestamp; //its ID is its start time
190
#ifdef DEBUG_PUSHER
191
                        fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
192
#endif
193
                }
194
                else if(cmeta->cid == 2) {
195
                        //its ID is offset by actual time in seconds
196
                        gchunk.id = ++o->counter + cmeta->base_chunkid_sequence_offset;
197
#ifdef DEBUG_PUSHER
198
                        fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
199
#endif
200
                }
201
                gchunk.attributes = grapes_chunk_attributes_block;
202
                gchunk.attributes_size = ExternalChunk_header_size;
203
                gchunk.data = echunk->data;
204

    
205
#ifdef NHIO
206
                write_chunk(&gchunk);
207
#else
208
                /* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
209
                ret = sendViaTcp(o,gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size);
210
#endif
211

    
212
                free(grapes_chunk_attributes_block);
213
                return ret;
214
        }
215
        return ret;
216
}
217

    
218
int sendViaTcp(struct output *o, Chunk gchunk, uint32_t buffer_size)
219
{
220
        uint8_t *buffer=NULL;
221

    
222
        int ret = STREAMER_FAIL_RETURN;
223
        
224
        if(!(o->tcp_fd > 0))
225
        {
226
                fprintf(stderr, "TCP IO-MODULE: trying to send data to a not connected socket!!!\n");
227
                return ret;
228
        }
229

    
230
        if( (buffer = malloc(4 + buffer_size)) != NULL) {
231
                /* encode the GRAPES chunk into network bytes */
232
                encodeChunk(&gchunk, buffer + 4, buffer_size);
233
                *(uint32_t*)buffer = htonl(buffer_size);
234

    
235
#ifdef MSG_NOSIGNAL
236
                int ret = send(o->tcp_fd, buffer, 4 + buffer_size, exit_on_send_error ? 0 : MSG_NOSIGNAL); //TODO: better handling of exit_on_send_error
237
#else
238
                int ret = send(o->tcp_fd, buffer, 4 + buffer_size, 0); //TODO: better handling of exit_on_send_error
239
#endif
240
                //fprintf(stderr, "TCP IO-MODULE: sending %d bytes, %d sent\n", buffer_size, ret);
241
                if (ret < 0) {
242
                        if (errno != EAGAIN && errno != EWOULDBLOCK) {
243
                                fprintf(stderr, "TCP IO-MODULE: closing connection\n");
244
                                close(o->tcp_fd);
245
                                o->tcp_fd = -1;
246
                                o->tcp_fd_connected = false;
247
                        }
248
                        return ret;
249
                }
250
                int tmp;
251
                while(ret != buffer_size)
252
                {
253
#ifdef MSG_NOSIGNAL
254
                        tmp = send(o->tcp_fd, buffer+ret, 4 + buffer_size - ret, exit_on_send_error ? 0 : MSG_NOSIGNAL);
255
#else
256
                        tmp = send(o->tcp_fd, buffer+ret, 4 + buffer_size - ret, 0);
257
#endif
258
                        if(tmp > 0)
259
                                ret += tmp;
260
                        else
261
                                break;
262
                }
263
                
264
                free(buffer);
265
        }
266
        return ret;
267
}