Statistics
| Branch: | Revision:

chunker-player / chunker_streamer / chunk_pusher.c @ e11386c0

History | View | Annotate | Download (5.86 KB)

1
#include <stdlib.h>
2
#include <string.h>
3
#include <sys/time.h>
4
#include <sys/socket.h>
5
#include <netinet/in.h>
6
#include <unistd.h>
7

    
8
#include "external_chunk_transcoding.h"
9
#include "chunker_streamer.h"
10

    
11
//#define DEBUG_PUSHER
12

    
13

    
14
int pushChunkHttp(ExternalChunk *echunk, char *url);
15
int pushChunkTcp(ExternalChunk *echunk);
16

    
17
extern ChunkerMetadata *cmeta;
18
static long long int counter = 0;
19
static int tcp_fd = -1;
20

    
21
void initTCPPush(char* peer_ip, int peer_port)
22
{
23
        if(tcp_fd == -1)
24
        {
25
                tcp_fd=socket(AF_INET, SOCK_STREAM, 0);
26
        
27
                struct sockaddr_in address;
28
                address.sin_family = AF_INET; 
29
                address.sin_addr.s_addr = inet_addr(peer_ip);
30
                address.sin_port = htons(peer_port);
31
                 
32
                int result = connect(tcp_fd, (struct sockaddr *)&address, sizeof(struct sockaddr_in));
33
                if(result == -1){
34
                        fprintf(stderr, "TCP OUTPUT MODULE: could not connect to the peer!\n");
35
                        exit(1);
36
                }
37
        }
38
}
39

    
40
void finalizeTCPChunkPusher()
41
{
42
        if(tcp_fd > 0)
43
        {
44
                close(tcp_fd);
45
                tcp_fd = -1;
46
        }
47
}
48

    
49
int pushChunkHttp(ExternalChunk *echunk, char *url) {
50

    
51
        Chunk gchunk;
52
        void *grapes_chunk_attributes_block = NULL;
53
        int ret = STREAMER_FAIL_RETURN;
54
        //we need to pack 5 int32s + 2 timeval structs + 1 double
55
        static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
56
        
57
        //update the chunk len here because here we know the external chunk header size
58
        echunk->len = echunk->payload_len + ExternalChunk_header_size;
59

    
60
        /* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
61
        if(        (grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
62
                struct timeval now;
63

    
64
                /* then fill-up a proper GRAPES chunk */
65
                gchunk.size = echunk->payload_len;
66
                /* then fill the timestamp */
67
                gettimeofday(&now, NULL);
68
                gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
69

    
70
                //decide how to create the chunk ID
71
                if(cmeta->cid == 0) {
72
                        gchunk.id = echunk->seq;
73
#ifdef DEBUG_PUSHER
74
                        fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
75
#endif
76
                }
77
                else if(cmeta->cid == 1) {
78
                        gchunk.id = gchunk.timestamp; //its ID is its start time
79
#ifdef DEBUG_PUSHER
80
                        fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
81
#endif
82
                }
83
                else if(cmeta->cid == 2) {
84
                        //its ID is offset by actual time in seconds
85
                        gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
86
#ifdef DEBUG_PUSHER
87
                        fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
88
#endif
89
                }
90
                gchunk.attributes = grapes_chunk_attributes_block;
91
                gchunk.attributes_size = ExternalChunk_header_size;
92
                gchunk.data = echunk->data;
93

    
94
#ifdef NHIO
95
                write_chunk(&gchunk);
96
#else
97
                /* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
98
                ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, url);
99
                //~ if(ChunkerStreamerTestMode)
100
                        //~ ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, "http://localhost:5557/externalplayer");
101
#endif
102

    
103
                free(grapes_chunk_attributes_block);
104
                return ret;
105
        }
106
        return ret;
107
}
108

    
109
int pushChunkTcp(ExternalChunk *echunk)
110
{
111
        Chunk gchunk;
112
        void *grapes_chunk_attributes_block = NULL;
113
        int ret = STREAMER_FAIL_RETURN;
114
        //we need to pack 5 int32s + 2 timeval structs + 1 double
115
        static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
116
        
117
        //update the chunk len here because here we know the external chunk header size
118
        echunk->len = echunk->payload_len + ExternalChunk_header_size;
119

    
120
        /* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
121
        if(        (grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
122
                struct timeval now;
123

    
124
                /* then fill-up a proper GRAPES chunk */
125
                gchunk.size = echunk->payload_len;
126
                /* then fill the timestamp */
127
                gettimeofday(&now, NULL);
128
                gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
129

    
130
                //decide how to create the chunk ID
131
                if(cmeta->cid == 0) {
132
                        gchunk.id = echunk->seq;
133
#ifdef DEBUG_PUSHER
134
                        fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
135
#endif
136
                }
137
                else if(cmeta->cid == 1) {
138
                        gchunk.id = gchunk.timestamp; //its ID is its start time
139
#ifdef DEBUG_PUSHER
140
                        fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
141
#endif
142
                }
143
                else if(cmeta->cid == 2) {
144
                        //its ID is offset by actual time in seconds
145
                        gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
146
#ifdef DEBUG_PUSHER
147
                        fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
148
#endif
149
                }
150
                gchunk.attributes = grapes_chunk_attributes_block;
151
                gchunk.attributes_size = ExternalChunk_header_size;
152
                gchunk.data = echunk->data;
153

    
154
#ifdef NHIO
155
                write_chunk(&gchunk);
156
#else
157
                /* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
158
                ret = sendViaTcp(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size);
159
#endif
160

    
161
                free(grapes_chunk_attributes_block);
162
                return ret;
163
        }
164
        return ret;
165
}
166

    
167
int sendViaTcp(Chunk gchunk, int buffer_size)
168
{
169
        uint8_t *buffer=NULL;
170

    
171
        int ret = STREAMER_FAIL_RETURN;
172
        
173
        if(!(tcp_fd > 0))
174
        {
175
                fprintf(stderr, "TCP IO-MODULE: trying to send data to a not connected socket!!!\n");
176
                return ret;
177
        }
178

    
179
        if( (buffer = malloc(buffer_size)) != NULL) {
180
                /* encode the GRAPES chunk into network bytes */
181
                encodeChunk(&gchunk, buffer, buffer_size);
182
                
183
                int ret = send(tcp_fd, buffer, buffer_size, 0);
184
                int tmp;
185
                while(ret != buffer_size)
186
                {
187
                        tmp = send(tcp_fd, buffer+ret, buffer_size-ret, 0);
188
                        if(tmp > 0)
189
                                ret += tmp;
190
                        else
191
                                break;
192
                }
193
                
194
                free(buffer);
195
        }
196
        return ret;
197
}