Statistics
| Branch: | Revision:

streamers / streaming.c @ 5bb51b2b

History | View | Annotate | Download (3.53 KB)

1 89e893e2 Luca
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10 0f35d029 Csaba Kiraly
#include <peerset.h>
11
#include <peer.h>
12 e98d8f50 Csaba Kiraly
#include <chunkidset.h>
13 89e893e2 Luca
14
#include "streaming.h"
15
#include "output.h"
16
#include "input.h"
17 e64fc7e5 Luca
#include "dbg.h"
18 89e893e2 Luca
19 4367dafd Csaba Kiraly
#include "scheduler_la.h"
20
21 89e893e2 Luca
static struct chunk_buffer *cb;
22 709f774c Luca
static struct input_desc *input;
23 89e893e2 Luca
24 6920fdab Luca
void stream_init(int size, struct nodeID *myID)
25 89e893e2 Luca
{
26
  char conf[32];
27
28
  sprintf(conf, "size=%d", size);
29
  cb = cb_init(conf);
30 6920fdab Luca
  chunkInit(myID);
31 7442ecb3 Luca
}
32
33
int source_init(const char *fname, struct nodeID *myID)
34
{
35
  input = input_open(fname);
36
  if (input == NULL) {
37
    return -1;
38
  }
39
40
  stream_init(1, myID);
41
  return 0;
42 89e893e2 Luca
}
43
44 e98d8f50 Csaba Kiraly
void received_chunk(struct peerset *pset, struct nodeID *from, const uint8_t *buff, int len)
45 89e893e2 Luca
{
46
  int res;
47 52b7c5ea Luca
  static struct chunk c;
48 30c9739a Csaba Kiraly
  struct peer *p;
49 89e893e2 Luca
50
  res = decodeChunk(&c, buff + 1, len - 1);
51
  if (res > 0) {
52
    output_deliver(&c);
53
    res = cb_add_chunk(cb, &c);
54
    if (res < 0) {
55
      free(c.data);
56
      free(c.attributes);
57
    }
58 30c9739a Csaba Kiraly
    p = peerset_get_peer(pset,from);
59
    if (!p) {
60
      printf("warning: received chunk %d from unknown peer: %s! Adding it to neighbourhood!\n", c.id, node_addr(from));
61
      peerset_add_peer(pset,from);
62
      p = peerset_get_peer(pset,from);
63
    }
64
    if (p) {        //now we have it almost sure
65
      chunkID_set_add_chunk(p->bmap,c.id);        //don't send it back
66
    }
67 89e893e2 Luca
  }
68
}
69
70
void generated_chunk(void)
71
{
72
  int res;
73
  struct chunk c;
74
75 4bb789ed Luca
  if (input_get(input, &c) <= 0) {
76
    fprintf(stderr, "Error in input!\n");
77
    exit(-1);
78
  }
79 5fb5ecd4 Luca
  res = cb_add_chunk(cb, &c);
80
  if (res < 0) {
81
    free(c.data);
82
    free(c.attributes);
83 89e893e2 Luca
  }
84
}
85
86 4367dafd Csaba Kiraly
/**
87
 *example function to filter chunks based on whether a given peer needs them. The real implementation
88
 * should look at buffermap information received about the given peer (or it should guess)
89
 */
90 251441c1 Csaba Kiraly
int needs(struct peer *p, struct chunk *c){
91 5bb51b2b Csaba Kiraly
  return (! p->bmap || chunkID_set_check(p->bmap,c->id) < 0);
92 4367dafd Csaba Kiraly
}
93 251441c1 Csaba Kiraly
double randomPeer(struct peer **p){
94 4367dafd Csaba Kiraly
  return 1;
95
}
96 251441c1 Csaba Kiraly
double getChunkTimestamp(struct chunk **c){
97 4367dafd Csaba Kiraly
  return (double) (*c)->timestamp;
98
}
99
100
101 0f35d029 Csaba Kiraly
void send_chunk(const struct peerset *pset)
102 89e893e2 Luca
{
103
  struct chunk *buff;
104 0f35d029 Csaba Kiraly
  int size, res, i, n;
105
  const struct peer *neighbours;
106 89e893e2 Luca
107 0f35d029 Csaba Kiraly
  n = peerset_size(pset);
108
  neighbours = peerset_get_peers(pset);
109 e64fc7e5 Luca
  dprintf("Send Chunk: %d neighbours\n", n);
110 89e893e2 Luca
  if (n == 0) return;
111
  buff = cb_get_chunks(cb, &size);
112 e64fc7e5 Luca
  dprintf("\t %d chunks in buffer...\n", size);
113 89e893e2 Luca
  if (size == 0) return;
114
115
  /************ STUPID DUMB SCHEDULING ****************/
116 4367dafd Csaba Kiraly
  //target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
117
  //c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
118 89e893e2 Luca
  /************ /STUPID DUMB SCHEDULING ****************/
119
120 4367dafd Csaba Kiraly
  /************ USE SCHEDULER ****************/
121
  size_t selectedpairs_len = 1;
122 0f35d029 Csaba Kiraly
  struct chunk *chunkps[size];
123
  for (i = 0;i < size; i++) chunkps[i] = buff+i;
124
  struct peer *peerps[n];
125
  for (i = 0; i<n; i++) peerps[i] = neighbours+i;
126 4367dafd Csaba Kiraly
  struct PeerChunk selectedpairs[1];
127 0f35d029 Csaba Kiraly
  schedSelectPeerFirst(SCHED_WEIGHTED, peerps, n, chunkps, size, selectedpairs, &selectedpairs_len, needs, randomPeer, getChunkTimestamp);
128 4367dafd Csaba Kiraly
  /************ /USE SCHEDULER ****************/
129
130
  for (i=0; i<selectedpairs_len ; i++){
131 0f35d029 Csaba Kiraly
    struct peer *p = selectedpairs[i].peer;
132
    struct chunk *c = selectedpairs[i].chunk;
133
    dprintf("\t sending chunk[%d] to ", c->id);
134
    dprintf("%s\n", node_addr(p->id));
135 4367dafd Csaba Kiraly
136 0f35d029 Csaba Kiraly
    res = sendChunk(p->id, c);
137 4367dafd Csaba Kiraly
    dprintf("Result: %d\n", res);
138 92f230c7 Csaba Kiraly
    if (res>=0) {
139
      chunkID_set_add_chunk(p->bmap,c->id); //don't send twice ... assuming that it will actually arrive
140 0f35d029 Csaba Kiraly
    }
141 4367dafd Csaba Kiraly
  }
142 89e893e2 Luca
}