Revision 2565bc8b

View differences:

Makefile
19 19

  
20 20
NAPA ?= ../../NAPA-BASELIBS
21 21
GRAPES ?= ../../GRAPES
22
ULPLAYER ?= ../StreamerPlayerChunker
23
ULPLAYER_EXTERNAL_LIBS ?= external_libs
24 22

  
25 23
CPPFLAGS = -I$(NAPA)/include
26 24
CPPFLAGS += -I$(GRAPES)/include
......
116 114
endif
117 115
endif
118 116
endif
119
ifeq ($(IO), httpmhd)
120
CPPFLAGS += -DHTTPIO_MHD
121
CPPFLAGS += -DHTTPIO
122
OBJS += $(ULPLAYER)/chunker_player/chunk_puller.o
123
OBJS += $(ULPLAYER)/chunker_streamer/chunk_pusher_curl.o
124
CPPFLAGS += -DCRAP -I$(ULPLAYER) -I$(ULPLAYER)/chunk_transcoding
125
CFLAGS += -pthread
126
LDFLAGS += -pthread
127
OBJS += input-http.o
128
OBJS += output-http.o output.o
129

  
130
CPPFLAGS += -I$(LOCAL_MHD)/include
131
LDFLAGS += -L$(LOCAL_MHD)/lib
132
LDLIBS += $(LOCAL_MHD)/lib/libmicrohttpd.a
133

  
134
CPPFLAGS += -I$(LOCAL_CURL)/include
135
LDFLAGS += -L$(LOCAL_CURL)/lib
136
LDLIBS += $(LOCAL_CURL)/lib/libcurl.a -lrt
137
endif
138
ifeq ($(IO), httpevent)
139
CPPFLAGS += -DHTTPIO_EVENT
140
CPPFLAGS += -DHTTPIO
141
OBJS += $(ULPLAYER)/event_http/event_http_server.o
142
LDFLAGS += -L$(NAPA)/dclog
143
LDLIBS += -ldclog
144
OBJS += $(ULPLAYER)/chunker_streamer/chunk_pusher_curl.o
145
CPPFLAGS += -I$(ULPLAYER) -I$(ULPLAYER)/chunk_transcoding -I$(ULPLAYER)/event_http -DCRAP
146
OBJS += input-http.o
147
OBJS += output-http.o output.o
148

  
149
CPPFLAGS += -I$(LOCAL_CURL)/include
150
LDFLAGS += -L$(LOCAL_CURL)/lib
151
LDLIBS += $(LOCAL_CURL)/lib/libcurl.a -lrt
152
endif
153 117

  
154 118
EXECTARGET = streamer
155 119
ifdef ML
......
198 162
$(EXECTARGET).o: streamer.o
199 163
	ln -sf streamer.o $(EXECTARGET).o
200 164

  
201
out-stream-avf.o Chunkiser/input-stream-avs.o: CPPFLAGS += -I$(FFMPEG_DIR)/include 
202

  
203 165
GRAPES:
204 166
	git clone http://www.disi.unitn.it/~kiraly/PublicGits/GRAPES.git
205 167
	cd GRAPES; git checkout -b for-streamer-0.8.3 origin/for-streamer-0.8.3
......
221 183
	rm -f $(GRAPES)/src/net_helper.o
222 184
	rm -f *.o
223 185
	rm -f Chunkiser/*.o
224
	rm -f $(ULPLAYER)/chunker_player/chunk_puller.o
225
	rm -f $(ULPLAYER)/chunker_streamer/chunk_pusher_curl.o
226
	rm -f $(ULPLAYER)/event_http/event_http_server.o
input-http.c
1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <stdio.h>
7
#include <unistd.h>
8
#include <stdarg.h>
9
#include <sys/types.h>
10
#include <sys/socket.h>
11
#include <sys/select.h>
12
#include <arpa/inet.h>
13

  
14
#ifdef HTTPIO_MHD
15
	#include <microhttpd.h>
16
	#include <pthread.h>
17
#elif defined HTTPIO_EVENT
18
	#include "chunk_external_interface.h"
19
#endif
20

  
21
#include <chunk.h>
22
#include <http_default_urls.h>
23
#include <external_chunk_transcoding.h>
24

  
25
#include "input.h"
26

  
27
extern struct chunk_buffer *cb;
28
extern int multiply;
29

  
30
#ifdef HTTPIO_MHD
31
	#ifdef THREADS
32
	extern pthread_mutex_t cb_mutex;
33
	extern pthread_mutex_t topology_mutex;
34
	#else
35
	pthread_mutex_t cb_mutex = PTHREAD_MUTEX_INITIALIZER;
36
	#endif
37
	struct MHD_Daemon *httpd;
38
#endif
39

  
40
struct input_desc {
41
  int dummy;
42
};
43

  
44

  
45
//this is the real one, called by the httpd receiver thread
46
int enqueueBlock(const uint8_t *block, const int block_size) {
47
	static int ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
48
  int decoded_size = 0;
49
	int res = -1;
50
//  struct chunk gchunk;
51

  
52
  Chunk* gchunk=NULL;
53
	gchunk = (Chunk *)malloc(sizeof(Chunk));
54
	if(!gchunk) {
55
		fprintf(stderr, "Memory error in gchunk!\n");
56
		return -1;
57
	}
58

  
59
  decoded_size = decodeChunk(gchunk, block, block_size);
60

  
61
  if(decoded_size < 0 || decoded_size != GRAPES_ENCODED_CHUNK_HEADER_SIZE + ExternalChunk_header_size + gchunk->size) {
62
	    fprintf(stderr, "chunk %d probably corrupted!\n", gchunk->id);
63
		return -1;
64
	}
65

  
66
	if (gchunk->attributes) {
67
		free(gchunk->attributes);
68
		gchunk->attributes = NULL;
69
		gchunk->attributes_size = 0;
70
	}
71
	chunk_attributes_fill(gchunk);
72

  
73
	if(cb) {
74
		int cnt = 0;
75
		int i = 0;
76
#ifdef HTTPIO_MHD
77
		#ifdef THREADS
78
		//in case of threaded streamer it also has a topology mutex
79
		pthread_mutex_lock(&topology_mutex);
80
		#endif
81
		pthread_mutex_lock(&cb_mutex);
82
#endif
83
		res = add_chunk(gchunk);
84
		// if(res)
85
		for(i=0; i < multiply; i++) {	// @TODO: why this cycle?
86
			send_chunk();
87
		}
88
		send_offer();
89
		if (cnt++ % 10 == 0) {
90
			update_peers(NULL, NULL, 0);
91
		}
92
#ifdef HTTPIO_MHD
93
		pthread_mutex_unlock(&cb_mutex);
94
		#ifdef THREADS
95
  	pthread_mutex_unlock(&topology_mutex);
96
		#endif
97
#endif
98
	}
99

  
100
  return 0;
101
}
102

  
103
struct input_desc *input_open(const char *fname, int *fds, int fds_size)
104
{
105
  struct input_desc *res;
106

  
107
  if (fds_size >= 1) {
108
    *fds = -1; //This input module needs no fds to monitor
109
  }
110

  
111
  res = malloc(sizeof(struct input_desc));
112
  if (res == NULL) {
113
    return NULL;
114
  }
115

  
116
  res->dummy = 0;
117

  
118
#ifdef HTTPIO_MHD
119
	#ifndef THREADS
120
	//in case we are using the non-threaded version of streamer
121
	//we need our own mutex
122
  pthread_mutex_init(&cb_mutex, NULL);
123
	#endif
124
  //this daemon will listen the network for incoming chunks from a streaming source
125
  //on the following path and port
126
  httpd = initChunkPuller(UL_DEFAULT_CHUNKBUFFER_PATH, UL_DEFAULT_CHUNKBUFFER_PORT);
127
	printf("MHD input httpd thread initialized! %d\n", res->dummy);
128
#elif defined HTTPIO_EVENT
129
	if(ulEventHttpServerSetup("127.0.0.1", UL_DEFAULT_CHUNKBUFFER_PORT, &enqueueBlock)) {
130
		return NULL;
131
	}
132
	else {
133
		printf("EVENT input httpd loop initialized! %d\n", res->dummy);
134
	}
135
#endif
136

  
137
  return res;
138
}
139

  
140
void input_close(struct input_desc *s)
141
{
142
  free(s);
143
#ifdef HTTPIO_MHD
144
  finalizeChunkPuller(httpd);
145
	#ifndef THREADS
146
  pthread_mutex_destroy(&cb_mutex);
147
	#endif
148
#elif defined HTTPIO_EVENT
149
	//TODO finalize the event http server
150
#endif
151
}
152

  
153
//this one is not used, just routinely called by the firging thread
154
int input_get(struct input_desc *s, struct chunk *c)
155
{
156
  c->data = NULL;
157
  return 0;
158
}
output-http.c
1
	/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <stdio.h>
7
#include <stdint.h>
8
#include <stdlib.h>
9
#include <stdbool.h>
10
#include <sys/time.h>
11
#include <chunk.h>
12
#include <net_helper.h>
13

  
14
#include "http_default_urls.h"
15
#include "external_chunk_transcoding.h"
16

  
17
#include "output.h"
18
#include "dbg.h"
19

  
20
#define PLAYER_IP "127.0.0.1"
21

  
22
struct nodeID *streamer;
23
static char url[256];
24
static int base_port = 0;
25

  
26
void output_init1(int bufsize, const char *config)
27
{
28
  if(!config) {
29
     fprintf(stderr, "Error: no http output module configuration issued. Exiting\n");
30
     exit(1);
31
  }
32
  if(sscanf(config, "%d", &base_port) < 1) {
33
     fprintf(stderr, "Error: can't parse http output module configuration string %s. Exiting\n", config);
34
     exit(1);
35
  }
36
	//we use the -F portnum parameter as the http port number
37
	sprintf(url, "http://%s:%d%s", PLAYER_IP, base_port, UL_DEFAULT_EXTERNALPLAYER_PATH);
38
	initChunkPusher();
39
}
40

  
41
void output_deliver1(const struct chunk *c)
42
{
43
  int ret = -1;
44

  
45
	//deliver the chunk to the external http player listening on http port
46
	//which has been setup via the -F option of the streamer commandline
47
	//If port was set > 60000 the the http
48
	//deliver is disabled, to allow mixed testing scenarios
49
	if(base_port < 60000) {
50
  	ret = sendViaCurl(*c, GRAPES_ENCODED_CHUNK_HEADER_SIZE + c->size + c->attributes_size, url);
51
  	dprintf("Chunk %d delivered to %s\n", c->id, url);
52
	}
53
}

Also available in: Unified diff