Statistics
| Branch: | Revision:

streamers / output-chunkstream.c @ 5e490c7d

History | View | Annotate | Download (3.25 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <unistd.h>
7
#include <stdlib.h>
8
#include <string.h>
9
#include <stdint.h>
10
#include <stdio.h>
11
#include <fcntl.h>
12
#include <errno.h>
13
#include <limits.h>
14
#include <sys/types.h>
15
#ifndef _WIN32
16
#include <sys/socket.h>
17
#include <netinet/in.h>
18
#include <arpa/inet.h>
19
#else
20
#include <winsock2.h>
21
#endif
22

    
23
#include <chunk.h>
24
#include <trade_msg_la.h>
25

    
26
#include "output.h"
27
#include "measures.h"
28
#include "dbg.h"
29

    
30
#define BUFSIZE 65536*8
31
static int fd = -1;
32
static enum MODE {FILE_MODE, TCP_MODE} mode;
33

    
34
#ifdef _WIN32
35
static int inet_aton(const char *cp, struct in_addr *addr)
36
{
37
    if( cp==NULL || addr==NULL )
38
    {
39
        return(0);
40
    }
41

    
42
    addr->s_addr = inet_addr(cp);
43
    return (addr->s_addr == INADDR_NONE) ? 0 : 1;
44
}
45
#endif
46

    
47
void output_init(int bufsize, const char *fname)
48
{
49
  if (!fname){
50
    mode = FILE_MODE;
51
    fd = STDOUT_FILENO;
52
  } else {
53
    char *c;
54
    int port;
55
    char ip[32];
56

    
57
    c = strchr(fname,',');
58
    if (c) {
59
      *(c++) = 0;
60
    }
61

    
62
    if (sscanf(fname,"tcp://%[0-9.]:%d", ip, &port) == 2) {
63

    
64
      mode = TCP_MODE;
65
      fd = socket(AF_INET, SOCK_STREAM, 0);
66
      if (fd < 0) {
67
        fprintf(stderr,"Error creating socket\n");
68
      } else {
69
        struct sockaddr_in servaddr;
70

    
71
        fprintf(stderr,"tcp socket opened fd=%d\n", fd);
72
        servaddr.sin_family = AF_INET;
73
        servaddr.sin_port = htons(port);
74
        if (inet_aton(ip, &servaddr.sin_addr) < 0) {
75
          fprintf(stderr,"Error converting IP address: %s\n", ip);
76
          return;
77
        }
78
        if (connect(fd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
79
          fprintf(stderr,"Error connecting to %s:%d\n", ip, port);
80
        } else {
81
          fprintf(stderr,"Connected to %s:%d\n", ip, port);
82
        }
83
      }
84
    } else {
85
      mode = FILE_MODE;
86
#ifndef _WIN32
87
      fd = open(fname, O_CREAT | O_WRONLY | O_TRUNC | O_NONBLOCK, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
88
#else
89
      fd = open(fname, O_CREAT | O_WRONLY | O_TRUNC);
90
      if (fd >= 0) {
91
         unsigned long nonblocking = 1;
92
         ioctlsocket(fd, FIONBIO, (unsigned long*) &nonblocking);
93
      }
94
#endif
95
      if (fd < 0) {
96
        fprintf(stderr,"Error opening output file %s", fname);
97
        perror(NULL);
98
      } else {
99
        fprintf(stderr,"opened output file %s\n", fname);
100
      }
101
    }
102
  }
103
}
104

    
105
void output_deliver(const struct chunk *c)
106
{
107
  static char sendbuf[BUFSIZE];
108
  static int pos = 0;
109
  int ret;
110
  uint32_t size;
111

    
112
  size = encodeChunk(c, sendbuf + pos + sizeof(size), BUFSIZE - pos);
113
  if (size <= 0) {
114
    fprintf(stderr,"Error encoding chunk\n");
115
  } else {
116
    *((uint32_t*)(sendbuf + pos)) = htonl(size);
117
    pos += sizeof(size) + size;
118
  }
119

    
120
  if (mode == TCP_MODE) {  //distiction needed by Win32
121
    ret = send(fd, sendbuf, pos, 0);
122
  } else {
123
    ret = write(fd, sendbuf, pos);
124
  }
125
  if (ret < 0) {
126
#ifndef _WIN32
127
    if (ret == -1 &&  (errno == EAGAIN || errno == EWOULDBLOCK)) {
128
#else
129
    if (ret == -1 &&  WSAGetLastError() == WSAEWOULDBLOCK) {
130
#endif
131
      fprintf(stderr,"output-chunkstream: Output stalled ...\n");
132
    } else {
133
      perror("output-chunkstream: Error writing to output");
134
      close(fd);
135
      pos = 0;
136
      fd = -1;
137
  } else {
138
    pos -= ret;
139
    memmove(sendbuf, sendbuf + ret, pos);
140
  }
141
}