Revision d0225f2c

View differences:

Makefile
114 114
endif
115 115
endif
116 116
endif
117
ifeq ($(IO), chunkstream)
118
OBJS += input-chunkstream.o output-chunkstream.o
119
endif
117 120

  
118 121
EXECTARGET = streamer
119 122
ifdef ML
input-chunkstream.c
1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <sys/time.h>
7
#include <stdlib.h>
8
#include <stdint.h>
9
#include <stdio.h>
10
#include <string.h>
11
#include <unistd.h>
12
#include <fcntl.h>
13
#include <errno.h>
14
#include <limits.h>
15
#include <sys/types.h>
16
#include <sys/socket.h>
17
#include <netinet/in.h>
18

  
19
#include <chunk.h>
20
#include <trade_msg_la.h>
21

  
22
#include "input.h"
23
#include "dbg.h"
24

  
25
#define BUFSIZE 65536*8
26

  
27
struct input_desc {
28
  int fd;
29
};
30

  
31
struct input_desc *input_open(const char *fname, int *fds, int fds_size)
32
{
33
  struct input_desc *res;
34

  
35
  res = malloc(sizeof(struct input_desc));
36
  if (res == NULL) {
37
    return NULL;
38
  }
39
  res->fd = -1;
40

  
41
  if (!fname){
42
    res->fd = STDIN_FILENO;
43
  } else {
44
    char *c;
45
    int port;
46
    char ip[32];
47

  
48
    c = strchr(fname,',');
49
    if (c) {
50
      *(c++) = 0;
51
    }
52

  
53
    if (sscanf(fname,"tcp://%[0-9.]:%d", ip, &port) == 2) {
54

  
55
      int accept_fd;
56

  
57
      accept_fd = socket(AF_INET, SOCK_STREAM, 0);
58
      if (accept_fd < 0) {
59
        fprintf(stderr,"Error creating socket\n");
60
      } else {
61
        struct sockaddr_in servaddr;
62

  
63
        servaddr.sin_family = AF_INET;
64
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);	//TODO
65
        servaddr.sin_port = htons(port);
66
        if (bind(accept_fd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
67
          fprintf(stderr,"Error binding to socket\n");
68
        } else if (listen(accept_fd, 1) < 0) {
69
          fprintf(stderr,"Waiting for chunkstream connection\n");
70
        } else {
71
          res->fd = accept(accept_fd, NULL, NULL);
72
          if (res->fd < 0) {
73
            fprintf(stderr,"Error accepting connection\n");
74
          }
75
        }
76
        close(accept_fd);
77
      }
78
    } else {
79
      res->fd = open(fname, O_RDONLY | O_NONBLOCK);
80
    }
81
  }
82

  
83
  if (res->fd < 0) {
84
    free(res);
85
    return NULL;
86
  }
87
  fds[0] = res->fd;
88
  fds[1] = -1;
89

  
90
  return res;
91
}
92

  
93
void input_close(struct input_desc *s)
94
{
95
  close(s->fd);
96
  free(s);
97
}
98

  
99
int input_get(struct input_desc *s, struct chunk *c)
100
{
101
  int ret;
102
  uint32_t size;
103
  static char recvbuf[BUFSIZE];
104
  static int pos = 0;
105

  
106
  c->data = NULL;
107
  ret = recv(s->fd, recvbuf + pos, BUFSIZE - pos, MSG_DONTWAIT);
108
  if (ret <= 0 && pos < sizeof(size)) {
109
    if (ret == -1 &&  (errno == EAGAIN || errno == EWOULDBLOCK)) {
110
      return 999999;
111
    } else {
112
      perror("chunkstream connection error");
113
      exit(EXIT_FAILURE);
114
    }
115
  } else {
116
    pos += ret;
117
  }
118
  if ( pos < sizeof(size)) {
119
    return 999999;
120
  }
121

  
122
  size = ntohl(*(uint32_t*)recvbuf);
123
  if (pos >= sizeof(size) + size) {
124
    ret = decodeChunk(c, recvbuf + sizeof(size), size);
125
    if (ret < 0) {
126
      printf("Error decoding chunk!\n");
127
      return 999999;
128
    }
129

  
130
    // remove attributes //TODO: verify whether this is the right place to do this
131
    if (c->attributes) {
132
      free(c->attributes);
133
      c->attributes = NULL;
134
      c->attributes_size = 0;
135
    }
136

  
137
    pos -= sizeof(size) + size;
138
    memmove(recvbuf, recvbuf + sizeof(size) + size, pos);
139
  }
140
  return 999999;
141
}
output-chunkstream.c
1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <unistd.h>
7
#include <stdlib.h>
8
#include <string.h>
9
#include <stdint.h>
10
#include <stdio.h>
11
#include <fcntl.h>
12
#include <errno.h>
13
#include <limits.h>
14
#include <sys/types.h>
15
#include <sys/socket.h>
16
#include <netinet/in.h>
17
#include <arpa/inet.h>
18

  
19
#include <chunk.h>
20
#include <trade_msg_la.h>
21

  
22
#include "output.h"
23
#include "measures.h"
24
#include "dbg.h"
25

  
26
#define BUFSIZE 65536*8
27
static int fd = -1;
28

  
29
void output_init(int bufsize, const char *fname)
30
{
31
  if (!fname){
32
    fd = STDOUT_FILENO;
33
  } else {
34
    char *c;
35
    int port;
36
    char ip[32];
37

  
38
    c = strchr(fname,',');
39
    if (c) {
40
      *(c++) = 0;
41
    }
42

  
43
    if (sscanf(fname,"tcp://%[0-9.]:%d", ip, &port) == 2) {
44

  
45
      fd = socket(AF_INET, SOCK_STREAM, 0);
46
      if (fd < 0) {
47
        fprintf(stderr,"Error creating socket\n");
48
      } else {
49
        struct sockaddr_in servaddr;
50

  
51
        servaddr.sin_family = AF_INET;
52
        servaddr.sin_port = htons(port);
53
        if (inet_aton(ip, &servaddr.sin_addr) < 0) {
54
          fprintf(stderr,"Error converting IP address: %s\n", ip);
55
          return;
56
        }
57
        if (connect(fd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
58
          fprintf(stderr,"Error connecting to %s:%d\n", ip, port);
59
        }
60
      }
61
    } else {
62
      fd = open(fname, O_CREAT | O_WRONLY | O_TRUNC | O_NONBLOCK, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
63
    }
64
  }
65
}
66

  
67
void output_deliver(const struct chunk *c)
68
{
69
  static char sendbuf[BUFSIZE];
70
  static int pos = 0;
71
  int ret;
72
  uint32_t size;
73

  
74
  size = encodeChunk(c, sendbuf + pos + sizeof(size), BUFSIZE);
75
  if (size <= 0) {
76
    fprintf(stderr,"Error encoding chunk\n");
77
  } else {
78
    *((uint32_t*)(sendbuf + pos)) = htonl(size);
79
    pos += sizeof(size) + size;
80
  }
81

  
82
  ret = write(fd, sendbuf, pos);
83
  if (ret <= 0) {
84
    perror("Error writing to output");
85
  } else {
86
    pos -= ret;
87
    memmove(sendbuf, sendbuf + ret, pos);
88
  }
89
}

Also available in: Unified diff