Statistics
| Branch: | Revision:

chunker-player / chunker_streamer / chunk_pusher.c @ 579e2c14

History | View | Annotate | Download (6.7 KB)

1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

    
8
#include <stdlib.h>
9
#include <string.h>
10
#include <sys/time.h>
11
#include <sys/socket.h>
12
#include <netinet/in.h>
13
#include <unistd.h>
14

    
15
#include "external_chunk_transcoding.h"
16
#include "chunker_streamer.h"
17

    
18
//#define DEBUG_PUSHER
19

    
20

    
21
int pushChunkHttp(ExternalChunk *echunk, char *url);
22
int pushChunkTcp(ExternalChunk *echunk);
23

    
24
extern ChunkerMetadata *cmeta;
25
static long long int counter = 0;
26
static int tcp_fd = -1;
27
static bool tcp_fd_connected = false;
28
static char* peer_ip;
29
static int peer_port;
30
static bool exit_on_connect_failure = false;
31
static bool connect_on_data = true;
32

    
33
void initTCPPush(char* ip, int port)
34
{
35
        peer_ip = strdup(ip);
36
        peer_port = port;
37

    
38
        if(tcp_fd == -1)
39
        {
40
                tcp_fd=socket(AF_INET, SOCK_STREAM, 0);
41
        }
42
        if (!tcp_fd_connected) {
43
                struct sockaddr_in address;
44
                address.sin_family = AF_INET; 
45
                address.sin_addr.s_addr = inet_addr(peer_ip);
46
                address.sin_port = htons(peer_port);
47
         
48
                int result = connect(tcp_fd, (struct sockaddr *)&address, sizeof(struct sockaddr_in));
49
                if(result == -1){
50
                        fprintf(stderr, "TCP OUTPUT MODULE: could not connect to the peer!\n");
51
                        if (exit_on_connect_failure) {
52
                                exit(1);
53
                        }
54
                        tcp_fd_connected = false;
55
                } else {
56
                        tcp_fd_connected = true;
57
                }
58
        }
59
}
60

    
61
void finalizeTCPChunkPusher()
62
{
63
        if(tcp_fd > 0)
64
        {
65
                close(tcp_fd);
66
                tcp_fd = -1;
67
        }
68
}
69

    
70
int pushChunkHttp(ExternalChunk *echunk, char *url) {
71

    
72
        Chunk gchunk;
73
        void *grapes_chunk_attributes_block = NULL;
74
        int ret = STREAMER_FAIL_RETURN;
75
        //we need to pack 5 int32s + 2 timeval structs + 1 double
76
        static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
77
        
78
        //update the chunk len here because here we know the external chunk header size
79
        echunk->len = echunk->payload_len + ExternalChunk_header_size;
80

    
81
        /* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
82
        if(        (grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
83
                struct timeval now;
84

    
85
                /* then fill-up a proper GRAPES chunk */
86
                gchunk.size = echunk->payload_len;
87
                /* then fill the timestamp */
88
                gettimeofday(&now, NULL);
89
                gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
90

    
91
                //decide how to create the chunk ID
92
                if(cmeta->cid == 0) {
93
                        gchunk.id = echunk->seq;
94
#ifdef DEBUG_PUSHER
95
                        fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
96
#endif
97
                }
98
                else if(cmeta->cid == 1) {
99
                        gchunk.id = gchunk.timestamp; //its ID is its start time
100
#ifdef DEBUG_PUSHER
101
                        fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
102
#endif
103
                }
104
                else if(cmeta->cid == 2) {
105
                        //its ID is offset by actual time in seconds
106
                        gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
107
#ifdef DEBUG_PUSHER
108
                        fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
109
#endif
110
                }
111
                gchunk.attributes = grapes_chunk_attributes_block;
112
                gchunk.attributes_size = ExternalChunk_header_size;
113
                gchunk.data = echunk->data;
114

    
115
#ifdef NHIO
116
                write_chunk(&gchunk);
117
#else
118
                /* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
119
                ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, url);
120
                //~ if(ChunkerStreamerTestMode)
121
                        //~ ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, "http://localhost:5557/externalplayer");
122
#endif
123

    
124
                free(grapes_chunk_attributes_block);
125
                return ret;
126
        }
127
        return ret;
128
}
129

    
130
int pushChunkTcp(ExternalChunk *echunk)
131
{
132
        Chunk gchunk;
133
        void *grapes_chunk_attributes_block = NULL;
134
        int ret = STREAMER_FAIL_RETURN;
135
        //we need to pack 5 int32s + 2 timeval structs + 1 double
136
        static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
137
        
138
        //try to connect if not connected
139
        if (connect_on_data && !tcp_fd_connected) {
140
                initTCPPush(peer_ip, peer_port);
141
                if (!tcp_fd_connected) {
142
                        return ret;
143
                }
144
        }
145

    
146
        //update the chunk len here because here we know the external chunk header size
147
        echunk->len = echunk->payload_len + ExternalChunk_header_size;
148

    
149
        /* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
150
        if(        (grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
151
                struct timeval now;
152

    
153
                /* then fill-up a proper GRAPES chunk */
154
                gchunk.size = echunk->payload_len;
155
                /* then fill the timestamp */
156
                gettimeofday(&now, NULL);
157
                gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
158

    
159
                //decide how to create the chunk ID
160
                if(cmeta->cid == 0) {
161
                        gchunk.id = echunk->seq;
162
#ifdef DEBUG_PUSHER
163
                        fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
164
#endif
165
                }
166
                else if(cmeta->cid == 1) {
167
                        gchunk.id = gchunk.timestamp; //its ID is its start time
168
#ifdef DEBUG_PUSHER
169
                        fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
170
#endif
171
                }
172
                else if(cmeta->cid == 2) {
173
                        //its ID is offset by actual time in seconds
174
                        gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
175
#ifdef DEBUG_PUSHER
176
                        fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
177
#endif
178
                }
179
                gchunk.attributes = grapes_chunk_attributes_block;
180
                gchunk.attributes_size = ExternalChunk_header_size;
181
                gchunk.data = echunk->data;
182

    
183
#ifdef NHIO
184
                write_chunk(&gchunk);
185
#else
186
                /* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
187
                ret = sendViaTcp(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size);
188
#endif
189

    
190
                free(grapes_chunk_attributes_block);
191
                return ret;
192
        }
193
        return ret;
194
}
195

    
196
int sendViaTcp(Chunk gchunk, uint32_t buffer_size)
197
{
198
        uint8_t *buffer=NULL;
199

    
200
        int ret = STREAMER_FAIL_RETURN;
201
        
202
        if(!(tcp_fd > 0))
203
        {
204
                fprintf(stderr, "TCP IO-MODULE: trying to send data to a not connected socket!!!\n");
205
                return ret;
206
        }
207

    
208
        if( (buffer = malloc(4 + buffer_size)) != NULL) {
209
                /* encode the GRAPES chunk into network bytes */
210
                encodeChunk(&gchunk, buffer + 4, buffer_size);
211
                *(uint32_t*)buffer = htonl(buffer_size);
212
                
213
                int ret = send(tcp_fd, buffer, 4 + buffer_size, 0);
214
fprintf(stderr, "TCP IO-MODULE: sending %d bytes, %d sent\n", buffer_size, ret);
215
                int tmp;
216
                while(ret != buffer_size)
217
                {
218
                        tmp = send(tcp_fd, buffer+ret, 4 + buffer_size - ret, 0);
219
                        if(tmp > 0)
220
                                ret += tmp;
221
                        else
222
                                break;
223
                }
224
                
225
                free(buffer);
226
        }
227
        return ret;
228
}