Revision 5805c339 input-http.c

View differences:

input-http.c
10 10
#include <sys/socket.h>
11 11
#include <sys/select.h>
12 12
#include <arpa/inet.h>
13
#include <microhttpd.h>
14
#include <pthread.h>
13

  
14
#ifdef HTTPIO_MHD
15
	#include <microhttpd.h>
16
	#include <pthread.h>
17
#elif defined HTTPIO_EVENT
18
	#include "chunk_external_interface.h"
19
#endif
15 20

  
16 21
#include <chunk.h>
17 22
#include <http_default_urls.h>
......
22 27
extern struct chunk_buffer *cb;
23 28
extern int multiply;
24 29

  
25
#ifdef THREADS
26
extern pthread_mutex_t cb_mutex;
27
extern pthread_mutex_t topology_mutex;
28
#else
29
pthread_mutex_t cb_mutex = PTHREAD_MUTEX_INITIALIZER;
30
#ifdef HTTPIO_MHD
31
	#ifdef THREADS
32
	extern pthread_mutex_t cb_mutex;
33
	extern pthread_mutex_t topology_mutex;
34
	#else
35
	pthread_mutex_t cb_mutex = PTHREAD_MUTEX_INITIALIZER;
36
	#endif
37
	struct MHD_Daemon *httpd;
30 38
#endif
31 39

  
32 40
struct input_desc {
33 41
  int dummy;
34 42
};
35 43

  
36
struct MHD_Daemon *httpd;
37

  
38
struct input_desc *input_open(const char *fname, uint16_t flags, int *fds, int fds_size)
39
{
40
  struct input_desc *res;
41

  
42
  if (fds_size >= 1) {
43
    *fds = -1; //This input module needs no fds to monitor
44
  }
45

  
46
  res = malloc(sizeof(struct input_desc));
47
  if (res == NULL) {
48
    return NULL;
49
  }
50

  
51
  res->dummy = 0;
52

  
53
#ifndef THREADS
54
	//in case we are using the non-threaded version of offerstreamer
55
	//we need our own mutex
56
  pthread_mutex_init(&cb_mutex, NULL);
57
#endif
58
  //this daemon will listen the network for incoming chunks from a streaming source
59
  //on the following path and port
60
  httpd = initChunkPuller(UL_DEFAULT_CHUNKBUFFER_PATH, UL_DEFAULT_CHUNKBUFFER_PORT);
61
	dprintf("input httpd thread initialized! %d\n", res->dummy);
62

  
63
  return res;
64
}
65

  
66
void input_close(struct input_desc *s)
67
{
68
  free(s);
69
  finalizeChunkPuller(httpd);
70
#ifndef THREADS
71
  pthread_mutex_destroy(&cb_mutex);
72
#endif
73
}
74

  
75
//this one is not used, just routinely called by the firging thread
76
int input_get(struct input_desc *s, struct chunk *c)
77
{
78
  c->data = NULL;
79
  return 0;
80
}
81 44

  
82 45
//this is the real one, called by the httpd receiver thread
83 46
int enqueueBlock(const uint8_t *block, const int block_size) {
......
89 52
  Chunk* gchunk=NULL;
90 53
	gchunk = (Chunk *)malloc(sizeof(Chunk));
91 54
	if(!gchunk) {
92
		printf("Memory error in gchunk!\n");
55
		fprintf(stderr, "Memory error in gchunk!\n");
93 56
		return -1;
94 57
	}
95 58

  
......
103 66
	if(cb) {
104 67
		int cnt = 0;
105 68
		int i = 0;
106
#ifdef THREADS
69
#ifdef HTTPIO_MHD
70
		#ifdef THREADS
107 71
		//in case of threaded offerstreamer it also has a topology mutex
108 72
		pthread_mutex_lock(&topology_mutex);
109
#endif
73
		#endif
110 74
		pthread_mutex_lock(&cb_mutex);
75
#endif
111 76
		res = add_chunk(gchunk);
112 77
		// if(res)
113 78
		for(i=0; i < multiply; i++) {	// @TODO: why this cycle?
......
116 81
		if (cnt++ % 10 == 0) {
117 82
			update_peers(NULL, NULL, 0);
118 83
		}
84
#ifdef HTTPIO_MHD
119 85
		pthread_mutex_unlock(&cb_mutex);
120
#ifdef THREADS
86
		#ifdef THREADS
121 87
  	pthread_mutex_unlock(&topology_mutex);
88
		#endif
122 89
#endif
123 90
	}
124 91

  
125 92
  return 0;
126 93
}
127 94

  
95
struct input_desc *input_open(const char *fname, uint16_t flags, int *fds, int fds_size)
96
{
97
  struct input_desc *res;
98

  
99
  if (fds_size >= 1) {
100
    *fds = -1; //This input module needs no fds to monitor
101
  }
102

  
103
  res = malloc(sizeof(struct input_desc));
104
  if (res == NULL) {
105
    return NULL;
106
  }
107

  
108
  res->dummy = 0;
109

  
110
#ifdef HTTPIO_MHD
111
	#ifndef THREADS
112
	//in case we are using the non-threaded version of offerstreamer
113
	//we need our own mutex
114
  pthread_mutex_init(&cb_mutex, NULL);
115
	#endif
116
  //this daemon will listen the network for incoming chunks from a streaming source
117
  //on the following path and port
118
  httpd = initChunkPuller(UL_DEFAULT_CHUNKBUFFER_PATH, UL_DEFAULT_CHUNKBUFFER_PORT);
119
	printf("MHD input httpd thread initialized! %d\n", res->dummy);
120
#elif defined HTTPIO_EVENT
121
	if(ulEventHttpServerSetup("127.0.0.1", UL_DEFAULT_CHUNKBUFFER_PORT, &enqueueBlock)) {
122
		return NULL;
123
	}
124
	else {
125
		printf("EVENT input httpd loop initialized! %d\n", res->dummy);
126
	}
127
#endif
128

  
129
  return res;
130
}
131

  
132
void input_close(struct input_desc *s)
133
{
134
  free(s);
135
#ifdef HTTPIO_MHD
136
  finalizeChunkPuller(httpd);
137
	#ifndef THREADS
138
  pthread_mutex_destroy(&cb_mutex);
139
	#endif
140
#elif defined HTTPIO_EVENT
141
	//TODO finalize the event http server
142
#endif
143
}
144

  
145
//this one is not used, just routinely called by the firging thread
146
int input_get(struct input_desc *s, struct chunk *c)
147
{
148
  c->data = NULL;
149
  return 0;
150
}

Also available in: Unified diff