Statistics
| Branch: | Revision:

streamers / input.c @ 9f051e26

History | View | Annotate | Download (1.6 KB)

1
#include <sys/time.h>
2
#include <stdlib.h>
3
#include <stdint.h>
4
#include <stdio.h>
5
#include <string.h>
6
#include <limits.h>
7

    
8
#include <chunk.h>
9

    
10
#include "input.h"
11
#include "input-stream.h"
12
#include "dbg.h"
13

    
14
struct input_desc {
15
  struct input_stream *s;
16
  int id;
17
  int interframe;
18
  uint64_t start_time;
19
  uint64_t first_ts;
20
};
21

    
22
struct input_desc *input_open(const char *fname, uint16_t flags)
23
{
24
  struct input_desc *res;
25
  struct timeval tv;
26

    
27
  res = malloc(sizeof(struct input_desc));
28
  if (res == NULL) {
29
    return NULL;
30
  }
31
  gettimeofday(&tv, NULL);
32
  res->start_time = tv.tv_usec + tv.tv_sec * 1000000ULL;
33
  res->first_ts = 0;
34
  res->s = input_stream_open(fname, &res->interframe, flags);
35
  if (res->s == NULL) {
36
    free(res);
37
    res = NULL;
38
  }
39
  res->id = (res->start_time / res->interframe) % INT_MAX; //TODO: verify 32/64 bit
40

    
41
  return res;
42
}
43

    
44
void input_close(struct input_desc *s)
45
{
46
  input_stream_close(s->s);
47
  free(s);
48
}
49

    
50
int input_get(struct input_desc *s, struct chunk *c)
51
{
52
  struct timeval now;
53
  int64_t delta;
54

    
55
  c->data = chunkise(s->s, s->id, &c->size, &c->timestamp);
56
  if (c->size == -1) {
57
    return -1;
58
  }
59
  if (c->data) {
60
    c->id = s->id++;
61
  }
62
  c->attributes_size = 0;
63
  c->attributes = NULL;
64
  if (s->first_ts == 0) {
65
    s->first_ts = c->timestamp;
66
  }
67
  delta = c->timestamp - s->first_ts + s->interframe;
68
  gettimeofday(&now, NULL);
69
  delta = delta + s->start_time - now.tv_sec * 1000000ULL - now.tv_usec;
70
  dprintf("Delta: %lld\n", delta);
71
  dprintf("Generate Chunk[%d] (TS: %llu)\n", c->id, c->timestamp);
72

    
73
  c->timestamp = now.tv_sec * 1000000ULL + now.tv_usec;
74

    
75
  return delta > 0 ? delta : 0;
76
}