Statistics
| Branch: | Revision:

grapes / src / Chunkiser / output-stream-udp.c @ 176b8de8

History | View | Annotate | Download (3.09 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#ifndef _WIN32
7
#include <sys/socket.h>
8
#include <netinet/in.h>
9
#include <arpa/inet.h>
10
#else
11
#include <winsock2.h>
12
#endif
13
#include <unistd.h>
14
#include <stdint.h>
15
#include <limits.h>
16
#include <stdio.h>
17
#include <stdlib.h>
18
#include <string.h>
19

    
20
#include "int_coding.h"
21
#include "payload.h"
22
#include "grapes_config.h"
23
#include "dechunkiser_iface.h"
24

    
25
#define UDP_PORTS_NUM_MAX 10
26

    
27
#ifdef _WIN32
28
static int inet_aton(const char *cp, struct in_addr *addr)
29
{
30
    if( cp==NULL || addr==NULL )
31
    {
32
        return(0);
33
    }
34

    
35
    addr->s_addr = inet_addr(cp);
36
    return (addr->s_addr == INADDR_NONE) ? 0 : 1;
37
}
38
#endif
39

    
40
struct dechunkiser_ctx {
41
  int outfd;
42
  char ip[16];
43
  int port[UDP_PORTS_NUM_MAX];
44
  int ports;
45
};
46

    
47
static int dst_parse(const char *config, int *ports, char *ip)
48
{
49
  int i = 0;
50
  struct tag *cfg_tags;
51

    
52
  sprintf(ip, "127.0.0.1");
53
  cfg_tags = grapes_config_parse(config);
54
  if (cfg_tags) {
55
    int j;
56
    const char *addr;
57

    
58
    addr = grapes_config_value_str(cfg_tags, "addr");
59
    if (addr) {
60
      sprintf(ip, "%s", addr);
61
    }
62
    for (j = 0; j < UDP_PORTS_NUM_MAX; j++) {
63
      char tag[8];
64

    
65
      sprintf(tag, "port%d", j);
66
      if (grapes_config_value_int(cfg_tags, tag, &ports[i])) {
67
        i++;
68
      }
69
    }
70
  }
71
  free(cfg_tags);
72

    
73
  return i;
74
}
75

    
76
static struct dechunkiser_ctx *udp_open_out(const char *fname, const char *config)
77
{
78
  struct dechunkiser_ctx *res;
79

    
80
  if (!config) {
81
    fprintf(stderr, "udp output not configured, please specify the output ports\n");
82
    return NULL;
83
  }
84

    
85
  res = malloc(sizeof(struct dechunkiser_ctx));
86
  if (res == NULL) {
87
    return NULL;
88
  }
89
  res->outfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
90
  if (res->outfd < 0) {
91
    fprintf(stderr, "cannot open the output socket.\n");
92
    free(res);
93

    
94
    return NULL;
95
  }
96

    
97
  res->ports = dst_parse(config, res->port, res->ip);
98
  if (res->ports ==  0) {
99
    fprintf(stderr, "cannot parse the output ports.\n");
100
    close(res->outfd);
101
    free(res);
102

    
103
    return NULL;
104
  }
105

    
106
  return res;
107
}
108

    
109
static void packet_write(int fd, const char *ip, int port, uint8_t *data, int size)
110
{
111
  struct sockaddr_in si_other;
112

    
113
  memset(&si_other, 0, sizeof(si_other));
114
  si_other.sin_family = AF_INET;
115
  si_other.sin_port = htons(port);
116
  if (inet_aton(ip, &si_other.sin_addr) == 0) {
117
     fprintf(stderr, " output socket: inet_aton() failed\n");
118

    
119
     return;
120
  }
121

    
122
  sendto(fd, data, size, 0, (struct sockaddr *)&si_other, sizeof(si_other));
123
}
124

    
125
static void udp_write(struct dechunkiser_ctx *o, int id, uint8_t *data, int size)
126
{
127
  int i = 0;
128

    
129
  while (i < size) {
130
    int stream, psize;
131

    
132
    udp_payload_header_parse(data + i, &psize, &stream);
133
    if (stream > o->ports) {
134
      fprintf(stderr, "Bad stream %d > %d\n", stream, o->ports);
135

    
136
      return;
137
    }
138

    
139
    packet_write(o->outfd, o->ip, o->port[stream], data + i + UDP_PAYLOAD_HEADER_SIZE, psize);
140
    i += UDP_PAYLOAD_HEADER_SIZE + psize;
141
  }
142
}
143

    
144
static void udp_close(struct dechunkiser_ctx *s)
145
{
146
  close(s->outfd);
147
  free(s);
148
}
149

    
150
struct dechunkiser_iface out_udp = {
151
  .open = udp_open_out,
152
  .write = udp_write,
153
  .close = udp_close,
154
};