streamers / input-http.c @ 03de31e0
History | View | Annotate | Download (3.55 KB)
1 | 8a49328f | CsabaKiraly | /*
|
---|---|---|---|
2 | * Copyright (c) 2010 Csaba Kiraly
|
||
3 | *
|
||
4 | * This is free software; see gpl-3.0.txt
|
||
5 | */
|
||
6 | #include <stdio.h> |
||
7 | #include <unistd.h> |
||
8 | #include <stdarg.h> |
||
9 | #include <sys/types.h> |
||
10 | #include <sys/socket.h> |
||
11 | #include <sys/select.h> |
||
12 | #include <arpa/inet.h> |
||
13 | |||
14 | #ifdef HTTPIO_MHD
|
||
15 | #include <microhttpd.h> |
||
16 | #include <pthread.h> |
||
17 | #elif defined HTTPIO_EVENT
|
||
18 | #include "chunk_external_interface.h" |
||
19 | #endif
|
||
20 | |||
21 | #include <chunk.h> |
||
22 | #include <http_default_urls.h> |
||
23 | #include <external_chunk_transcoding.h> |
||
24 | |||
25 | #include "input.h" |
||
26 | |||
27 | extern struct chunk_buffer *cb; |
||
28 | extern int multiply; |
||
29 | |||
30 | #ifdef HTTPIO_MHD
|
||
31 | #ifdef THREADS
|
||
32 | extern pthread_mutex_t cb_mutex;
|
||
33 | extern pthread_mutex_t topology_mutex;
|
||
34 | #else
|
||
35 | pthread_mutex_t cb_mutex = PTHREAD_MUTEX_INITIALIZER; |
||
36 | #endif
|
||
37 | struct MHD_Daemon *httpd;
|
||
38 | #endif
|
||
39 | |||
40 | struct input_desc {
|
||
41 | int dummy;
|
||
42 | }; |
||
43 | |||
44 | |||
45 | //this is the real one, called by the httpd receiver thread
|
||
46 | int enqueueBlock(const uint8_t *block, const int block_size) { |
||
47 | static int ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2; |
||
48 | int decoded_size = 0; |
||
49 | int res = -1; |
||
50 | // struct chunk gchunk;
|
||
51 | |||
52 | Chunk* gchunk=NULL;
|
||
53 | gchunk = (Chunk *)malloc(sizeof(Chunk));
|
||
54 | if(!gchunk) {
|
||
55 | fprintf(stderr, "Memory error in gchunk!\n");
|
||
56 | return -1; |
||
57 | } |
||
58 | |||
59 | decoded_size = decodeChunk(gchunk, block, block_size); |
||
60 | |||
61 | if(decoded_size < 0 || decoded_size != GRAPES_ENCODED_CHUNK_HEADER_SIZE + ExternalChunk_header_size + gchunk->size) { |
||
62 | fprintf(stderr, "chunk %d probably corrupted!\n", gchunk->id);
|
||
63 | return -1; |
||
64 | } |
||
65 | |||
66 | if (gchunk->attributes) {
|
||
67 | free(gchunk->attributes); |
||
68 | gchunk->attributes = NULL;
|
||
69 | gchunk->attributes_size = 0;
|
||
70 | } |
||
71 | chunk_attributes_fill(gchunk); |
||
72 | |||
73 | if(cb) {
|
||
74 | int cnt = 0; |
||
75 | int i = 0; |
||
76 | #ifdef HTTPIO_MHD
|
||
77 | #ifdef THREADS
|
||
78 | cbd3919d | Csaba Kiraly | //in case of threaded streamer it also has a topology mutex
|
79 | 8a49328f | CsabaKiraly | pthread_mutex_lock(&topology_mutex); |
80 | #endif
|
||
81 | pthread_mutex_lock(&cb_mutex); |
||
82 | #endif
|
||
83 | res = add_chunk(gchunk); |
||
84 | // if(res)
|
||
85 | for(i=0; i < multiply; i++) { // @TODO: why this cycle? |
||
86 | send_chunk(); |
||
87 | } |
||
88 | feaec543 | CsabaKiraly | send_offer(); |
89 | 8a49328f | CsabaKiraly | if (cnt++ % 10 == 0) { |
90 | update_peers(NULL, NULL, 0); |
||
91 | } |
||
92 | #ifdef HTTPIO_MHD
|
||
93 | pthread_mutex_unlock(&cb_mutex); |
||
94 | #ifdef THREADS
|
||
95 | pthread_mutex_unlock(&topology_mutex); |
||
96 | #endif
|
||
97 | #endif
|
||
98 | } |
||
99 | |||
100 | return 0; |
||
101 | } |
||
102 | |||
103 | 03de31e0 | Csaba Kiraly | struct input_desc *input_open(const char *fname, int *fds, int fds_size) |
104 | 8a49328f | CsabaKiraly | { |
105 | struct input_desc *res;
|
||
106 | |||
107 | if (fds_size >= 1) { |
||
108 | *fds = -1; //This input module needs no fds to monitor |
||
109 | } |
||
110 | |||
111 | res = malloc(sizeof(struct input_desc)); |
||
112 | if (res == NULL) { |
||
113 | return NULL; |
||
114 | } |
||
115 | |||
116 | res->dummy = 0;
|
||
117 | |||
118 | #ifdef HTTPIO_MHD
|
||
119 | #ifndef THREADS
|
||
120 | cbd3919d | Csaba Kiraly | //in case we are using the non-threaded version of streamer
|
121 | 8a49328f | CsabaKiraly | //we need our own mutex
|
122 | pthread_mutex_init(&cb_mutex, NULL);
|
||
123 | #endif
|
||
124 | //this daemon will listen the network for incoming chunks from a streaming source
|
||
125 | //on the following path and port
|
||
126 | httpd = initChunkPuller(UL_DEFAULT_CHUNKBUFFER_PATH, UL_DEFAULT_CHUNKBUFFER_PORT); |
||
127 | printf("MHD input httpd thread initialized! %d\n", res->dummy);
|
||
128 | #elif defined HTTPIO_EVENT
|
||
129 | if(ulEventHttpServerSetup("127.0.0.1", UL_DEFAULT_CHUNKBUFFER_PORT, &enqueueBlock)) { |
||
130 | return NULL; |
||
131 | } |
||
132 | else {
|
||
133 | printf("EVENT input httpd loop initialized! %d\n", res->dummy);
|
||
134 | } |
||
135 | #endif
|
||
136 | |||
137 | return res;
|
||
138 | } |
||
139 | |||
140 | void input_close(struct input_desc *s) |
||
141 | { |
||
142 | free(s); |
||
143 | #ifdef HTTPIO_MHD
|
||
144 | finalizeChunkPuller(httpd); |
||
145 | #ifndef THREADS
|
||
146 | pthread_mutex_destroy(&cb_mutex); |
||
147 | #endif
|
||
148 | #elif defined HTTPIO_EVENT
|
||
149 | //TODO finalize the event http server
|
||
150 | #endif
|
||
151 | } |
||
152 | |||
153 | //this one is not used, just routinely called by the firging thread
|
||
154 | int input_get(struct input_desc *s, struct chunk *c) |
||
155 | { |
||
156 | c->data = NULL;
|
||
157 | return 0; |
||
158 | } |