Revision 1f9b07d5

View differences:

chunk_pusher_stdout.c
1
#include <stdlib.h>
2
#include <string.h>
3
#include <curl/curl.h>
4

  
5
#include "external_chunk_transcoding.h"
6
#include "chunker_streamer.h"
7

  
8
void initChunkPusher();
9
void finalizeChunkPusher();
10
int pushChunkHttp(ExternalChunk *echunk, char *url);
11
void *chunkToAttributes(ExternalChunk *echunk, size_t attr_size);
12
static inline void bit32_encoded_push(uint32_t v, uint8_t *p);
13
void chunker_logger(const char *s);
14

  
15

  
16
void initChunkPusher() {	
17
	/* In windows, this will init the winsock stuff */ 
18
	curl_global_init(CURL_GLOBAL_ALL);
19
}
20

  
21
void finalizeChunkPusher() {
22
	curl_global_cleanup();
23
}
24

  
25
int pushChunkHttp(ExternalChunk *echunk, char *url) {
26
	//MAKE THE CURL EASY HANDLE GLOBAL? TO REUSE IT?
27
	CURL *curl_handle;
28
	struct curl_slist *headers=NULL;
29
	void *grapes_chunk_attributes_block=NULL;
30
	uint8_t *buffer=NULL;
31
	static size_t attr_size = 0;
32
	size_t buffer_size = 0;
33
	double ts1e3 = 0.0;
34
	double tus3e1 = 0.0;
35
	int ret = STREAMER_FAIL_RETURN;
36
	
37
	attr_size = 5*sizeof(int) + 2*sizeof(time_t) + 2*sizeof(suseconds_t) + 1*sizeof(double);
38

  
39
	/* first pack the chunk info that we get from the streamer into an "attributes" block of a regular GRAPES chunk */
40
	if(	(grapes_chunk_attributes_block = packExternalChunkToAttributes(echunk, attr_size)) != NULL ) {
41
		/* then fill-up a proper GRAPES chunk */
42
		Chunk gchunk;
43
		gchunk.id = echunk->seq;
44
		gchunk.size = echunk->payload_len;
45
		/* convert external_chunk start_time in milliseconds and put it in grapes chunk */
46
		ts1e3 = (double)(echunk->start_time.tv_sec*1e3);
47
		tus3e1 = (double)(echunk->start_time.tv_usec/1e3);
48
		gchunk.timestamp = ts1e3+tus3e1;
49
		gchunk.attributes = grapes_chunk_attributes_block;
50
		gchunk.attributes_size = attr_size;
51
		gchunk.data = echunk->data;
52

  
53
		/* 20 bytes are needed to put the chunk header info on the wire */
54
		buffer_size = GRAPES_ENCODED_CHUNK_HEADER_SIZE + attr_size + echunk->payload_len;
55
		if( (buffer = malloc(buffer_size)) != NULL) {
56
			/* encode the GRAPES chunk into network bytes */
57
			int32_t encoded_size = encodeChunk(&gchunk, buffer, buffer_size);
58
			char * str = "chunk";
59
fprintf(stderr, "Writing chunk of size %d\n", encoded_size);
60
			write(1,str, 5);
61
			write(1,&encoded_size, sizeof(encoded_size));
62
			fprintf(stderr, " written %d\n", write(1,buffer, encoded_size));	//todo check for errors
63
			free(buffer);
64
			free(grapes_chunk_attributes_block);
65
			return ret;
66
		}
67
		free(grapes_chunk_attributes_block);
68
		return ret;
69
	}
70
	return ret;
71
}
72

  

Also available in: Unified diff