Statistics
| Branch: | Revision:

streamers / input-chunkstream.c @ d0225f2c

History | View | Annotate | Download (2.96 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <sys/time.h>
7
#include <stdlib.h>
8
#include <stdint.h>
9
#include <stdio.h>
10
#include <string.h>
11
#include <unistd.h>
12
#include <fcntl.h>
13
#include <errno.h>
14
#include <limits.h>
15
#include <sys/types.h>
16
#include <sys/socket.h>
17
#include <netinet/in.h>
18

    
19
#include <chunk.h>
20
#include <trade_msg_la.h>
21

    
22
#include "input.h"
23
#include "dbg.h"
24

    
25
#define BUFSIZE 65536*8
26

    
27
struct input_desc {
28
  int fd;
29
};
30

    
31
struct input_desc *input_open(const char *fname, int *fds, int fds_size)
32
{
33
  struct input_desc *res;
34

    
35
  res = malloc(sizeof(struct input_desc));
36
  if (res == NULL) {
37
    return NULL;
38
  }
39
  res->fd = -1;
40

    
41
  if (!fname){
42
    res->fd = STDIN_FILENO;
43
  } else {
44
    char *c;
45
    int port;
46
    char ip[32];
47

    
48
    c = strchr(fname,',');
49
    if (c) {
50
      *(c++) = 0;
51
    }
52

    
53
    if (sscanf(fname,"tcp://%[0-9.]:%d", ip, &port) == 2) {
54

    
55
      int accept_fd;
56

    
57
      accept_fd = socket(AF_INET, SOCK_STREAM, 0);
58
      if (accept_fd < 0) {
59
        fprintf(stderr,"Error creating socket\n");
60
      } else {
61
        struct sockaddr_in servaddr;
62

    
63
        servaddr.sin_family = AF_INET;
64
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);        //TODO
65
        servaddr.sin_port = htons(port);
66
        if (bind(accept_fd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
67
          fprintf(stderr,"Error binding to socket\n");
68
        } else if (listen(accept_fd, 1) < 0) {
69
          fprintf(stderr,"Waiting for chunkstream connection\n");
70
        } else {
71
          res->fd = accept(accept_fd, NULL, NULL);
72
          if (res->fd < 0) {
73
            fprintf(stderr,"Error accepting connection\n");
74
          }
75
        }
76
        close(accept_fd);
77
      }
78
    } else {
79
      res->fd = open(fname, O_RDONLY | O_NONBLOCK);
80
    }
81
  }
82

    
83
  if (res->fd < 0) {
84
    free(res);
85
    return NULL;
86
  }
87
  fds[0] = res->fd;
88
  fds[1] = -1;
89

    
90
  return res;
91
}
92

    
93
void input_close(struct input_desc *s)
94
{
95
  close(s->fd);
96
  free(s);
97
}
98

    
99
int input_get(struct input_desc *s, struct chunk *c)
100
{
101
  int ret;
102
  uint32_t size;
103
  static char recvbuf[BUFSIZE];
104
  static int pos = 0;
105

    
106
  c->data = NULL;
107
  ret = recv(s->fd, recvbuf + pos, BUFSIZE - pos, MSG_DONTWAIT);
108
  if (ret <= 0 && pos < sizeof(size)) {
109
    if (ret == -1 &&  (errno == EAGAIN || errno == EWOULDBLOCK)) {
110
      return 999999;
111
    } else {
112
      perror("chunkstream connection error");
113
      exit(EXIT_FAILURE);
114
    }
115
  } else {
116
    pos += ret;
117
  }
118
  if ( pos < sizeof(size)) {
119
    return 999999;
120
  }
121

    
122
  size = ntohl(*(uint32_t*)recvbuf);
123
  if (pos >= sizeof(size) + size) {
124
    ret = decodeChunk(c, recvbuf + sizeof(size), size);
125
    if (ret < 0) {
126
      printf("Error decoding chunk!\n");
127
      return 999999;
128
    }
129

    
130
    // remove attributes //TODO: verify whether this is the right place to do this
131
    if (c->attributes) {
132
      free(c->attributes);
133
      c->attributes = NULL;
134
      c->attributes_size = 0;
135
    }
136

    
137
    pos -= sizeof(size) + size;
138
    memmove(recvbuf, recvbuf + sizeof(size) + size, pos);
139
  }
140
  return 999999;
141
}