Revision 14ab542a

View differences:

chunker_streamer/chunk_pusher.c
1 1
/*
2 2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  Copyright (c) 2010-2011 Csaba Kiraly
3 4
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4 5
 *
5 6
 *  This is free software; see lgpl-2.1.txt
......
15 16
#include "external_chunk_transcoding.h"
16 17
#include "chunker_streamer.h"
17 18

  
18
//#define DEBUG_PUSHER
19

  
19
#include "chunk_pusher.h"
20 20

  
21
int pushChunkHttp(ExternalChunk *echunk, char *url);
22
int pushChunkTcp(ExternalChunk *echunk);
23
int sendViaCurl(Chunk gchunk, int buffer_size, char *url);
24
int sendViaTcp(Chunk gchunk, uint32_t buffer_size);
21
//#define DEBUG_PUSHER
25 22

  
26 23

  
27 24
extern ChunkerMetadata *cmeta;
28
static long long int counter = 0;
29
static int tcp_fd = -1;
30
static bool tcp_fd_connected = false;
31
static char* peer_ip;
32
static int peer_port;
25

  
26
struct output {
27
    char* peer_ip;
28
    int peer_port;
29
    int tcp_fd;
30
    bool tcp_fd_connected;
31
    long long int counter;
32
};
33 33
static bool exit_on_connect_failure = false;
34 34
static bool connect_on_data = true;
35 35
static bool exit_on_send_error = false;	//TODO: handle this on Mac
36 36

  
37
void initTCPPush(char* ip, int port)
38
{
39
	peer_ip = strdup(ip);
40
	peer_port = port;
41 37

  
42
	if(tcp_fd == -1)
38
void connectTCP(struct output *o)
39
{
40
	if(o->tcp_fd == -1)
43 41
	{
44
		tcp_fd=socket(AF_INET, SOCK_STREAM, 0);
42
		o->tcp_fd=socket(AF_INET, SOCK_STREAM, 0);
45 43
	}
46
	if (!tcp_fd_connected) {
44
	if (!o->tcp_fd_connected) {
47 45
		struct sockaddr_in address;
48 46
		address.sin_family = AF_INET; 
49
		address.sin_addr.s_addr = inet_addr(peer_ip);
50
		address.sin_port = htons(peer_port);
47
		address.sin_addr.s_addr = inet_addr(o->peer_ip);
48
		address.sin_port = htons(o->peer_port);
51 49
	 
52
		int result = connect(tcp_fd, (struct sockaddr *)&address, sizeof(struct sockaddr_in));
50
		int result = connect(o->tcp_fd, (struct sockaddr *)&address, sizeof(struct sockaddr_in));
53 51
		if(result == -1){
54
			fprintf(stderr, "TCP OUTPUT MODULE: could not connect to the peer!\n");
52
			fprintf(stderr, "TCP OUTPUT MODULE: could not connect to the peer %s:%d!\n", o->peer_ip, o->peer_port);
55 53
			if (exit_on_connect_failure) {
56 54
				exit(1);
57 55
			}
58
			tcp_fd_connected = false;
56
			o->tcp_fd_connected = false;
59 57
		} else {
60
			tcp_fd_connected = true;
58
			o->tcp_fd_connected = true;
61 59
		}
62 60
	}
63 61
}
64 62

  
65
void finalizeTCPChunkPusher()
63
struct output *initTCPPush(char* ip, int port)
64
{
65

  
66
	struct output *o = malloc(sizeof(struct output));
67
	if (!o) {
68
		fprintf(stderr, "PUSHER:memory alloc errir\n");
69
		return NULL;
70
	}
71

  
72
	o->peer_ip = strdup(ip);
73
	o->peer_port = port;
74
	o->tcp_fd = -1;
75
	o->tcp_fd_connected = false;
76
	o->counter = 0;
77

  
78
	connectTCP(o);
79

  
80
	return o;
81
}
82

  
83
void finalizeTCPChunkPusher(struct output *o)
66 84
{
67
	if(tcp_fd > 0)
85
	if(o->tcp_fd > 0)
68 86
	{
69
		close(tcp_fd);
70
		tcp_fd = -1;
87
		close(o->tcp_fd);
88
		o->tcp_fd = -1;
71 89
	}
72 90
}
73 91

  
74
int pushChunkHttp(ExternalChunk *echunk, char *url) {
92
int pushChunkHttp(struct output *o, ExternalChunk *echunk, char *url) {
75 93

  
76 94
	Chunk gchunk;
77 95
	void *grapes_chunk_attributes_block = NULL;
......
107 125
		}
108 126
		else if(cmeta->cid == 2) {
109 127
			//its ID is offset by actual time in seconds
110
			gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
128
			gchunk.id = ++o->counter + cmeta->base_chunkid_sequence_offset;
111 129
#ifdef DEBUG_PUSHER
112 130
			fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
113 131
#endif
......
131 149
	return ret;
132 150
}
133 151

  
134
int pushChunkTcp(ExternalChunk *echunk)
152
int pushChunkTcp(struct output *o, ExternalChunk *echunk)
135 153
{
136 154
	Chunk gchunk;
137 155
	void *grapes_chunk_attributes_block = NULL;
......
140 158
	static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
141 159
	
142 160
	//try to connect if not connected
143
	if (connect_on_data && !tcp_fd_connected) {
144
		initTCPPush(peer_ip, peer_port);
145
		if (!tcp_fd_connected) {
161
	if (connect_on_data && !o->tcp_fd_connected) {
162
		connectTCP(o);
163
		if (!o->tcp_fd_connected) {
146 164
			return ret;
147 165
		}
148 166
	}
......
175 193
		}
176 194
		else if(cmeta->cid == 2) {
177 195
			//its ID is offset by actual time in seconds
178
			gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
196
			gchunk.id = ++o->counter + cmeta->base_chunkid_sequence_offset;
179 197
#ifdef DEBUG_PUSHER
180 198
			fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
181 199
#endif
......
188 206
		write_chunk(&gchunk);
189 207
#else
190 208
		/* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
191
		ret = sendViaTcp(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size);
209
		ret = sendViaTcp(o,gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size);
192 210
#endif
193 211

  
194 212
		free(grapes_chunk_attributes_block);
......
197 215
	return ret;
198 216
}
199 217

  
200
int sendViaTcp(Chunk gchunk, uint32_t buffer_size)
218
int sendViaTcp(struct output *o, Chunk gchunk, uint32_t buffer_size)
201 219
{
202 220
	uint8_t *buffer=NULL;
203 221

  
204 222
	int ret = STREAMER_FAIL_RETURN;
205 223
	
206
	if(!(tcp_fd > 0))
224
	if(!(o->tcp_fd > 0))
207 225
	{
208 226
		fprintf(stderr, "TCP IO-MODULE: trying to send data to a not connected socket!!!\n");
209 227
		return ret;
......
215 233
		*(uint32_t*)buffer = htonl(buffer_size);
216 234

  
217 235
#ifdef MSG_NOSIGNAL
218
		int ret = send(tcp_fd, buffer, 4 + buffer_size, exit_on_send_error ? 0 : MSG_NOSIGNAL); //TODO: better handling of exit_on_send_error
236
		int ret = send(o->tcp_fd, buffer, 4 + buffer_size, exit_on_send_error ? 0 : MSG_NOSIGNAL); //TODO: better handling of exit_on_send_error
219 237
#else
220
		int ret = send(tcp_fd, buffer, 4 + buffer_size, 0); //TODO: better handling of exit_on_send_error
238
		int ret = send(o->tcp_fd, buffer, 4 + buffer_size, 0); //TODO: better handling of exit_on_send_error
221 239
#endif
222 240
		//fprintf(stderr, "TCP IO-MODULE: sending %d bytes, %d sent\n", buffer_size, ret);
223 241
		if (ret < 0) {
224 242
			if (errno != EAGAIN && errno != EWOULDBLOCK) {
225 243
				fprintf(stderr, "TCP IO-MODULE: closing connection\n");
226
				close(tcp_fd);
227
				tcp_fd = -1;
228
				tcp_fd_connected = false;
244
				close(o->tcp_fd);
245
				o->tcp_fd = -1;
246
				o->tcp_fd_connected = false;
229 247
			}
230 248
			return ret;
231 249
		}
......
233 251
		while(ret != buffer_size)
234 252
		{
235 253
#ifdef MSG_NOSIGNAL
236
			tmp = send(tcp_fd, buffer+ret, 4 + buffer_size - ret, exit_on_send_error ? 0 : MSG_NOSIGNAL);
254
			tmp = send(o->tcp_fd, buffer+ret, 4 + buffer_size - ret, exit_on_send_error ? 0 : MSG_NOSIGNAL);
237 255
#else
238
			tmp = send(tcp_fd, buffer+ret, 4 + buffer_size - ret, 0);
256
			tmp = send(o->tcp_fd, buffer+ret, 4 + buffer_size - ret, 0);
239 257
#endif
240 258
			if(tmp > 0)
241 259
				ret += tmp;
chunker_streamer/chunk_pusher.h
1
/*
2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  Copyright (c) 2010-2011 Csaba Kiraly
4
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
5
 *
6
 *  This is free software; see lgpl-2.1.txt
7
 */
8

  
9
#ifndef CHUNK_PUSHER_H
10
#define CHUNK_PUSHER_H
11

  
12
struct output;
13

  
14
struct output *initTCPPush(char* ip, int port);
15
void finalizeTCPChunkPusher(struct output *o);
16
int pushChunkTcp(struct output *o, ExternalChunk *echunk);
17

  
18
#endif
chunker_streamer/chunker_streamer.c
1 1
/*
2 2
 *  Copyright (c) 2009-2011 Carmelo Daniele, Dario Marchese, Diego Reforgiato, Giuseppe Tropea
3
 *  Copyright (c) 2010-2011 Csaba Kiraly
3 4
 *  developed for the Napa-Wine EU project. See www.napa-wine.eu
4 5
 *
5 6
 *  This is free software; see lgpl-2.1.txt
......
17 18
#include "chunker_filtering.h"
18 19
#endif
19 20

  
21
#include "chunk_pusher.h"
22

  
23
static struct output* output;
24

  
20 25
#define DEBUG
21 26
#define DEBUG_AUDIO_FRAMES  false
22 27
#define DEBUG_VIDEO_FRAMES  false
......
39 44

  
40 45
void SaveFrame(AVFrame *pFrame, int width, int height);
41 46
void SaveEncodedFrame(Frame* frame, uint8_t *video_outbuf);
42
int pushChunkTcp(ExternalChunk *echunk);
43
void initTCPPush(char* ip, int port);
44 47
int update_chunk(ExternalChunk *chunk, Frame *frame, uint8_t *outbuf);
45
void finalizeTCPChunkPusher();
46 48
void bit32_encoded_push(uint32_t v, uint8_t *p);
47 49

  
48 50
int video_record_count = 0;
......
148 150
    );
149 151
  }
150 152

  
151
int sendChunk(ExternalChunk *chunk) {
153
int sendChunk(struct output *output, ExternalChunk *chunk) {
152 154
#ifdef HTTPIO
153 155
						return pushChunkHttp(chunk, outside_world_url);
154 156
#endif
155 157
#ifdef TCPIO
156
						return pushChunkTcp(chunk);
158
						return pushChunkTcp(output, chunk);
157 159
#endif
158 160
#ifdef UDPIO
159 161
						return pushChunkUDP(chunk);
......
652 654
		return -2;
653 655
	}
654 656
	
655
	initTCPPush(peer_ip, peer_port);
657
	output = initTCPPush(peer_ip, peer_port);
658
	if (!output) {
659
		fprintf(stderr, "Error initializing output module, exiting\n");
660
		exit(1);
661
	}
656 662
#endif
657 663
#ifdef UDPIO
658 664
	static char peer_ip[16];
......
969 975
						//SAVE ON FILE
970 976
						//saveChunkOnFile(chunk);
971 977
						//Send the chunk to an external transport/player
972
						sendChunk(chunk);
978
						sendChunk(output, chunk);
973 979
						dctprintf(DEBUG_CHUNKER, "VIDEO: sent chunk video %d, prio:%f, size %d\n", chunk->seq, chunk->priority, chunk->len);
974 980
						chunk->seq = 0; //signal that we need an increase
975 981
						//initChunk(chunk, &seq_current_chunk);
......
1135 1141
					//SAVE ON FILE
1136 1142
					//saveChunkOnFile(chunkaudio);
1137 1143
					//Send the chunk to an external transport/player
1138
					sendChunk(chunkaudio);
1144
					sendChunk(output, chunkaudio);
1139 1145
					dctprintf(DEBUG_CHUNKER, "AUDIO: just sent chunk audio %d\n", chunkaudio->seq);
1140 1146
					chunkaudio->seq = 0; //signal that we need an increase
1141 1147
					//initChunk(chunkaudio, &seq_current_chunk);
......
1190 1196
		//SAVE ON FILE
1191 1197
		//saveChunkOnFile(chunk);
1192 1198
		//Send the chunk to an external transport/player
1193
		sendChunk(chunk);
1199
		sendChunk(output, chunk);
1194 1200
		dcprintf(DEBUG_CHUNKER, "CHUNKER: SENDING LAST VIDEO CHUNK\n");
1195 1201
		chunk->seq = 0; //signal that we need an increase just in case we will restart
1196 1202
	}
......
1198 1204
		//SAVE ON FILE     
1199 1205
		//saveChunkOnFile(chunkaudio);
1200 1206
		//Send the chunk via http to an external transport/player
1201
		sendChunk(chunkaudio);
1207
		sendChunk(output, chunkaudio);
1202 1208
		dcprintf(DEBUG_CHUNKER, "CHUNKER: SENDING LAST AUDIO CHUNK\n");
1203 1209
		chunkaudio->seq = 0; //signal that we need an increase just in case we will restart
1204 1210
	}
......
1277 1283
	}
1278 1284

  
1279 1285
#ifdef TCPIO
1280
	finalizeTCPChunkPusher();
1286
	finalizeTCPChunkPusher(output);
1281 1287
#endif
1282 1288

  
1283 1289
#ifdef USE_AVFILTER

Also available in: Unified diff