Revision 14ab542a chunker_streamer/chunk_pusher.c

View differences:

chunker_streamer/chunk_pusher.c
1 1
/*
2 2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  Copyright (c) 2010-2011 Csaba Kiraly
3 4
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4 5
 *
5 6
 *  This is free software; see lgpl-2.1.txt
......
15 16
#include "external_chunk_transcoding.h"
16 17
#include "chunker_streamer.h"
17 18

  
18
//#define DEBUG_PUSHER
19

  
19
#include "chunk_pusher.h"
20 20

  
21
int pushChunkHttp(ExternalChunk *echunk, char *url);
22
int pushChunkTcp(ExternalChunk *echunk);
23
int sendViaCurl(Chunk gchunk, int buffer_size, char *url);
24
int sendViaTcp(Chunk gchunk, uint32_t buffer_size);
21
//#define DEBUG_PUSHER
25 22

  
26 23

  
27 24
extern ChunkerMetadata *cmeta;
28
static long long int counter = 0;
29
static int tcp_fd = -1;
30
static bool tcp_fd_connected = false;
31
static char* peer_ip;
32
static int peer_port;
25

  
26
struct output {
27
    char* peer_ip;
28
    int peer_port;
29
    int tcp_fd;
30
    bool tcp_fd_connected;
31
    long long int counter;
32
};
33 33
static bool exit_on_connect_failure = false;
34 34
static bool connect_on_data = true;
35 35
static bool exit_on_send_error = false;	//TODO: handle this on Mac
36 36

  
37
void initTCPPush(char* ip, int port)
38
{
39
	peer_ip = strdup(ip);
40
	peer_port = port;
41 37

  
42
	if(tcp_fd == -1)
38
void connectTCP(struct output *o)
39
{
40
	if(o->tcp_fd == -1)
43 41
	{
44
		tcp_fd=socket(AF_INET, SOCK_STREAM, 0);
42
		o->tcp_fd=socket(AF_INET, SOCK_STREAM, 0);
45 43
	}
46
	if (!tcp_fd_connected) {
44
	if (!o->tcp_fd_connected) {
47 45
		struct sockaddr_in address;
48 46
		address.sin_family = AF_INET; 
49
		address.sin_addr.s_addr = inet_addr(peer_ip);
50
		address.sin_port = htons(peer_port);
47
		address.sin_addr.s_addr = inet_addr(o->peer_ip);
48
		address.sin_port = htons(o->peer_port);
51 49
	 
52
		int result = connect(tcp_fd, (struct sockaddr *)&address, sizeof(struct sockaddr_in));
50
		int result = connect(o->tcp_fd, (struct sockaddr *)&address, sizeof(struct sockaddr_in));
53 51
		if(result == -1){
54
			fprintf(stderr, "TCP OUTPUT MODULE: could not connect to the peer!\n");
52
			fprintf(stderr, "TCP OUTPUT MODULE: could not connect to the peer %s:%d!\n", o->peer_ip, o->peer_port);
55 53
			if (exit_on_connect_failure) {
56 54
				exit(1);
57 55
			}
58
			tcp_fd_connected = false;
56
			o->tcp_fd_connected = false;
59 57
		} else {
60
			tcp_fd_connected = true;
58
			o->tcp_fd_connected = true;
61 59
		}
62 60
	}
63 61
}
64 62

  
65
void finalizeTCPChunkPusher()
63
struct output *initTCPPush(char* ip, int port)
64
{
65

  
66
	struct output *o = malloc(sizeof(struct output));
67
	if (!o) {
68
		fprintf(stderr, "PUSHER:memory alloc errir\n");
69
		return NULL;
70
	}
71

  
72
	o->peer_ip = strdup(ip);
73
	o->peer_port = port;
74
	o->tcp_fd = -1;
75
	o->tcp_fd_connected = false;
76
	o->counter = 0;
77

  
78
	connectTCP(o);
79

  
80
	return o;
81
}
82

  
83
void finalizeTCPChunkPusher(struct output *o)
66 84
{
67
	if(tcp_fd > 0)
85
	if(o->tcp_fd > 0)
68 86
	{
69
		close(tcp_fd);
70
		tcp_fd = -1;
87
		close(o->tcp_fd);
88
		o->tcp_fd = -1;
71 89
	}
72 90
}
73 91

  
74
int pushChunkHttp(ExternalChunk *echunk, char *url) {
92
int pushChunkHttp(struct output *o, ExternalChunk *echunk, char *url) {
75 93

  
76 94
	Chunk gchunk;
77 95
	void *grapes_chunk_attributes_block = NULL;
......
107 125
		}
108 126
		else if(cmeta->cid == 2) {
109 127
			//its ID is offset by actual time in seconds
110
			gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
128
			gchunk.id = ++o->counter + cmeta->base_chunkid_sequence_offset;
111 129
#ifdef DEBUG_PUSHER
112 130
			fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
113 131
#endif
......
131 149
	return ret;
132 150
}
133 151

  
134
int pushChunkTcp(ExternalChunk *echunk)
152
int pushChunkTcp(struct output *o, ExternalChunk *echunk)
135 153
{
136 154
	Chunk gchunk;
137 155
	void *grapes_chunk_attributes_block = NULL;
......
140 158
	static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
141 159
	
142 160
	//try to connect if not connected
143
	if (connect_on_data && !tcp_fd_connected) {
144
		initTCPPush(peer_ip, peer_port);
145
		if (!tcp_fd_connected) {
161
	if (connect_on_data && !o->tcp_fd_connected) {
162
		connectTCP(o);
163
		if (!o->tcp_fd_connected) {
146 164
			return ret;
147 165
		}
148 166
	}
......
175 193
		}
176 194
		else if(cmeta->cid == 2) {
177 195
			//its ID is offset by actual time in seconds
178
			gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
196
			gchunk.id = ++o->counter + cmeta->base_chunkid_sequence_offset;
179 197
#ifdef DEBUG_PUSHER
180 198
			fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
181 199
#endif
......
188 206
		write_chunk(&gchunk);
189 207
#else
190 208
		/* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
191
		ret = sendViaTcp(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size);
209
		ret = sendViaTcp(o,gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size);
192 210
#endif
193 211

  
194 212
		free(grapes_chunk_attributes_block);
......
197 215
	return ret;
198 216
}
199 217

  
200
int sendViaTcp(Chunk gchunk, uint32_t buffer_size)
218
int sendViaTcp(struct output *o, Chunk gchunk, uint32_t buffer_size)
201 219
{
202 220
	uint8_t *buffer=NULL;
203 221

  
204 222
	int ret = STREAMER_FAIL_RETURN;
205 223
	
206
	if(!(tcp_fd > 0))
224
	if(!(o->tcp_fd > 0))
207 225
	{
208 226
		fprintf(stderr, "TCP IO-MODULE: trying to send data to a not connected socket!!!\n");
209 227
		return ret;
......
215 233
		*(uint32_t*)buffer = htonl(buffer_size);
216 234

  
217 235
#ifdef MSG_NOSIGNAL
218
		int ret = send(tcp_fd, buffer, 4 + buffer_size, exit_on_send_error ? 0 : MSG_NOSIGNAL); //TODO: better handling of exit_on_send_error
236
		int ret = send(o->tcp_fd, buffer, 4 + buffer_size, exit_on_send_error ? 0 : MSG_NOSIGNAL); //TODO: better handling of exit_on_send_error
219 237
#else
220
		int ret = send(tcp_fd, buffer, 4 + buffer_size, 0); //TODO: better handling of exit_on_send_error
238
		int ret = send(o->tcp_fd, buffer, 4 + buffer_size, 0); //TODO: better handling of exit_on_send_error
221 239
#endif
222 240
		//fprintf(stderr, "TCP IO-MODULE: sending %d bytes, %d sent\n", buffer_size, ret);
223 241
		if (ret < 0) {
224 242
			if (errno != EAGAIN && errno != EWOULDBLOCK) {
225 243
				fprintf(stderr, "TCP IO-MODULE: closing connection\n");
226
				close(tcp_fd);
227
				tcp_fd = -1;
228
				tcp_fd_connected = false;
244
				close(o->tcp_fd);
245
				o->tcp_fd = -1;
246
				o->tcp_fd_connected = false;
229 247
			}
230 248
			return ret;
231 249
		}
......
233 251
		while(ret != buffer_size)
234 252
		{
235 253
#ifdef MSG_NOSIGNAL
236
			tmp = send(tcp_fd, buffer+ret, 4 + buffer_size - ret, exit_on_send_error ? 0 : MSG_NOSIGNAL);
254
			tmp = send(o->tcp_fd, buffer+ret, 4 + buffer_size - ret, exit_on_send_error ? 0 : MSG_NOSIGNAL);
237 255
#else
238
			tmp = send(tcp_fd, buffer+ret, 4 + buffer_size - ret, 0);
256
			tmp = send(o->tcp_fd, buffer+ret, 4 + buffer_size - ret, 0);
239 257
#endif
240 258
			if(tmp > 0)
241 259
				ret += tmp;

Also available in: Unified diff