Revision e9fe7473

View differences:

chunker_streamer/Makefile
6 6

  
7 7
ifeq ($(IO), httpevent)
8 8
CPPFLAGS += -DHTTPIO
9
OBJECTS += chunk_pusher.o chunk_pusher_curl.o
9 10
endif
10 11

  
11 12
ifeq ($(IO), tcp)
12 13
CPPFLAGS += -DTCPIO
14
OBJECTS += chunk_pusher.o chunk_pusher_curl.o
15
endif
16

  
17
ifeq ($(IO), udp)
18
CPPFLAGS += -DUDPIO
19
OBJECTS += chunk_pusher_udp.o
13 20
endif
14 21

  
15 22
CPPFLAGS += -I$(LOCAL_CONFUSE)/include -I$(LOCAL_CURL)/include
......
24 31

  
25 32
all: chunker_streamer
26 33

  
27
chunker_streamer: ../chunk_transcoding/external_chunk_transcoding.o chunker_metadata.o chunker_streamer.o chunk_pusher.o chunk_pusher_curl.o
34
chunker_streamer: ../chunk_transcoding/external_chunk_transcoding.o chunker_metadata.o chunker_streamer.o $(OBJECTS)
28 35

  
29 36
clean:
30 37
	rm -f chunker_streamer
chunker_streamer/chunk_pusher_udp.c
1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4
 *
5
 *  This is free software; see lgpl-2.1.txt
6
 */
7

  
8
#include <stdlib.h>
9
#include <string.h>
10
#include <sys/time.h>
11
#include <sys/socket.h>
12
#include <netinet/in.h>
13
#include <unistd.h>
14

  
15
#include "external_chunk_transcoding.h"
16
#include "chunker_streamer.h"
17

  
18
//#define DEBUG_PUSHER
19

  
20

  
21
int pushChunkHttp(ExternalChunk *echunk, char *url);
22
int pushChunkTcp(ExternalChunk *echunk);
23

  
24
extern ChunkerMetadata *cmeta;
25
static long long int counter = 0;
26
static int fd = -1;
27

  
28
void initUDPPush(char* peer_ip, int peer_port)
29
{
30
	if(fd == -1)
31
	{
32
		fd = socket(AF_INET, SOCK_DGRAM, 0);
33
	
34
		struct sockaddr_in address;
35
		address.sin_family = AF_INET; 
36
		address.sin_addr.s_addr = inet_addr(peer_ip);
37
		address.sin_port = htons(peer_port);
38
		 
39
		int result = connect(fd, (struct sockaddr *)&address, sizeof(struct sockaddr_in));
40
		if(result == -1){
41
			fprintf(stderr, "UDP OUTPUT MODULE: could not connect to the peer!\n");
42
			exit(1);
43
		}
44
	}
45
}
46

  
47
void finalizeUDPChunkPusher()
48
{
49
	if(fd > 0)
50
	{
51
		close(fd);
52
		fd = -1;
53
	}
54
}
55

  
56
int pushChunkUDP(ExternalChunk *echunk) {
57

  
58
	Chunk gchunk;
59
	void *grapes_chunk_attributes_block = NULL;
60
	int ret = STREAMER_FAIL_RETURN;
61
	//we need to pack 5 int32s + 2 timeval structs + 1 double
62
	static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
63
	
64
	//update the chunk len here because here we know the external chunk header size
65
	echunk->len = echunk->payload_len + ExternalChunk_header_size;
66

  
67
	/* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
68
	if(	(grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
69
		struct timeval now;
70

  
71
		/* then fill-up a proper GRAPES chunk */
72
		gchunk.size = echunk->payload_len;
73
		/* then fill the timestamp */
74
		gettimeofday(&now, NULL);
75
		gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
76

  
77
		//decide how to create the chunk ID
78
		if(cmeta->cid == 0) {
79
			gchunk.id = echunk->seq;
80
#ifdef DEBUG_PUSHER
81
			fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
82
#endif
83
		}
84
		else if(cmeta->cid == 1) {
85
			gchunk.id = gchunk.timestamp; //its ID is its start time
86
#ifdef DEBUG_PUSHER
87
			fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
88
#endif
89
		}
90
		else if(cmeta->cid == 2) {
91
			//its ID is offset by actual time in seconds
92
			gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
93
#ifdef DEBUG_PUSHER
94
			fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
95
#endif
96
		}
97
		gchunk.attributes = grapes_chunk_attributes_block;
98
		gchunk.attributes_size = ExternalChunk_header_size;
99
		gchunk.data = echunk->data;
100

  
101
		/* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
102
		ret = sendViaUDP(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size);
103

  
104
		free(grapes_chunk_attributes_block);
105
		return ret;
106
	}
107
	return ret;
108
}
109

  
110
int sendViaUDP(Chunk gchunk, int buffer_size)
111
{
112
	uint8_t *buffer=NULL;
113

  
114
	int ret = STREAMER_FAIL_RETURN;
115
	
116
	if(!(fd > 0))
117
	{
118
		fprintf(stderr, "IO-MODULE: trying to send data to a not connected socket!!!\n");
119
		return ret;
120
	}
121

  
122
	if( (buffer = malloc(buffer_size)) != NULL) {
123
		/* encode the GRAPES chunk into network bytes */
124
		encodeChunk(&gchunk, buffer, buffer_size);
125
		
126
		int ret = send(fd, buffer, buffer_size, 0);
127
		int tmp;
128
		while(ret != buffer_size)
129
		{
130
			tmp = send(fd, buffer+ret, buffer_size-ret, 0);
131
			if(tmp > 0)
132
				ret += tmp;
133
			else
134
				break;
135
		}
136
		
137
		free(buffer);
138
	}
139
	return ret;
140
}

Also available in: Unified diff