Revision e11386c0 chunker_streamer/chunk_pusher.c

View differences:

chunker_streamer/chunk_pusher.c
1 1
#include <stdlib.h>
2 2
#include <string.h>
3
#include <sys/time.h>
4
#include <sys/socket.h>
5
#include <netinet/in.h>
6
#include <unistd.h>
3 7

  
4 8
#include "external_chunk_transcoding.h"
5 9
#include "chunker_streamer.h"
......
8 12

  
9 13

  
10 14
int pushChunkHttp(ExternalChunk *echunk, char *url);
15
int pushChunkTcp(ExternalChunk *echunk);
11 16

  
12 17
extern ChunkerMetadata *cmeta;
18
static long long int counter = 0;
19
static int tcp_fd = -1;
13 20

  
21
void initTCPPush(char* peer_ip, int peer_port)
22
{
23
	if(tcp_fd == -1)
24
	{
25
		tcp_fd=socket(AF_INET, SOCK_STREAM, 0);
26
	
27
		struct sockaddr_in address;
28
		address.sin_family = AF_INET; 
29
		address.sin_addr.s_addr = inet_addr(peer_ip);
30
		address.sin_port = htons(peer_port);
31
		 
32
		int result = connect(tcp_fd, (struct sockaddr *)&address, sizeof(struct sockaddr_in));
33
		if(result == -1){
34
			fprintf(stderr, "TCP OUTPUT MODULE: could not connect to the peer!\n");
35
			exit(1);
36
		}
37
	}
38
}
39

  
40
void finalizeTCPChunkPusher()
41
{
42
	if(tcp_fd > 0)
43
	{
44
		close(tcp_fd);
45
		tcp_fd = -1;
46
	}
47
}
14 48

  
15 49
int pushChunkHttp(ExternalChunk *echunk, char *url) {
16 50

  
......
48 82
		}
49 83
		else if(cmeta->cid == 2) {
50 84
			//its ID is offset by actual time in seconds
51
			gchunk.id = echunk->seq + cmeta->base_chunkid_sequence_offset;
85
			gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
52 86
#ifdef DEBUG_PUSHER
53 87
			fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
54 88
#endif
......
62 96
#else
63 97
		/* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
64 98
		ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, url);
99
		//~ if(ChunkerStreamerTestMode)
100
			//~ ret = sendViaCurl(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size, "http://localhost:5557/externalplayer");
101
#endif
102

  
103
		free(grapes_chunk_attributes_block);
104
		return ret;
105
	}
106
	return ret;
107
}
108

  
109
int pushChunkTcp(ExternalChunk *echunk)
110
{
111
	Chunk gchunk;
112
	void *grapes_chunk_attributes_block = NULL;
113
	int ret = STREAMER_FAIL_RETURN;
114
	//we need to pack 5 int32s + 2 timeval structs + 1 double
115
	static size_t ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
116
	
117
	//update the chunk len here because here we know the external chunk header size
118
	echunk->len = echunk->payload_len + ExternalChunk_header_size;
119

  
120
	/* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
121
	if(	(grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, ExternalChunk_header_size)) != NULL ) {
122
		struct timeval now;
123

  
124
		/* then fill-up a proper GRAPES chunk */
125
		gchunk.size = echunk->payload_len;
126
		/* then fill the timestamp */
127
		gettimeofday(&now, NULL);
128
		gchunk.timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
129

  
130
		//decide how to create the chunk ID
131
		if(cmeta->cid == 0) {
132
			gchunk.id = echunk->seq;
133
#ifdef DEBUG_PUSHER
134
			fprintf(stderr, "PUSHER: packaged SEQ chunkID %d\n", gchunk.id);
135
#endif
136
		}
137
		else if(cmeta->cid == 1) {
138
			gchunk.id = gchunk.timestamp; //its ID is its start time
139
#ifdef DEBUG_PUSHER
140
			fprintf(stderr, "PUSHER: packaged TS chunkID %d\n", gchunk.id);
141
#endif
142
		}
143
		else if(cmeta->cid == 2) {
144
			//its ID is offset by actual time in seconds
145
			gchunk.id = ++counter + cmeta->base_chunkid_sequence_offset;
146
#ifdef DEBUG_PUSHER
147
			fprintf(stderr, "PUSHER: packaged SEQ %d + %d offset chunkID %d\n", echunk->seq, cmeta->base_chunkid_sequence_offset, gchunk.id);
148
#endif
149
		}
150
		gchunk.attributes = grapes_chunk_attributes_block;
151
		gchunk.attributes_size = ExternalChunk_header_size;
152
		gchunk.data = echunk->data;
153

  
154
#ifdef NHIO
155
		write_chunk(&gchunk);
156
#else
157
		/* 20 bytes are needed to put the chunk header info on the wire + attributes size + payload */
158
		ret = sendViaTcp(gchunk, GRAPES_ENCODED_CHUNK_HEADER_SIZE + gchunk.attributes_size + gchunk.size);
65 159
#endif
66 160

  
67 161
		free(grapes_chunk_attributes_block);
......
69 163
	}
70 164
	return ret;
71 165
}
166

  
167
int sendViaTcp(Chunk gchunk, int buffer_size)
168
{
169
	uint8_t *buffer=NULL;
170

  
171
	int ret = STREAMER_FAIL_RETURN;
172
	
173
	if(!(tcp_fd > 0))
174
	{
175
		fprintf(stderr, "TCP IO-MODULE: trying to send data to a not connected socket!!!\n");
176
		return ret;
177
	}
178

  
179
	if( (buffer = malloc(buffer_size)) != NULL) {
180
		/* encode the GRAPES chunk into network bytes */
181
		encodeChunk(&gchunk, buffer, buffer_size);
182
		
183
		int ret = send(tcp_fd, buffer, buffer_size, 0);
184
		int tmp;
185
		while(ret != buffer_size)
186
		{
187
			tmp = send(tcp_fd, buffer+ret, buffer_size-ret, 0);
188
			if(tmp > 0)
189
				ret += tmp;
190
			else
191
				break;
192
		}
193
		
194
		free(buffer);
195
	}
196
	return ret;
197
}

Also available in: Unified diff