streamers / output-chunkstream.c @ 3a265915
History | View | Annotate | Download (3.85 KB)
1 |
/*
|
---|---|
2 |
* Copyright (c) 2010 Csaba Kiraly
|
3 |
*
|
4 |
* This is free software; see gpl-3.0.txt
|
5 |
*/
|
6 |
#include <unistd.h> |
7 |
#include <stdlib.h> |
8 |
#include <string.h> |
9 |
#include <stdint.h> |
10 |
#include <stdio.h> |
11 |
#include <fcntl.h> |
12 |
#include <errno.h> |
13 |
#include <limits.h> |
14 |
#include <sys/types.h> |
15 |
#ifndef _WIN32
|
16 |
#include <sys/stat.h> |
17 |
#include <sys/socket.h> |
18 |
#include <netinet/in.h> |
19 |
#include <arpa/inet.h> |
20 |
#else
|
21 |
#include <winsock2.h> |
22 |
#endif
|
23 |
|
24 |
#include <chunk.h> |
25 |
#include <trade_msg_la.h> |
26 |
|
27 |
#include "output.h" |
28 |
#include "measures.h" |
29 |
#include "dbg.h" |
30 |
|
31 |
#define BUFSIZE 65536*8 |
32 |
static int fd = -1; |
33 |
static char *fname = NULL; |
34 |
static enum MODE {FILE_MODE, TCP_MODE} mode; |
35 |
|
36 |
#ifdef _WIN32
|
37 |
static int inet_aton(const char *cp, struct in_addr *addr) |
38 |
{ |
39 |
if( cp==NULL || addr==NULL ) |
40 |
{ |
41 |
return(0); |
42 |
} |
43 |
|
44 |
addr->s_addr = inet_addr(cp); |
45 |
return (addr->s_addr == INADDR_NONE) ? 0 : 1; |
46 |
} |
47 |
#endif
|
48 |
|
49 |
static void output_connect(void) |
50 |
{ |
51 |
if (!fname){
|
52 |
mode = FILE_MODE; |
53 |
fd = STDOUT_FILENO; |
54 |
} else {
|
55 |
char *c;
|
56 |
int port;
|
57 |
char ip[32]; |
58 |
|
59 |
c = strchr(fname,',');
|
60 |
if (c) {
|
61 |
*(c++) = 0;
|
62 |
} |
63 |
|
64 |
if (sscanf(fname,"tcp://%[0-9.]:%d", ip, &port) == 2) { |
65 |
|
66 |
mode = TCP_MODE; |
67 |
fd = socket(AF_INET, SOCK_STREAM, 0);
|
68 |
if (fd < 0) { |
69 |
fprintf(stderr,"output-chunkstream: Error creating socket\n");
|
70 |
} else {
|
71 |
struct sockaddr_in servaddr;
|
72 |
|
73 |
fprintf(stderr,"output-chunkstream: tcp socket opened fd=%d\n", fd);
|
74 |
servaddr.sin_family = AF_INET; |
75 |
servaddr.sin_port = htons(port); |
76 |
if (inet_aton(ip, &servaddr.sin_addr) < 0) { |
77 |
fprintf(stderr,"output-chunkstream: Error converting IP address: %s\n", ip);
|
78 |
return;
|
79 |
} |
80 |
if (connect(fd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) { |
81 |
fprintf(stderr,"output-chunkstream: Error connecting to %s:%d\n", ip, port);
|
82 |
} else {
|
83 |
fprintf(stderr,"output-chunkstream: Connected to %s:%d\n", ip, port);
|
84 |
} |
85 |
} |
86 |
} else {
|
87 |
mode = FILE_MODE; |
88 |
#ifndef _WIN32
|
89 |
fd = open(fname, O_CREAT | O_WRONLY | O_TRUNC | O_NONBLOCK, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); |
90 |
#else
|
91 |
fd = open(fname, O_CREAT | O_WRONLY | O_TRUNC); |
92 |
if (fd >= 0) { |
93 |
unsigned long nonblocking = 1; |
94 |
ioctlsocket(fd, FIONBIO, (unsigned long*) &nonblocking); |
95 |
} |
96 |
#endif
|
97 |
if (fd < 0) { |
98 |
fprintf(stderr,"output-chunkstream: Error opening output file %s", fname);
|
99 |
perror(NULL);
|
100 |
} else {
|
101 |
fprintf(stderr,"output-chunkstream: opened output file %s\n", fname);
|
102 |
} |
103 |
} |
104 |
} |
105 |
} |
106 |
|
107 |
void output_init(int bufsize, const char *fn) |
108 |
{ |
109 |
if (fn) fname = strdup(fn);
|
110 |
output_connect(); |
111 |
} |
112 |
|
113 |
void output_deliver(const struct chunk *c) |
114 |
{ |
115 |
static char sendbuf[BUFSIZE]; |
116 |
static int pos = 0; |
117 |
int ret;
|
118 |
uint32_t size; |
119 |
|
120 |
size = encodeChunk(c, sendbuf + pos + sizeof(size), BUFSIZE - pos);
|
121 |
if (size <= 0) { |
122 |
fprintf(stderr,"output-chunkstream: Error encoding chunk or no space in output buffer, skipping\n");
|
123 |
} else {
|
124 |
*((uint32_t*)(sendbuf + pos)) = htonl(size); |
125 |
pos += sizeof(size) + size;
|
126 |
} |
127 |
|
128 |
if (mode == TCP_MODE && fd < 0) { |
129 |
fprintf(stderr,"output-chunkstream: reconnecting ...\n");
|
130 |
output_connect(); |
131 |
} |
132 |
if (fd < 0) { |
133 |
fprintf(stderr,"output-chunkstream: not conected\n");
|
134 |
return;
|
135 |
} |
136 |
|
137 |
if (mode == TCP_MODE) { //distiction needed by Win32 |
138 |
ret = send(fd, sendbuf, pos, 0);
|
139 |
} else {
|
140 |
ret = write(fd, sendbuf, pos); |
141 |
} |
142 |
if (ret < 0) { |
143 |
#ifndef _WIN32
|
144 |
if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { |
145 |
#else
|
146 |
if (ret == -1 && WSAGetLastError() == WSAEWOULDBLOCK) { |
147 |
#endif
|
148 |
fprintf(stderr,"output-chunkstream: Output stalled ...\n");
|
149 |
} else {
|
150 |
perror("output-chunkstream: Error writing to output");
|
151 |
close(fd); |
152 |
pos = 0;
|
153 |
fd = -1;
|
154 |
} |
155 |
} else {
|
156 |
dprintf("output-chunkstream: written %d bytes\n", ret);
|
157 |
pos -= ret; |
158 |
memmove(sendbuf, sendbuf + ret, pos); |
159 |
} |
160 |
} |