Statistics
| Branch: | Revision:

streamers / input-grapes.c @ 03de31e0

History | View | Annotate | Download (2.45 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/time.h>
8
#include <stdlib.h>
9
#include <stdint.h>
10
#include <stdio.h>
11
#include <string.h>
12
#include <limits.h>
13

    
14
#include <chunk.h>
15
#include <chunkiser.h>
16

    
17
#include "input.h"
18
#include "dbg.h"
19

    
20
extern int initial_id;
21

    
22
struct input_desc {
23
  struct input_stream *s;
24
  int id;
25
  int interframe;
26
  uint64_t start_time;
27
  uint64_t first_ts;
28
};
29

    
30
struct input_desc *input_open(const char *fname, int *fds, int fds_size)
31
{
32
  struct input_desc *res;
33
  struct timeval tv;
34
  char *c;
35

    
36
  res = malloc(sizeof(struct input_desc));
37
  if (res == NULL) {
38
    return NULL;
39
  }
40

    
41
  c = strchr(fname,',');
42
  if (c) {
43
    *(c++) = 0;
44
  }
45
  res->s = input_stream_open(fname, &res->interframe, c);
46
  if (res->s == NULL) {
47
    free(res);
48
    res = NULL;
49
    return res;
50
  }
51
  if (res->interframe == 0) {
52
    const int *my_fds;
53
    int i = 0;
54

    
55
    my_fds = input_get_fds(res->s);
56
    while(my_fds[i] != -1) {
57
      fds[i] = my_fds[i];
58
      i = i + 1;
59
    }
60
    fds[i] = -1;
61
  } else {
62
    if (fds_size >= 1) {
63
      fds[0] = -1; //This input module needs no fds to monitor
64
    }
65
    gettimeofday(&tv, NULL);
66
    res->start_time = tv.tv_usec + tv.tv_sec * 1000000ULL;
67
    res->first_ts = 0;
68
    res->id = 0; //(res->start_time / res->interframe) % INT_MAX; //TODO: verify 32/64 bit
69

    
70
    if(initial_id == -1) {
71
      res->id = (res->start_time / res->interframe) % INT_MAX; //TODO: verify 32/64 bit
72
    } else {
73
      res->id = initial_id;
74
    }
75

    
76
    fprintf(stderr,"Initial Chunk Id %d\n", res->id);
77
  }
78

    
79
  return res;
80
}
81

    
82
void input_close(struct input_desc *s)
83
{
84
  input_stream_close(s->s);
85
  free(s);
86
}
87

    
88
int input_get(struct input_desc *s, struct chunk *c)
89
{
90
  struct timeval now;
91
  int64_t delta;
92
  int res;
93

    
94
  c->attributes_size = 0;
95
  c->attributes = NULL;
96

    
97
  c->id = s->id;
98
  res = chunkise(s->s, c);
99
  if (res < 0) {
100
    return -1;
101
  }
102
  if (res > 0) {
103
    s->id++;
104
  }
105
  if (s->first_ts == 0) {
106
    s->first_ts = c->timestamp;
107
  }
108
  gettimeofday(&now, NULL);
109
  if (s->interframe) {
110
    delta = c->timestamp - s->first_ts + s->interframe;
111
    delta = delta + s->start_time - now.tv_sec * 1000000ULL - now.tv_usec;
112
    dprintf("Delta: %lld\n", delta);
113
    dprintf("Generate Chunk[%d] (TS: %llu)\n", c->id, c->timestamp);
114
    if (delta < 0) {
115
      delta = 0;
116
    }
117
  } else {
118
    delta = 0;                /* Will not be used */
119
  }
120
  c->timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
121

    
122
  return delta;
123
}