Statistics
| Branch: | Revision:

streamers / output.c @ 9132cb6f

History | View | Annotate | Download (3 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <unistd.h>
8
#include <stdlib.h>
9
#include <string.h>
10
#include <stdint.h>
11
#include <stdio.h>
12

    
13
#include <chunk.h>
14

    
15
#include "out-stream.h"
16
#include "dbg.h"
17

    
18
static int next_chunk;
19
static int buff_size;
20

    
21
struct outbuf {
22
  void *data;
23
  int size;
24
  int id;
25
};
26
static struct outbuf *buff;
27

    
28
void output_init(int bufsize)
29
{
30
  if (!buff) {
31
    int i;
32

    
33
    buff_size = bufsize;
34
    buff = malloc(sizeof(struct outbuf) * buff_size);
35
    if (!buff) {
36
     fprintf(stderr, "Error: can't allocate output buffer\n");
37
     exit(1);
38
    }
39
    for (i = 0; i < buff_size; i++) {
40
      buff[i].data = NULL;
41
    }
42
  } else {
43
   fprintf(stderr, "Error: output buffer re-init not allowed!\n");
44
   exit(1);
45
  }
46
}
47

    
48
void buffer_print()
49
{
50
#ifdef DEBUG
51
  int i;
52

    
53
  dprintf("\toutbuf: %d-> ",next_chunk);
54
  for (i = next_chunk; i < next_chunk + buff_size; i++) {
55
    if (buff[i % buff_size].data) {
56
      dprintf("%d",i % 10);
57
    } else {
58
      dprintf(".");
59
    }
60
  }
61
  dprintf("\n");
62
#endif
63
}
64

    
65
void buffer_free(int i)
66
{
67
  dprintf("\t\tFlush Buf %d: %s\n", i, buff[i].data);
68
  chunk_write(buff[i].id, buff[i].data, buff[i].size);
69
  free(buff[i].data);
70
  buff[i].data = NULL;
71
  dprintf("Next Chunk: %d -> %d\n", next_chunk, buff[i].id + 1);
72
  next_chunk = buff[i].id + 1;
73
}
74

    
75
void buffer_flush(int id)
76
{
77
  int i = id % buff_size;
78

    
79
  while(buff[i].data) {
80
    buffer_free(i);
81
    i = (i + 1) % buff_size;
82
    if (i == id % buff_size) {
83
      break;
84
    }
85
  }
86
}
87

    
88
void output_deliver(const struct chunk *c)
89
{
90
  if (!buff) {
91
    fprintf(stderr, "Warning: code should use output_init!!! Setting output buffer to 8\n");
92
    output_init(8);
93
  }
94

    
95
  dprintf("Chunk %d delivered\n", c->id);
96
  buffer_print();
97
  if (c->id < next_chunk) {
98
    return;
99
  }
100

    
101
  if (c->id >= next_chunk + buff_size) {
102
    int i;
103

    
104
    /* We might need some space for storing this chunk,
105
     * or the stored chunks are too old
106
     */
107
    for (i = next_chunk; i <= c->id - buff_size; i++) {
108
      if (buff[i % buff_size].data) {
109
        buffer_free(i % buff_size);
110
      } else {
111
        next_chunk++;
112
      }
113
    }
114
    buffer_flush(next_chunk);
115
    dprintf("Next is now %d, chunk is %d\n", next_chunk, c->id);
116
  }
117

    
118
  dprintf("%d == %d?\n", c->id, next_chunk);
119
  if (c->id == next_chunk) {
120
    dprintf("\tOut Chunk[%d] - %d: %s\n", c->id, c->id % buff_size, c->data);
121
    chunk_write(c->id, c->data, c->size);
122
    next_chunk++;
123
    buffer_flush(next_chunk);
124
  } else {
125
    dprintf("Storing %d (in %d)\n", c->id, c->id % buff_size);
126
    if (buff[c->id % buff_size].data) {
127
      if (buff[c->id % buff_size].id == c->id) {
128
        /* Duplicate of a stored chunk */
129
        return;
130
      }
131
      fprintf(stderr, "Crap!\n");
132
      exit(-1);
133
    }
134
    /* We previously flushed, so we know that c->id is free */
135
    buff[c->id % buff_size].data = malloc(c->size);
136
    memcpy(buff[c->id % buff_size].data, c->data, c->size);
137
    buff[c->id % buff_size].size = c->size;
138
    buff[c->id % buff_size].id = c->id;
139
  }
140
}