Statistics
| Branch: | Revision:

grapes / src / Chunkiser / output-stream-rtp.c @ 54b02213

History | View | Annotate | Download (4.49 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *  Copyright (c) 2014 Davide Kirchner
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#ifndef _WIN32
8
#include <sys/socket.h>
9
#include <netinet/in.h>
10
#include <arpa/inet.h>
11
#else
12
#include <winsock2.h>
13
#endif
14
#include <unistd.h>
15
#include <stdint.h>
16
#include <limits.h>
17
#include <stdio.h>
18
#include <stdlib.h>
19
#include <string.h>
20
#include <stdarg.h>
21

    
22
#include "int_coding.h"
23
#include "payload.h"
24
#include "grapes_config.h"
25
#include "dechunkiser_iface.h"
26
#include "stream-rtp.h"
27

    
28
#define IP_ADDR_LEN 16
29

    
30
#ifdef _WIN32
31
static int inet_aton(const char *cp, struct in_addr *addr) {
32
    if( cp==NULL || addr==NULL )
33
    {
34
        return(0);
35
    }
36

    
37
    addr->s_addr = inet_addr(cp);
38
    return (addr->s_addr == INADDR_NONE) ? 0 : 1;
39
}
40
#endif
41

    
42
struct dechunkiser_ctx {
43
  int outfd;
44
  char ip[IP_ADDR_LEN];
45
  int ports[RTP_UDP_PORTS_NUM_MAX];
46
  int ports_len;
47
  int verbosity;
48
};
49

    
50
/* Define a printf-like function for logging */
51
static void printf_log(const struct dechunkiser_ctx *ctx, int loglevel,
52
                       const char *fmt, ...) {
53
  va_list args;
54
  FILE* s = stderr;
55
  if (loglevel <= ctx->verbosity && strlen(fmt)) {
56
    fprintf(s, "RTP De-chunkiser: ");
57
    if (loglevel == 0) {
58
      fprintf(s, "Error: ");
59
    }
60
    va_start(args, fmt);
61
    vfprintf(s, fmt, args);
62
    va_end(args);
63
    fprintf(s, "\n");
64
    if (loglevel == 0) {
65
      fflush(s);
66
    }
67
#ifdef DEBUG
68
    fflush(s);
69
#endif
70
  }
71
}
72

    
73

    
74
/*
75
  Parses config and populates ctx accordingly.
76
  Returns 0 on success, nonzero on failure.
77
 */
78
static int conf_parse(struct dechunkiser_ctx *ctx, const char *config) {
79
  int j;
80
  struct tag *cfg_tags;
81
  const char *error_str;
82

    
83
  // defaults
84
  ctx->verbosity = 1;
85
  sprintf(ctx->ip, "127.0.0.1");
86
  ctx->ports_len = 0;
87
  for (j=0; j<RTP_UDP_PORTS_NUM_MAX; j++) {
88
    ctx->ports[j] = -1;
89
  }
90

    
91
  if (!config) {
92
    printf_log(ctx, 0, "No output ports specified");
93
    return 1;
94
  }
95

    
96
  cfg_tags = grapes_config_parse(config);
97
  if (cfg_tags) {
98
    const char *addr;
99

    
100
    grapes_config_value_int(cfg_tags, "verbosity", &(ctx->verbosity));
101
    printf_log(ctx, 2, "Verbosity set to %i", ctx->verbosity);
102

    
103
    addr = grapes_config_value_str(cfg_tags, "addr");
104
    if (addr && strlen(addr) < IP_ADDR_LEN) {
105
      sprintf(ctx->ip, "%s", addr);
106
    }
107
    printf_log(ctx, 1, "Destination IP address: %s", ctx->ip);
108

    
109
    ctx->ports_len =
110
      rtp_ports_parse(cfg_tags, ctx->ports, NULL, &error_str);
111
  }
112
  free(cfg_tags);
113

    
114
  if (ctx->ports_len == 0) {
115
    printf_log(ctx, 0, error_str);
116
    return 2;
117
  }
118

    
119
  for (j=0; j<ctx->ports_len; j++) {
120
    printf_log(ctx, 1, "  Configured output port: %i", ctx->ports[j]);
121
  }
122

    
123
  return 0;
124
}
125

    
126

    
127
static struct dechunkiser_ctx *rtp_open_out(const char *fname, const char *config) {
128
  struct dechunkiser_ctx *res;
129

    
130
  res = malloc(sizeof(struct dechunkiser_ctx));
131
  if (res == NULL) {
132
    return NULL;
133
  }
134

    
135
  if (conf_parse(res, config) != 0) {
136
    free(res);
137
    return NULL;
138
  }
139

    
140
  res->outfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
141
  if (res->outfd < 0) {
142
    printf_log(res, 0, "Could not open output socket");
143
    free(res);
144
    return NULL;
145
  }
146

    
147
  return res;
148
}
149

    
150

    
151
static void packet_write(int fd, const char *ip, int port, uint8_t *data, int size) {
152
  struct sockaddr_in si_other;
153

    
154
  memset(&si_other, 0, sizeof(si_other));
155
  si_other.sin_family = AF_INET;
156
  si_other.sin_port = htons(port);
157
  if (inet_aton(ip, &si_other.sin_addr) == 0) {
158
    fprintf(stderr, "RTP De-chunkiser: output socket: inet_aton() failed\n");
159
    return;
160
  }
161

    
162
  sendto(fd, data, size, 0, (struct sockaddr *)&si_other, sizeof(si_other));
163
}
164

    
165

    
166
static void rtp_write(struct dechunkiser_ctx *ctx, int id, uint8_t *data, int size) {
167
  uint8_t* data_end = data + size;
168
  printf_log(ctx, 2, "Got chunk of size %i", size);
169
  while (data < data_end) {
170
    // NOTE: `stream` here is the index in the ports array.
171
    int stream, psize;
172

    
173
    rtp_payload_per_pkt_header_parse(data, &psize, &stream);
174
    data += RTP_PAYLOAD_PER_PKT_HEADER_SIZE;
175
    if (stream > ctx->ports_len) {
176
      printf_log(ctx, 1, "Received Chunk with bad stream %d > %d",
177
                 stream, ctx->ports_len);
178
      return;
179
    }
180

    
181
    printf_log(ctx, 2,
182
               "sending packet of size %i from port id #%i to port %i",
183
               psize, stream, ctx->ports[stream]);
184
    packet_write(ctx->outfd, ctx->ip, ctx->ports[stream], data, psize);
185
    data += psize;
186
  }
187
}
188

    
189
static void rtp_close(struct dechunkiser_ctx *ctx) {
190
  close(ctx->outfd);
191
  free(ctx);
192
}
193

    
194
struct dechunkiser_iface out_rtp = {
195
  .open = rtp_open_out,
196
  .write = rtp_write,
197
  .close = rtp_close,
198
};