Statistics
| Branch: | Revision:

streamers / output-chunkstream.c @ d0225f2c

History | View | Annotate | Download (1.93 KB)

1
/*
2
 *  Copyright (c) 2010 Csaba Kiraly
3
 *
4
 *  This is free software; see gpl-3.0.txt
5
 */
6
#include <unistd.h>
7
#include <stdlib.h>
8
#include <string.h>
9
#include <stdint.h>
10
#include <stdio.h>
11
#include <fcntl.h>
12
#include <errno.h>
13
#include <limits.h>
14
#include <sys/types.h>
15
#include <sys/socket.h>
16
#include <netinet/in.h>
17
#include <arpa/inet.h>
18

    
19
#include <chunk.h>
20
#include <trade_msg_la.h>
21

    
22
#include "output.h"
23
#include "measures.h"
24
#include "dbg.h"
25

    
26
#define BUFSIZE 65536*8
27
static int fd = -1;
28

    
29
void output_init(int bufsize, const char *fname)
30
{
31
  if (!fname){
32
    fd = STDOUT_FILENO;
33
  } else {
34
    char *c;
35
    int port;
36
    char ip[32];
37

    
38
    c = strchr(fname,',');
39
    if (c) {
40
      *(c++) = 0;
41
    }
42

    
43
    if (sscanf(fname,"tcp://%[0-9.]:%d", ip, &port) == 2) {
44

    
45
      fd = socket(AF_INET, SOCK_STREAM, 0);
46
      if (fd < 0) {
47
        fprintf(stderr,"Error creating socket\n");
48
      } else {
49
        struct sockaddr_in servaddr;
50

    
51
        servaddr.sin_family = AF_INET;
52
        servaddr.sin_port = htons(port);
53
        if (inet_aton(ip, &servaddr.sin_addr) < 0) {
54
          fprintf(stderr,"Error converting IP address: %s\n", ip);
55
          return;
56
        }
57
        if (connect(fd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
58
          fprintf(stderr,"Error connecting to %s:%d\n", ip, port);
59
        }
60
      }
61
    } else {
62
      fd = open(fname, O_CREAT | O_WRONLY | O_TRUNC | O_NONBLOCK, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
63
    }
64
  }
65
}
66

    
67
void output_deliver(const struct chunk *c)
68
{
69
  static char sendbuf[BUFSIZE];
70
  static int pos = 0;
71
  int ret;
72
  uint32_t size;
73

    
74
  size = encodeChunk(c, sendbuf + pos + sizeof(size), BUFSIZE);
75
  if (size <= 0) {
76
    fprintf(stderr,"Error encoding chunk\n");
77
  } else {
78
    *((uint32_t*)(sendbuf + pos)) = htonl(size);
79
    pos += sizeof(size) + size;
80
  }
81

    
82
  ret = write(fd, sendbuf, pos);
83
  if (ret <= 0) {
84
    perror("Error writing to output");
85
  } else {
86
    pos -= ret;
87
    memmove(sendbuf, sendbuf + ret, pos);
88
  }
89
}