Statistics
| Branch: | Revision:

streamers / input-http.c @ 81b601c1

History | View | Annotate | Download (2.85 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <stdio.h>
7
#include <unistd.h>
8
#include <stdarg.h>
9
#include <sys/types.h>
10
#include <sys/socket.h>
11
#include <sys/select.h>
12
#include <arpa/inet.h>
13
#include <microhttpd.h>
14

    
15
#include <chunk.h>
16
#include <http_default_urls.h>
17
#include <external_chunk_transcoding.h>
18

    
19
#include "input.h"
20

    
21
extern struct chunk_buffer *cb;
22

    
23
#ifdef THREADS
24
extern pthread_mutex_t cb_mutex;
25
extern pthread_mutex_t topology_mutex;
26
#else
27
pthread_mutex_t cb_mutex;
28
#endif
29

    
30
struct input_desc {
31
  int dummy;
32
};
33

    
34
struct MHD_Daemon *httpd;
35

    
36
struct input_desc *input_open(const char *fname, uint16_t flags, int *fds, int fds_size)
37
{
38
  struct input_desc *res;
39

    
40
  if (fds_size >= 1) {
41
    *fds = -1; //This input module needs no fds to monitor
42
  }
43

    
44
  res = malloc(sizeof(struct input_desc));
45
  if (res == NULL) {
46
    return NULL;
47
  }
48

    
49
  res->dummy = 0;
50
dprintf("BEFORE INIT! %d\n", res->dummy);
51
#ifndef THREADS
52
  pthread_mutex_init(&cb_mutex, NULL);
53
#endif
54
  //this daemon will listen the network for incoming chunks from a streaming source
55
  //on the following path and port
56
  httpd = initChunkPuller(UL_DEFAULT_CHUNKBUFFER_PATH, UL_DEFAULT_CHUNKBUFFER_PORT);
57
dprintf("AFTER INIT! %d\n", res->dummy);
58

    
59
  return res;
60
}
61

    
62
void input_close(struct input_desc *s)
63
{
64
  free(s);
65
  finalizeChunkPuller(httpd);
66
}
67

    
68
//this one is not used, just routinely called by the firging thread
69
int input_get(struct input_desc *s, struct chunk *c)
70
{
71
  c->data = NULL;
72
  return 0;
73
}
74

    
75
//this is the real one, called by the http receiver thread
76
int enqueueBlock(const uint8_t *block, const int block_size) {
77
        static int ExternalChunk_header_size = 5*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 2*CHUNK_TRANSCODING_INT_SIZE + 1*CHUNK_TRANSCODING_INT_SIZE*2;
78
  int decoded_size = 0;
79
        int res = -1;
80
//  struct chunk gchunk;
81

    
82
  Chunk* gchunk=NULL;
83
        gchunk = (Chunk *)malloc(sizeof(Chunk));
84
        if(!gchunk) {
85
                printf("Memory error in gchunk!\n");
86
                return -1;
87
        }
88

    
89
  decoded_size = decodeChunk(gchunk, block, block_size);
90

    
91
  if(decoded_size < 0 || decoded_size != GRAPES_ENCODED_CHUNK_HEADER_SIZE + ExternalChunk_header_size + gchunk->size) {
92
            fprintf(stderr, "chunk %d probably corrupted!\n", gchunk->id);
93
                return -1;
94
        }
95

    
96
  if(cb) {
97
#ifdef THREADS
98
          pthread_mutex_lock(&topology_mutex);
99
#endif
100
          pthread_mutex_lock(&cb_mutex);
101
          res = add_chunk(gchunk);
102
//          free(gchunk);
103
//          pthread_mutex_unlock(&cb_mutex);
104
//  }
105
//  if (res < 0) { //chunk sequence is older than previous chunk (SHOULD SEND ANYWAY!!!)
106
//    free(gchunk->data);
107
//    free(gchunk->attributes);
108
//    fprintf(stderr, "Chunk %d of %d bytes FAIL res %d\n", gchunk->id, gchunk->size, res);
109
//  }
110
//  else {
111
//    pthread_mutex_lock(&cb_mutex);
112
    if(res) send_chunk(); //push it
113
    pthread_mutex_unlock(&cb_mutex);
114
#ifdef THREADS
115
          pthread_mutex_unlock(&topology_mutex);
116
#endif
117
  }
118

    
119
  return 0;
120
}
121