Statistics
| Branch: | Revision:

streamers / input-http.c @ 5805c339

History | View | Annotate | Download (3.41 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <stdio.h>
7
#include <unistd.h>
8
#include <stdarg.h>
9
#include <sys/types.h>
10
#include <sys/socket.h>
11
#include <sys/select.h>
12
#include <arpa/inet.h>
13

    
14
#ifdef HTTPIO_MHD
15
        #include <microhttpd.h>
16
        #include <pthread.h>
17
#elif defined HTTPIO_EVENT
18
        #include "chunk_external_interface.h"
19
#endif
20

    
21
#include <chunk.h>
22
#include <http_default_urls.h>
23
#include <external_chunk_transcoding.h>
24

    
25
#include "input.h"
26

    
27
extern struct chunk_buffer *cb;
28
extern int multiply;
29

    
30
#ifdef HTTPIO_MHD
31
        #ifdef THREADS
32
        extern pthread_mutex_t cb_mutex;
33
        extern pthread_mutex_t topology_mutex;
34
        #else
35
        pthread_mutex_t cb_mutex = PTHREAD_MUTEX_INITIALIZER;
36
        #endif
37
        struct MHD_Daemon *httpd;
38
#endif
39

    
40
struct input_desc {
41
  int dummy;
42
};
43

    
44

    
45
//this is the real one, called by the httpd receiver thread
46
int enqueueBlock(const uint8_t *block, const int block_size) {
47
        static int ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
48
  int decoded_size = 0;
49
        int res = -1;
50
//  struct chunk gchunk;
51

    
52
  Chunk* gchunk=NULL;
53
        gchunk = (Chunk *)malloc(sizeof(Chunk));
54
        if(!gchunk) {
55
                fprintf(stderr, "Memory error in gchunk!\n");
56
                return -1;
57
        }
58

    
59
  decoded_size = decodeChunk(gchunk, block, block_size);
60

    
61
  if(decoded_size < 0 || decoded_size != GRAPES_ENCODED_CHUNK_HEADER_SIZE + ExternalChunk_header_size + gchunk->size) {
62
            fprintf(stderr, "chunk %d probably corrupted!\n", gchunk->id);
63
                return -1;
64
        }
65

    
66
        if(cb) {
67
                int cnt = 0;
68
                int i = 0;
69
#ifdef HTTPIO_MHD
70
                #ifdef THREADS
71
                //in case of threaded offerstreamer it also has a topology mutex
72
                pthread_mutex_lock(&topology_mutex);
73
                #endif
74
                pthread_mutex_lock(&cb_mutex);
75
#endif
76
                res = add_chunk(gchunk);
77
                // if(res)
78
                for(i=0; i < multiply; i++) {        // @TODO: why this cycle?
79
                        send_chunk();
80
                }
81
                if (cnt++ % 10 == 0) {
82
                        update_peers(NULL, NULL, 0);
83
                }
84
#ifdef HTTPIO_MHD
85
                pthread_mutex_unlock(&cb_mutex);
86
                #ifdef THREADS
87
          pthread_mutex_unlock(&topology_mutex);
88
                #endif
89
#endif
90
        }
91

    
92
  return 0;
93
}
94

    
95
struct input_desc *input_open(const char *fname, uint16_t flags, int *fds, int fds_size)
96
{
97
  struct input_desc *res;
98

    
99
  if (fds_size >= 1) {
100
    *fds = -1; //This input module needs no fds to monitor
101
  }
102

    
103
  res = malloc(sizeof(struct input_desc));
104
  if (res == NULL) {
105
    return NULL;
106
  }
107

    
108
  res->dummy = 0;
109

    
110
#ifdef HTTPIO_MHD
111
        #ifndef THREADS
112
        //in case we are using the non-threaded version of offerstreamer
113
        //we need our own mutex
114
  pthread_mutex_init(&cb_mutex, NULL);
115
        #endif
116
  //this daemon will listen the network for incoming chunks from a streaming source
117
  //on the following path and port
118
  httpd = initChunkPuller(UL_DEFAULT_CHUNKBUFFER_PATH, UL_DEFAULT_CHUNKBUFFER_PORT);
119
        printf("MHD input httpd thread initialized! %d\n", res->dummy);
120
#elif defined HTTPIO_EVENT
121
        if(ulEventHttpServerSetup("127.0.0.1", UL_DEFAULT_CHUNKBUFFER_PORT, &enqueueBlock)) {
122
                return NULL;
123
        }
124
        else {
125
                printf("EVENT input httpd loop initialized! %d\n", res->dummy);
126
        }
127
#endif
128

    
129
  return res;
130
}
131

    
132
void input_close(struct input_desc *s)
133
{
134
  free(s);
135
#ifdef HTTPIO_MHD
136
  finalizeChunkPuller(httpd);
137
        #ifndef THREADS
138
  pthread_mutex_destroy(&cb_mutex);
139
        #endif
140
#elif defined HTTPIO_EVENT
141
        //TODO finalize the event http server
142
#endif
143
}
144

    
145
//this one is not used, just routinely called by the firging thread
146
int input_get(struct input_desc *s, struct chunk *c)
147
{
148
  c->data = NULL;
149
  return 0;
150
}