Statistics
| Branch: | Revision:

grapes / src / Chunkiser / output-stream-udp.c @ e7d7c4b6

History | View | Annotate | Download (2.76 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <sys/socket.h>
7
#include <netinet/in.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdint.h>
11
#include <limits.h>
12
#include <stdio.h>
13
#include <stdlib.h>
14
#include <strings.h>
15

    
16
#include "payload.h"
17
#include "config.h"
18
#include "dechunkiser_iface.h"
19

    
20
#define UDP_PORTS_NUM_MAX 10
21

    
22
struct dechunkiser_ctx {
23
  int outfd;
24
  char ip[16];
25
  int port[UDP_PORTS_NUM_MAX];
26
  int ports;
27
};
28

    
29
static int dst_parse(const char *config, int *ports, char *ip)
30
{
31
  int i = 0;
32
  struct tag *cfg_tags;
33

    
34
  sprintf(ip, "127.0.0.1");
35
  cfg_tags = config_parse(config);
36
  if (cfg_tags) {
37
    int j;
38
    const char *addr;
39

    
40
    addr = config_value_str(cfg_tags, "addr");
41
    if (addr) {
42
      sprintf(ip, "%s", addr);
43
    }
44
    for (j = 0; j < UDP_PORTS_NUM_MAX; j++) {
45
      char tag[8];
46

    
47
      sprintf(tag, "port%d", j);
48
      if (config_value_int(cfg_tags, tag, &ports[i])) {
49
        i++;
50
      }
51
    }
52
  }
53
  free(cfg_tags);
54

    
55
  return i;
56
}
57

    
58
static struct dechunkiser_ctx *udp_open_out(const char *fname, const char *config)
59
{
60
  struct dechunkiser_ctx *res;
61

    
62
  if (!config) {
63
    fprintf(stderr, "udp output not configured, please specify the output ports\n");
64
    return NULL;
65
  }
66

    
67
  res = malloc(sizeof(struct dechunkiser_ctx));
68
  if (res == NULL) {
69
    return NULL;
70
  }
71
  res->outfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
72
  if (res->outfd < 0) {
73
    fprintf(stderr, "cannot open the output socket.\n");
74
    free(res);
75

    
76
    return NULL;
77
  }
78

    
79
  res->ports = dst_parse(config, res->port, res->ip);
80
  if (res->ports ==  0) {
81
    fprintf(stderr, "cannot parse the output ports.\n");
82
    close(res->outfd);
83
    free(res);
84

    
85
    return NULL;
86
  }
87

    
88
  return res;
89
}
90

    
91
static void packet_write(int fd, const char *ip, int port, uint8_t *data, int size)
92
{
93
  struct sockaddr_in si_other;
94

    
95
  bzero(&si_other, sizeof(si_other));
96
  si_other.sin_family = AF_INET;
97
  si_other.sin_port = htons(port);
98
  if (inet_aton(ip, &si_other.sin_addr) == 0) {
99
     fprintf(stderr, " output socket: inet_aton() failed\n");
100

    
101
     return;
102
  }
103

    
104
  sendto(fd, data, size, 0, (struct sockaddr *)&si_other, sizeof(si_other));
105
}
106

    
107
static void udp_write(struct dechunkiser_ctx *o, int id, uint8_t *data, int size)
108
{
109
  int i = 0;
110

    
111
  while (i < size) {
112
    int stream, psize;
113

    
114
    udp_chunk_header_parse(data + i, &psize, &stream);
115
    if (stream > o->ports) {
116
      fprintf(stderr, "Bad stream %d > %d\n", stream, o->ports);
117

    
118
      return;
119
    }
120

    
121
    packet_write(o->outfd, o->ip, o->port[stream], data + i + UDP_CHUNK_HEADER_SIZE, psize);
122
    i += UDP_CHUNK_HEADER_SIZE + psize;
123
  }
124
}
125

    
126
static void udp_close(struct dechunkiser_ctx *s)
127
{
128
  close(s->outfd);
129
  free(s);
130
}
131

    
132
struct dechunkiser_iface out_udp = {
133
  .open = udp_open_out,
134
  .write = udp_write,
135
  .close = udp_close,
136
};