Statistics
| Branch: | Revision:

grapes / src / Chunkiser / output-stream-udp.c @ 61042ab2

History | View | Annotate | Download (2.78 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <sys/socket.h>
7
#include <netinet/in.h>
8
#include <arpa/inet.h>
9
#include <unistd.h>
10
#include <stdint.h>
11
#include <limits.h>
12
#include <stdio.h>
13
#include <stdlib.h>
14
#include <string.h>
15

    
16
#include "int_coding.h"
17
#include "payload.h"
18
#include "config.h"
19
#include "dechunkiser_iface.h"
20

    
21
#define UDP_PORTS_NUM_MAX 10
22

    
23
struct dechunkiser_ctx {
24
  int outfd;
25
  char ip[16];
26
  int port[UDP_PORTS_NUM_MAX];
27
  int ports;
28
};
29

    
30
static int dst_parse(const char *config, int *ports, char *ip)
31
{
32
  int i = 0;
33
  struct tag *cfg_tags;
34

    
35
  sprintf(ip, "127.0.0.1");
36
  cfg_tags = config_parse(config);
37
  if (cfg_tags) {
38
    int j;
39
    const char *addr;
40

    
41
    addr = config_value_str(cfg_tags, "addr");
42
    if (addr) {
43
      sprintf(ip, "%s", addr);
44
    }
45
    for (j = 0; j < UDP_PORTS_NUM_MAX; j++) {
46
      char tag[8];
47

    
48
      sprintf(tag, "port%d", j);
49
      if (config_value_int(cfg_tags, tag, &ports[i])) {
50
        i++;
51
      }
52
    }
53
  }
54
  free(cfg_tags);
55

    
56
  return i;
57
}
58

    
59
static struct dechunkiser_ctx *udp_open_out(const char *fname, const char *config)
60
{
61
  struct dechunkiser_ctx *res;
62

    
63
  if (!config) {
64
    fprintf(stderr, "udp output not configured, please specify the output ports\n");
65
    return NULL;
66
  }
67

    
68
  res = malloc(sizeof(struct dechunkiser_ctx));
69
  if (res == NULL) {
70
    return NULL;
71
  }
72
  res->outfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
73
  if (res->outfd < 0) {
74
    fprintf(stderr, "cannot open the output socket.\n");
75
    free(res);
76

    
77
    return NULL;
78
  }
79

    
80
  res->ports = dst_parse(config, res->port, res->ip);
81
  if (res->ports ==  0) {
82
    fprintf(stderr, "cannot parse the output ports.\n");
83
    close(res->outfd);
84
    free(res);
85

    
86
    return NULL;
87
  }
88

    
89
  return res;
90
}
91

    
92
static void packet_write(int fd, const char *ip, int port, uint8_t *data, int size)
93
{
94
  struct sockaddr_in si_other;
95

    
96
  bzero(&si_other, sizeof(si_other));
97
  si_other.sin_family = AF_INET;
98
  si_other.sin_port = htons(port);
99
  if (inet_aton(ip, &si_other.sin_addr) == 0) {
100
     fprintf(stderr, " output socket: inet_aton() failed\n");
101

    
102
     return;
103
  }
104

    
105
  sendto(fd, data, size, 0, (struct sockaddr *)&si_other, sizeof(si_other));
106
}
107

    
108
static void udp_write(struct dechunkiser_ctx *o, int id, uint8_t *data, int size)
109
{
110
  int i = 0;
111

    
112
  while (i < size) {
113
    int stream, psize;
114

    
115
    udp_chunk_header_parse(data + i, &psize, &stream);
116
    if (stream > o->ports) {
117
      fprintf(stderr, "Bad stream %d > %d\n", stream, o->ports);
118

    
119
      return;
120
    }
121

    
122
    packet_write(o->outfd, o->ip, o->port[stream], data + i + UDP_CHUNK_HEADER_SIZE, psize);
123
    i += UDP_CHUNK_HEADER_SIZE + psize;
124
  }
125
}
126

    
127
static void udp_close(struct dechunkiser_ctx *s)
128
{
129
  close(s->outfd);
130
  free(s);
131
}
132

    
133
struct dechunkiser_iface out_udp = {
134
  .open = udp_open_out,
135
  .write = udp_write,
136
  .close = udp_close,
137
};