Statistics
| Branch: | Revision:

streamers / input-chunkstream.c @ ddcccc93

History | View | Annotate | Download (3.41 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <sys/time.h>
7
#include <stdlib.h>
8
#include <stdint.h>
9
#include <stdio.h>
10
#include <string.h>
11
#include <unistd.h>
12
#include <fcntl.h>
13
#include <errno.h>
14
#include <limits.h>
15
#include <sys/types.h>
16
#ifndef _WIN32
17
#include <sys/socket.h>
18
#include <netinet/in.h>
19
#else
20
#include <winsock2.h>
21
#endif
22

    
23
#include <chunk.h>
24
#include <trade_msg_la.h>
25

    
26
#include "input.h"
27
#include "dbg.h"
28

    
29
#define BUFSIZE 65536*8
30

    
31
struct input_desc {
32
  int fd;
33
};
34

    
35
struct input_desc *input_open(const char *fname, int *fds, int fds_size)
36
{
37
  struct input_desc *res;
38

    
39
  res = malloc(sizeof(struct input_desc));
40
  if (res == NULL) {
41
    return NULL;
42
  }
43
  res->fd = -1;
44

    
45
  if (!fname){
46
    res->fd = STDIN_FILENO;
47
  } else {
48
    char *c;
49
    int port;
50
    char ip[32];
51

    
52
    c = strchr(fname,',');
53
    if (c) {
54
      *(c++) = 0;
55
    }
56

    
57
    if (sscanf(fname,"tcp://%[0-9.]:%d", ip, &port) == 2) {
58

    
59
      int accept_fd;
60

    
61
      accept_fd = socket(AF_INET, SOCK_STREAM, 0);
62
      if (accept_fd < 0) {
63
        fprintf(stderr,"Error creating socket\n");
64
      } else {
65
        struct sockaddr_in servaddr;
66

    
67
        servaddr.sin_family = AF_INET;
68
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);        //TODO
69
        servaddr.sin_port = htons(port);
70
        if (bind(accept_fd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
71
          fprintf(stderr,"Error binding to socket\n");
72
        } else if (listen(accept_fd, 1) < 0) {
73
          fprintf(stderr,"Waiting for chunkstream connection\n");
74
        } else {
75
          res->fd = accept(accept_fd, NULL, NULL);
76
          if (res->fd < 0) {
77
            fprintf(stderr,"Error accepting connection\n");
78
          }
79
        }
80
        close(accept_fd);
81
      }
82
    } else {
83
      res->fd = open(fname, O_RDONLY); //TODO
84
    }
85
  }
86

    
87
  if (res->fd < 0) {
88
    free(res);
89
    return NULL;
90
  } else {
91
#ifndef _WIN32
92
    if (fcntl(res->fd, F_SETFL, O_NONBLOCK) == -1) {
93
      perror("fcntl(O_NONBLOCK)");
94
    }
95
#else
96
    unsigned long nonblocking = 1;
97
    ioctlsocket(res->fd, FIONBIO, (unsigned long*) &nonblocking);
98
#endif
99
  }
100

    
101
  fds[0] = res->fd;
102
  fds[1] = -1;
103

    
104
  return res;
105
}
106

    
107
void input_close(struct input_desc *s)
108
{
109
  close(s->fd);
110
  free(s);
111
}
112

    
113
int input_get(struct input_desc *s, struct chunk *c)
114
{
115
  int ret;
116
  uint32_t size;
117
  static char recvbuf[BUFSIZE];
118
  static int pos = 0;
119

    
120
  c->data = NULL;
121
  ret = recv(s->fd, recvbuf + pos, BUFSIZE - pos, 0);
122
  if (ret < 0) {
123
#ifndef _WIN32
124
    if (errno != EAGAIN && errno != EWOULDBLOCK) {
125
#else
126
    if (WSAGetLastError() != WSAEWOULDBLOCK) {
127
#endif
128
      perror("chunkstream connection error");
129
      exit(EXIT_FAILURE);
130
    }
131
  } else {
132
    pos += ret;
133
  }
134

    
135
  if ( pos < sizeof(size)) {
136
    return INT_MAX;
137
  }
138

    
139
  size = ntohl(*(uint32_t*)recvbuf);
140
  if (pos >= sizeof(size) + size) {
141
    ret = decodeChunk(c, recvbuf + sizeof(size), size);
142
    if (ret < 0) {
143
      printf("Error decoding chunk!\n");
144
      return INT_MAX;
145
    }
146

    
147
    // remove attributes //TODO: verify whether this is the right place to do this
148
    if (c->attributes) {
149
      free(c->attributes);
150
      c->attributes = NULL;
151
      c->attributes_size = 0;
152
    }
153

    
154
    pos -= sizeof(size) + size;
155
    memmove(recvbuf, recvbuf + sizeof(size) + size, pos);
156
  }
157

    
158
  //check if there are other chunks in the buffer
159
  if ( pos >= sizeof(size)) {
160
    size = ntohl(*(uint32_t*)recvbuf);
161
    if (pos >= sizeof(size) + size) {
162
      return 0;
163
    }
164
  }
165

    
166
  return INT_MAX;
167
}