Statistics
| Branch: | Revision:

chunker-player / chunker_streamer / chunk_pusher.c @ 15edce59

History | View | Annotate | Download (6.22 KB)

1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

    
8
#include <stdlib.h>
9
#include <string.h>
10
#include <sys/time.h>
11
#include <sys/socket.h>
12
#include <netinet/in.h>
13
#include <unistd.h>
14

    
15
#include "external_chunk_transcoding.h"
16
#include "chunker_streamer.h"
17

    
18
//#define DEBUG_PUSHER
19

    
20

    
21
int pushChunkHttp(ExternalChunk *echunk, char *url);
22
int pushChunkTcp(ExternalChunk *echunk);
23

    
24
extern ChunkerMetadata *cmeta;
25
static long long int counter = 0;
26
static int tcp_fd = -1;
27

    
28
void initTCPPush(char* peer_ip, int peer_port)
29
{
30
        if(tcp_fd == -1)
31
        {
32
                tcp_fd=socket(AF_INET, SOCK_STREAM, 0);
33
        
34
                struct sockaddr_in address;
35
                address.sin_family = AF_INET; 
36
                address.sin_addr.s_addr = inet_addr(peer_ip);
37
                address.sin_port = htons(peer_port);
38
                 
39
                int result = connect(tcp_fd, (struct sockaddr *)&address, sizeof(struct sockaddr_in));
40
                if(result == -1){
41
                        fprintf(stderr, "TCP OUTPUT MODULE: could not connect to the peer!\n");
42
                        exit(1);
43
                }
44
        }
45
}
46

    
47
void finalizeTCPChunkPusher()
48
{
49
        if(tcp_fd > 0)
50
        {
51
                close(tcp_fd);
52
                tcp_fd = -1;
53
        }
54
}
55

    
56
int pushChunkHttp(ExternalChunk *echunk, char *url) {
57

    
58
        Chunk gchunk;
59
        void *grapes_chunk_attributes_block = NULL;
60
        int ret = STREAMER_FAIL_RETURN;
61
        //we need to pack 5 int32s + 2 timeval structs + 1 double
62
        static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
63
        
64
        //update the chunk len here because here we know the external chunk header size
65
        echunk->len = echunk->payload_len + ExternalChunk_header_size;
66

    
67
        /* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
68
        if(        (grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
69
                struct timeval now;
70

    
71
                /* then fill-up a proper GRAPES chunk */
72
                gchunk.size = echunk->payload_len;
73
                /* then fill the timestamp */
74
                gettimeofday(&now, NULL);
75
                gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
76

    
77
                //decide how to create the chunk ID
78
                if(cmeta->cid == 0) {
79
                        gchunk.id = echunk->seq;
80
#ifdef DEBUG_PUSHER
81
                        fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
82
#endif
83
                }
84
                else if(cmeta->cid == 1) {
85
                        gchunk.id = gchunk.timestamp; //its ID is its start time
86
#ifdef DEBUG_PUSHER
87
                        fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
88
#endif
89
                }
90
                else if(cmeta->cid == 2) {
91
                        //its ID is offset by actual time in seconds
92
                        gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
93
#ifdef DEBUG_PUSHER
94
                        fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
95
#endif
96
                }
97
                gchunk.attributes = grapes_chunk_attributes_block;
98
                gchunk.attributes_size = ExternalChunk_header_size;
99
                gchunk.data = echunk->data;
100

    
101
#ifdef NHIO
102
                write_chunk(&gchunk);
103
#else
104
                /* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
105
                ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, url);
106
                //~ if(ChunkerStreamerTestMode)
107
                        //~ ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, "http://localhost:5557/externalplayer");
108
#endif
109

    
110
                free(grapes_chunk_attributes_block);
111
                return ret;
112
        }
113
        return ret;
114
}
115

    
116
int pushChunkTcp(ExternalChunk *echunk)
117
{
118
        Chunk gchunk;
119
        void *grapes_chunk_attributes_block = NULL;
120
        int ret = STREAMER_FAIL_RETURN;
121
        //we need to pack 5 int32s + 2 timeval structs + 1 double
122
        static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
123
        
124
        //update the chunk len here because here we know the external chunk header size
125
        echunk->len = echunk->payload_len + ExternalChunk_header_size;
126

    
127
        /* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
128
        if(        (grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
129
                struct timeval now;
130

    
131
                /* then fill-up a proper GRAPES chunk */
132
                gchunk.size = echunk->payload_len;
133
                /* then fill the timestamp */
134
                gettimeofday(&now, NULL);
135
                gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
136

    
137
                //decide how to create the chunk ID
138
                if(cmeta->cid == 0) {
139
                        gchunk.id = echunk->seq;
140
#ifdef DEBUG_PUSHER
141
                        fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
142
#endif
143
                }
144
                else if(cmeta->cid == 1) {
145
                        gchunk.id = gchunk.timestamp; //its ID is its start time
146
#ifdef DEBUG_PUSHER
147
                        fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
148
#endif
149
                }
150
                else if(cmeta->cid == 2) {
151
                        //its ID is offset by actual time in seconds
152
                        gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
153
#ifdef DEBUG_PUSHER
154
                        fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
155
#endif
156
                }
157
                gchunk.attributes = grapes_chunk_attributes_block;
158
                gchunk.attributes_size = ExternalChunk_header_size;
159
                gchunk.data = echunk->data;
160

    
161
#ifdef NHIO
162
                write_chunk(&gchunk);
163
#else
164
                /* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
165
                ret = sendViaTcp(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size);
166
#endif
167

    
168
                free(grapes_chunk_attributes_block);
169
                return ret;
170
        }
171
        return ret;
172
}
173

    
174
int sendViaTcp(Chunk gchunk, uint32_t buffer_size)
175
{
176
        uint8_t *buffer=NULL;
177

    
178
        int ret = STREAMER_FAIL_RETURN;
179
        
180
        if(!(tcp_fd > 0))
181
        {
182
                fprintf(stderr, "TCP IO-MODULE: trying to send data to a not connected socket!!!\n");
183
                return ret;
184
        }
185

    
186
        if( (buffer = malloc(4 + buffer_size)) != NULL) {
187
                /* encode the GRAPES chunk into network bytes */
188
                encodeChunk(&gchunk, buffer + 4, buffer_size);
189
                *(uint32_t*)buffer = htonl(buffer_size);
190
                
191
                int ret = send(tcp_fd, buffer, 4 + buffer_size, 0);
192
fprintf(stderr, "TCP IO-MODULE: sending %d bytes, %d sent\n", buffer_size, ret);
193
                int tmp;
194
                while(ret != buffer_size)
195
                {
196
                        tmp = send(tcp_fd, buffer+ret, 4 + buffer_size - ret, 0);
197
                        if(tmp > 0)
198
                                ret += tmp;
199
                        else
200
                                break;
201
                }
202
                
203
                free(buffer);
204
        }
205
        return ret;
206
}