Statistics
| Branch: | Revision:

chunker-player / chunker_streamer / chunk_pusher_udp.c @ e9fe7473

History | View | Annotate | Download (3.73 KB)

1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

    
8
#include <stdlib.h>
9
#include <string.h>
10
#include <sys/time.h>
11
#include <sys/socket.h>
12
#include <netinet/in.h>
13
#include <unistd.h>
14

    
15
#include "external_chunk_transcoding.h"
16
#include "chunker_streamer.h"
17

    
18
//#define DEBUG_PUSHER
19

    
20

    
21
int pushChunkHttp(ExternalChunk *echunk, char *url);
22
int pushChunkTcp(ExternalChunk *echunk);
23

    
24
extern ChunkerMetadata *cmeta;
25
static long long int counter = 0;
26
static int fd = -1;
27

    
28
void initUDPPush(char* peer_ip, int peer_port)
29
{
30
        if(fd == -1)
31
        {
32
                fd = socket(AF_INET, SOCK_DGRAM, 0);
33
        
34
                struct sockaddr_in address;
35
                address.sin_family = AF_INET; 
36
                address.sin_addr.s_addr = inet_addr(peer_ip);
37
                address.sin_port = htons(peer_port);
38
                 
39
                int result = connect(fd, (struct sockaddr *)&address, sizeof(struct sockaddr_in));
40
                if(result == -1){
41
                        fprintf(stderr, "UDP OUTPUT MODULE: could not connect to the peer!\n");
42
                        exit(1);
43
                }
44
        }
45
}
46

    
47
void finalizeUDPChunkPusher()
48
{
49
        if(fd > 0)
50
        {
51
                close(fd);
52
                fd = -1;
53
        }
54
}
55

    
56
int pushChunkUDP(ExternalChunk *echunk) {
57

    
58
        Chunk gchunk;
59
        void *grapes_chunk_attributes_block = NULL;
60
        int ret = STREAMER_FAIL_RETURN;
61
        //we need to pack 5 int32s + 2 timeval structs + 1 double
62
        static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
63
        
64
        //update the chunk len here because here we know the external chunk header size
65
        echunk->len = echunk->payload_len + ExternalChunk_header_size;
66

    
67
        /* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
68
        if(        (grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
69
                struct timeval now;
70

    
71
                /* then fill-up a proper GRAPES chunk */
72
                gchunk.size = echunk->payload_len;
73
                /* then fill the timestamp */
74
                gettimeofday(&now, NULL);
75
                gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
76

    
77
                //decide how to create the chunk ID
78
                if(cmeta->cid == 0) {
79
                        gchunk.id = echunk->seq;
80
#ifdef DEBUG_PUSHER
81
                        fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
82
#endif
83
                }
84
                else if(cmeta->cid == 1) {
85
                        gchunk.id = gchunk.timestamp; //its ID is its start time
86
#ifdef DEBUG_PUSHER
87
                        fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
88
#endif
89
                }
90
                else if(cmeta->cid == 2) {
91
                        //its ID is offset by actual time in seconds
92
                        gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
93
#ifdef DEBUG_PUSHER
94
                        fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
95
#endif
96
                }
97
                gchunk.attributes = grapes_chunk_attributes_block;
98
                gchunk.attributes_size = ExternalChunk_header_size;
99
                gchunk.data = echunk->data;
100

    
101
                /* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
102
                ret = sendViaUDP(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size);
103

    
104
                free(grapes_chunk_attributes_block);
105
                return ret;
106
        }
107
        return ret;
108
}
109

    
110
int sendViaUDP(Chunk gchunk, int buffer_size)
111
{
112
        uint8_t *buffer=NULL;
113

    
114
        int ret = STREAMER_FAIL_RETURN;
115
        
116
        if(!(fd > 0))
117
        {
118
                fprintf(stderr, "IO-MODULE: trying to send data to a not connected socket!!!\n");
119
                return ret;
120
        }
121

    
122
        if( (buffer = malloc(buffer_size)) != NULL) {
123
                /* encode the GRAPES chunk into network bytes */
124
                encodeChunk(&gchunk, buffer, buffer_size);
125
                
126
                int ret = send(fd, buffer, buffer_size, 0);
127
                int tmp;
128
                while(ret != buffer_size)
129
                {
130
                        tmp = send(fd, buffer+ret, buffer_size-ret, 0);
131
                        if(tmp > 0)
132
                                ret += tmp;
133
                        else
134
                                break;
135
                }
136
                
137
                free(buffer);
138
        }
139
        return ret;
140
}