Statistics
| Branch: | Revision:

streamers / output.c @ fdcdce95

History | View | Annotate | Download (2.97 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <unistd.h>
8
#include <stdlib.h>
9
#include <string.h>
10
#include <stdint.h>
11
#include <stdio.h>
12

    
13
#include <chunk.h>
14

    
15
#include "out-stream.h"
16
#include "dbg.h"
17

    
18
static int next_chunk;
19
static int buff_size;
20

    
21
struct outbuf {
22
  void *data;
23
  int size;
24
  int id;
25
};
26
static struct outbuf *buff;
27

    
28
void output_init(int bufsize)
29
{
30
  if (!buff) {
31
    int i;
32

    
33
    buff_size = bufsize;
34
    buff = malloc(sizeof(struct outbuf) * buff_size);
35
    if (!buff) {
36
     fprintf(stderr, "Error: can't allocate output buffer\n");
37
     exit(1);
38
    }
39
    for (i = 0; i < buff_size; i++) {
40
      buff[i].data = NULL;
41
    }
42
  } else {
43
   fprintf(stderr, "Error: output buffer re-init not allowed!\n");
44
   exit(1);
45
  }
46
}
47

    
48
void buffer_print()
49
{
50
  int i;
51

    
52
  dprintf("\toutbuf: %d-> ",next_chunk);
53
  for (i = next_chunk; i < next_chunk + buff_size; i++) {
54
    if (buff[i % buff_size].data) {
55
      dprintf("%d",i % 10);
56
    } else {
57
      dprintf(".");
58
    }
59
  }
60
  dprintf("\n");
61
}
62

    
63
void buffer_free(int i)
64
{
65
  dprintf("\t\tFlush Buf %d: %s\n", i, buff[i].data);
66
  chunk_write(buff[i].id, buff[i].data, buff[i].size);
67
  free(buff[i].data);
68
  buff[i].data = NULL;
69
  dprintf("Next Chunk: %d -> %d\n", next_chunk, buff[i].id + 1);
70
  next_chunk = buff[i].id + 1;
71
}
72

    
73
void buffer_flush(int id)
74
{
75
  int i = id % buff_size;
76

    
77
  while(buff[i].data) {
78
    buffer_free(i);
79
    i = (i + 1) % buff_size;
80
    if (i == id % buff_size) {
81
      break;
82
    }
83
  }
84
}
85

    
86
void output_deliver(const struct chunk *c)
87
{
88
  if (!buff) {
89
    fprintf(stderr, "Warning: code should use output_init!!! Setting output buffer to 8\n");
90
    output_init(8);
91
  }
92

    
93
  dprintf("Chunk %d delivered\n", c->id);
94
  if (c->id < next_chunk) {
95
    return;
96
  }
97

    
98
  if (c->id >= next_chunk + buff_size) {
99
    int i;
100

    
101
    /* We might need some space for storing this chunk,
102
     * or the stored chunks are too old
103
     */
104
    for (i = next_chunk; i <= c->id - buff_size; i++) {
105
      if (buff[i % buff_size].data) {
106
        buffer_free(i % buff_size);
107
      } else {
108
        next_chunk++;
109
      }
110
    }
111
    buffer_flush(next_chunk);
112
    dprintf("Next is now %d, chunk is %d\n", next_chunk, c->id);
113
  }
114

    
115
  dprintf("%d == %d?\n", c->id, next_chunk);
116
  if (c->id == next_chunk) {
117
    dprintf("\tOut Chunk[%d] - %d: %s\n", c->id, c->id % buff_size, c->data);
118
    chunk_write(c->id, c->data, c->size);
119
    next_chunk++;
120
    buffer_flush(next_chunk);
121
  } else {
122
    dprintf("Storing %d (in %d)\n", c->id, c->id % buff_size);
123
    if (buff[c->id % buff_size].data) {
124
      if (buff[c->id % buff_size].id == c->id) {
125
        /* Duplicate of a stored chunk */
126
        return;
127
      }
128
      fprintf(stderr, "Crap!\n");
129
      exit(-1);
130
    }
131
    /* We previously flushed, so we know that c->id is free */
132
    buff[c->id % buff_size].data = malloc(c->size);
133
    memcpy(buff[c->id % buff_size].data, c->data, c->size);
134
    buff[c->id % buff_size].size = c->size;
135
    buff[c->id % buff_size].id = c->id;
136
  }
137
}