Statistics
| Branch: | Revision:

streamers / output-chunkstream.c @ 3a265915

History | View | Annotate | Download (3.85 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <unistd.h>
7
#include <stdlib.h>
8
#include <string.h>
9
#include <stdint.h>
10
#include <stdio.h>
11
#include <fcntl.h>
12
#include <errno.h>
13
#include <limits.h>
14
#include <sys/types.h>
15
#ifndef _WIN32
16
#include <sys/stat.h>
17
#include <sys/socket.h>
18
#include <netinet/in.h>
19
#include <arpa/inet.h>
20
#else
21
#include <winsock2.h>
22
#endif
23

    
24
#include <chunk.h>
25
#include <trade_msg_la.h>
26

    
27
#include "output.h"
28
#include "measures.h"
29
#include "dbg.h"
30

    
31
#define BUFSIZE 65536*8
32
static int fd = -1;
33
static char *fname = NULL;
34
static enum MODE {FILE_MODE, TCP_MODE} mode;
35

    
36
#ifdef _WIN32
37
static int inet_aton(const char *cp, struct in_addr *addr)
38
{
39
    if( cp==NULL || addr==NULL )
40
    {
41
        return(0);
42
    }
43

    
44
    addr->s_addr = inet_addr(cp);
45
    return (addr->s_addr == INADDR_NONE) ? 0 : 1;
46
}
47
#endif
48

    
49
static void output_connect(void)
50
{
51
  if (!fname){
52
    mode = FILE_MODE;
53
    fd = STDOUT_FILENO;
54
  } else {
55
    char *c;
56
    int port;
57
    char ip[32];
58

    
59
    c = strchr(fname,',');
60
    if (c) {
61
      *(c++) = 0;
62
    }
63

    
64
    if (sscanf(fname,"tcp://%[0-9.]:%d", ip, &port) == 2) {
65

    
66
      mode = TCP_MODE;
67
      fd = socket(AF_INET, SOCK_STREAM, 0);
68
      if (fd < 0) {
69
        fprintf(stderr,"output-chunkstream: Error creating socket\n");
70
      } else {
71
        struct sockaddr_in servaddr;
72

    
73
        fprintf(stderr,"output-chunkstream: tcp socket opened fd=%d\n", fd);
74
        servaddr.sin_family = AF_INET;
75
        servaddr.sin_port = htons(port);
76
        if (inet_aton(ip, &servaddr.sin_addr) < 0) {
77
          fprintf(stderr,"output-chunkstream: Error converting IP address: %s\n", ip);
78
          return;
79
        }
80
        if (connect(fd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
81
          fprintf(stderr,"output-chunkstream: Error connecting to %s:%d\n", ip, port);
82
        } else {
83
          fprintf(stderr,"output-chunkstream: Connected to %s:%d\n", ip, port);
84
        }
85
      }
86
    } else {
87
      mode = FILE_MODE;
88
#ifndef _WIN32
89
      fd = open(fname, O_CREAT | O_WRONLY | O_TRUNC | O_NONBLOCK, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
90
#else
91
      fd = open(fname, O_CREAT | O_WRONLY | O_TRUNC);
92
      if (fd >= 0) {
93
         unsigned long nonblocking = 1;
94
         ioctlsocket(fd, FIONBIO, (unsigned long*) &nonblocking);
95
      }
96
#endif
97
      if (fd < 0) {
98
        fprintf(stderr,"output-chunkstream: Error opening output file %s", fname);
99
        perror(NULL);
100
      } else {
101
        fprintf(stderr,"output-chunkstream: opened output file %s\n", fname);
102
      }
103
    }
104
  }
105
}
106

    
107
void output_init(int bufsize, const char *fn)
108
{
109
  if (fn) fname = strdup(fn);
110
  output_connect();
111
}
112

    
113
void output_deliver(const struct chunk *c)
114
{
115
  static char sendbuf[BUFSIZE];
116
  static int pos = 0;
117
  int ret;
118
  uint32_t size;
119

    
120
  size = encodeChunk(c, sendbuf + pos + sizeof(size), BUFSIZE - pos);
121
  if (size <= 0) {
122
    fprintf(stderr,"output-chunkstream: Error encoding chunk or no space in output buffer, skipping\n");
123
  } else {
124
    *((uint32_t*)(sendbuf + pos)) = htonl(size);
125
    pos += sizeof(size) + size;
126
  }
127

    
128
  if (mode == TCP_MODE && fd < 0) {
129
    fprintf(stderr,"output-chunkstream: reconnecting ...\n");
130
    output_connect();
131
  }
132
  if (fd < 0) {
133
    fprintf(stderr,"output-chunkstream: not conected\n");
134
    return;
135
  }
136

    
137
  if (mode == TCP_MODE) {  //distiction needed by Win32
138
    ret = send(fd, sendbuf, pos, 0);
139
  } else {
140
    ret = write(fd, sendbuf, pos);
141
  }
142
  if (ret < 0) {
143
#ifndef _WIN32
144
    if (ret == -1 &&  (errno == EAGAIN || errno == EWOULDBLOCK)) {
145
#else
146
    if (ret == -1 &&  WSAGetLastError() == WSAEWOULDBLOCK) {
147
#endif
148
      fprintf(stderr,"output-chunkstream: Output stalled ...\n");
149
    } else {
150
      perror("output-chunkstream: Error writing to output");
151
      close(fd);
152
      pos = 0;
153
      fd = -1;
154
    }
155
  } else {
156
    dprintf("output-chunkstream: written %d bytes\n", ret);
157
    pos -= ret;
158
    memmove(sendbuf, sendbuf + ret, pos);
159
  }
160
}