Statistics
| Branch: | Revision:

streamers / input-http.c @ 03de31e0

History | View | Annotate | Download (3.55 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <stdio.h>
7
#include <unistd.h>
8
#include <stdarg.h>
9
#include <sys/types.h>
10
#include <sys/socket.h>
11
#include <sys/select.h>
12
#include <arpa/inet.h>
13

    
14
#ifdef HTTPIO_MHD
15
        #include <microhttpd.h>
16
        #include <pthread.h>
17
#elif defined HTTPIO_EVENT
18
        #include "chunk_external_interface.h"
19
#endif
20

    
21
#include <chunk.h>
22
#include <http_default_urls.h>
23
#include <external_chunk_transcoding.h>
24

    
25
#include "input.h"
26

    
27
extern struct chunk_buffer *cb;
28
extern int multiply;
29

    
30
#ifdef HTTPIO_MHD
31
        #ifdef THREADS
32
        extern pthread_mutex_t cb_mutex;
33
        extern pthread_mutex_t topology_mutex;
34
        #else
35
        pthread_mutex_t cb_mutex = PTHREAD_MUTEX_INITIALIZER;
36
        #endif
37
        struct MHD_Daemon *httpd;
38
#endif
39

    
40
struct input_desc {
41
  int dummy;
42
};
43

    
44

    
45
//this is the real one, called by the httpd receiver thread
46
int enqueueBlock(const uint8_t *block, const int block_size) {
47
        static int ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
48
  int decoded_size = 0;
49
        int res = -1;
50
//  struct chunk gchunk;
51

    
52
  Chunk* gchunk=NULL;
53
        gchunk = (Chunk *)malloc(sizeof(Chunk));
54
        if(!gchunk) {
55
                fprintf(stderr, "Memory error in gchunk!\n");
56
                return -1;
57
        }
58

    
59
  decoded_size = decodeChunk(gchunk, block, block_size);
60

    
61
  if(decoded_size < 0 || decoded_size != GRAPES_ENCODED_CHUNK_HEADER_SIZE + ExternalChunk_header_size + gchunk->size) {
62
            fprintf(stderr, "chunk %d probably corrupted!\n", gchunk->id);
63
                return -1;
64
        }
65

    
66
        if (gchunk->attributes) {
67
                free(gchunk->attributes);
68
                gchunk->attributes = NULL;
69
                gchunk->attributes_size = 0;
70
        }
71
        chunk_attributes_fill(gchunk);
72

    
73
        if(cb) {
74
                int cnt = 0;
75
                int i = 0;
76
#ifdef HTTPIO_MHD
77
                #ifdef THREADS
78
                //in case of threaded streamer it also has a topology mutex
79
                pthread_mutex_lock(&topology_mutex);
80
                #endif
81
                pthread_mutex_lock(&cb_mutex);
82
#endif
83
                res = add_chunk(gchunk);
84
                // if(res)
85
                for(i=0; i < multiply; i++) {        // @TODO: why this cycle?
86
                        send_chunk();
87
                }
88
                send_offer();
89
                if (cnt++ % 10 == 0) {
90
                        update_peers(NULL, NULL, 0);
91
                }
92
#ifdef HTTPIO_MHD
93
                pthread_mutex_unlock(&cb_mutex);
94
                #ifdef THREADS
95
          pthread_mutex_unlock(&topology_mutex);
96
                #endif
97
#endif
98
        }
99

    
100
  return 0;
101
}
102

    
103
struct input_desc *input_open(const char *fname, int *fds, int fds_size)
104
{
105
  struct input_desc *res;
106

    
107
  if (fds_size >= 1) {
108
    *fds = -1; //This input module needs no fds to monitor
109
  }
110

    
111
  res = malloc(sizeof(struct input_desc));
112
  if (res == NULL) {
113
    return NULL;
114
  }
115

    
116
  res->dummy = 0;
117

    
118
#ifdef HTTPIO_MHD
119
        #ifndef THREADS
120
        //in case we are using the non-threaded version of streamer
121
        //we need our own mutex
122
  pthread_mutex_init(&cb_mutex, NULL);
123
        #endif
124
  //this daemon will listen the network for incoming chunks from a streaming source
125
  //on the following path and port
126
  httpd = initChunkPuller(UL_DEFAULT_CHUNKBUFFER_PATH, UL_DEFAULT_CHUNKBUFFER_PORT);
127
        printf("MHD input httpd thread initialized! %d\n", res->dummy);
128
#elif defined HTTPIO_EVENT
129
        if(ulEventHttpServerSetup("127.0.0.1", UL_DEFAULT_CHUNKBUFFER_PORT, &enqueueBlock)) {
130
                return NULL;
131
        }
132
        else {
133
                printf("EVENT input httpd loop initialized! %d\n", res->dummy);
134
        }
135
#endif
136

    
137
  return res;
138
}
139

    
140
void input_close(struct input_desc *s)
141
{
142
  free(s);
143
#ifdef HTTPIO_MHD
144
  finalizeChunkPuller(httpd);
145
        #ifndef THREADS
146
  pthread_mutex_destroy(&cb_mutex);
147
        #endif
148
#elif defined HTTPIO_EVENT
149
        //TODO finalize the event http server
150
#endif
151
}
152

    
153
//this one is not used, just routinely called by the firging thread
154
int input_get(struct input_desc *s, struct chunk *c)
155
{
156
  c->data = NULL;
157
  return 0;
158
}