Statistics
| Branch: | Revision:

streamers / chunk_signaling.c @ c8c4c779

History | View | Annotate | Download (7.54 KB)

1
/*
2
 *  Copyright (c) 2009 Alessandro Russo.
3
 *
4
 *  This is free software;
5
 *  see GPL.txt
6
 *
7
 * Chunk Signaling API - Higher Abstraction
8
 *
9
 * The Chunk Signaling HA provides a set of primitives for chunks signaling negotiation with other peers, in order to collect information for the effective chunk exchange with other peers. <br>
10
 * This is a part of the Data Exchange Protocol which provides high level abstraction for chunks' negotiations, like requesting and proposing chunks.
11
 *
12
 */
13

    
14
#include <stdint.h>
15
#include <stdlib.h>
16
#include <stdio.h>
17
#include <sys/time.h>
18
#include <errno.h>
19
#include <assert.h>
20
#include <string.h>
21
#include "peer.h"
22
#include "peerset.h"
23
#include "chunkidset.h"
24
#include "trade_sig_la.h"
25
#include "chunk_signaling.h"
26
#include "msg_types.h"
27
#include "net_helper.h"
28

    
29
#include "streaming.h"
30
#include "dbg.h"
31

    
32
static struct nodeID *localID;
33
static struct peerset *pset;
34

    
35

    
36
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
37
{
38
  struct peer *p = peerset_get_peer(pset, id);
39
  if (!p) {
40
    fprintf(stderr,"warning: received message from unknown peer: %s!\n",node_addr(id));
41
    if (reg) {
42
      fprintf(stderr,"Adding %s to neighbourhood!\n", node_addr(id));
43
      peerset_add_peer(pset,id);
44
      p = peerset_get_peer(pset,id);
45
    }
46
  }
47

    
48
  return p;
49
}
50

    
51
int sendSignalling(int type, const struct nodeID *to_id, const struct nodeID *owner_id, struct chunkID_set *cset, int max_deliver, int cb_size, int trans_id)
52
{
53
    int buff_len, meta_len, msg_len, ret;
54
    uint8_t *buff;
55
    struct sig_nal *sigmex;
56
    uint8_t *meta;
57

    
58
    meta = malloc(1024);
59

    
60
    sigmex = (struct sig_nal*) meta;
61
    sigmex->type = type;
62
    sigmex->max_deliver = max_deliver;
63
    sigmex->cb_size = cb_size;
64
    sigmex->trans_id = trans_id;
65
    meta_len = sizeof(*sigmex)-1;
66
      sigmex->third_peer = 0;
67
    if (owner_id) {
68
      meta_len += nodeid_dump(&sigmex->third_peer, owner_id);
69
    }
70

    
71
    buff_len = 1 + chunkID_set_size(cset) * 4 + 12 + meta_len; // this should be enough
72
    buff = malloc(buff_len);
73
    if (!buff) {
74
      fprintf(stderr, "Error allocating buffer\n");
75
      return -1;
76
    }
77

    
78
    buff[0] = MSG_TYPE_SIGNALLING;
79
    msg_len = 1 + encodeChunkSignaling(cset, meta, meta_len, buff+1, buff_len-1);
80
    free(meta);
81
    if (msg_len < 0) {
82
      fprintf(stderr, "Error in encoding chunk set for sending a buffermap\n");
83
      ret = -1;
84
    } else {
85
      send_to_peer(localID, to_id, buff, msg_len);
86
    }
87
    ret = 1;
88
    free(buff);
89
    return ret;
90
}
91

    
92
/**
93
 * Send a BufferMap to a Peer.
94
 *
95
 * Send (our own or some other peer's) BufferMap to a third Peer.
96
 *
97
 * @param[in] to PeerID.
98
 * @param[in] owner Owner of the BufferMap to send.
99
 * @param[in] bmap the BufferMap to send.
100
 * @param[in] trans_id transaction number associated with this send
101
 * @return 0 on success, <0 on error
102
 */
103
int sendBufferMap(const struct nodeID *to_id, const struct nodeID *owner_id, struct chunkID_set *bmap, int cb_size, int trans_id) {
104
  return sendSignalling(MSG_SIG_BMOFF, to_id, owner_id, bmap, 0, cb_size, trans_id);
105
}
106

    
107
int sendMyBufferMap(const struct nodeID *to_id, struct chunkID_set *bmap, int cb_size, int trans_id)
108
{
109
  return sendBufferMap(to_id, localID, bmap, cb_size, trans_id);
110
}
111

    
112
int offerChunks(const struct nodeID *to_id, struct chunkID_set *cset, int max_deliver, int trans_id) {
113
  return sendSignalling(MSG_SIG_OFF, to_id, NULL, cset, max_deliver, -1, trans_id);
114
}
115

    
116
int acceptChunks(const struct nodeID *to_id, struct chunkID_set *cset, int max_deliver, int trans_id) {
117
  return sendSignalling(MSG_SIG_ACC, to_id, NULL, cset, max_deliver, -1, trans_id);
118
}
119

    
120

    
121
/// ==================== ///
122
///        RECEIVE       ///
123
/// ==================== ///
124

    
125
void bmap_received(const struct nodeID *fromid, const struct nodeID *ownerid, struct chunkID_set *c_set, int cb_size, int trans_id) {
126
  struct peer *owner;
127
  if (nodeid_equal(fromid, ownerid)) {
128
    owner = nodeid_to_peer(ownerid,1);
129
  } else {
130
    dprintf("%s might be behind ",node_addr(ownerid));
131
    dprintf("NAT:%s\n",node_addr(fromid));
132
    owner = nodeid_to_peer(fromid,1);
133
  }
134
  
135
  if (owner) {        //now we have it almost sure
136
    chunkID_set_clear(owner->bmap,cb_size+5);        //TODO: some better solution might be needed to keep info about chunks we sent in flight.
137
    chunkID_set_union(owner->bmap,c_set);
138
    owner->cb_size = cb_size;
139
    gettimeofday(&owner->bmap_timestamp, NULL);
140
  }
141
}
142

    
143
void offer_received(const struct nodeID *fromid, struct chunkID_set *cset, int max_deliver, int trans_id) {
144
  struct peer *from = nodeid_to_peer(fromid,1);
145
  dprintf("The peer %s offers %d chunks, max deliver %d.\n", node_addr(fromid), chunkID_set_size(cset), max_deliver);
146

    
147
  if (from) {
148
    struct chunkID_set *cset_acc;
149
    int max_deliver2;
150

    
151
    //register these chunks in the buffermap
152
    chunkID_set_union(from->bmap,cset);
153
    gettimeofday(&from->bmap_timestamp, NULL);
154

    
155
    //decide what to accept
156
    cset_acc = get_chunks_to_accept(from, cset, max_deliver);
157

    
158
    //send accept message
159
    dprintf("\t accept %d chunks from peer %s, trans_id %d\n", chunkID_set_size(cset_acc), node_addr(from->id), trans_id);
160
    max_deliver2 = chunkID_set_size(cset_acc);
161
    acceptChunks(fromid, cset_acc, max_deliver2, trans_id);
162

    
163
    //@TODO: free cset_acc
164
  }
165
}
166

    
167
void accept_received(const struct nodeID *fromid, struct chunkID_set *cset, int max_deliver, int trans_id) {
168
  struct peer *from = nodeid_to_peer(fromid,0);   //verify that we have really offered, 0 at least garantees that we've known the peer before
169
  dprintf("The peer %s accepted our offer for %d chunks, max deliver %d.\n", node_addr(fromid), chunkID_set_size(cset), max_deliver);
170

    
171
  if (from) {
172
    send_accepted_chunks(from, cset, max_deliver);
173
  }
174
}
175

    
176

    
177
 /**
178
 * Dispatcher for signaling messages.
179
 *
180
 * This method decodes the signaling messages, retrieving the set of chunk and the signaling
181
 * message, invoking the corresponding method.
182
 *
183
 * @param[in] buff buffer which contains the signaling message
184
 * @param[in] buff_len length of the buffer
185
 * @param[in] msgtype type of message in the buffer
186
 * @param[in] max_deliver deliver at most this number of Chunks
187
 * @param[in] arg parameters associated to the signaling message
188
 * @return 0 on success, <0 on error
189
 */
190

    
191
int sigParseData(const struct nodeID *fromid, uint8_t *buff, int buff_len) {
192
    struct chunkID_set *c_set;
193
    void *meta;
194
    int meta_len;
195
    struct sig_nal *signal;
196
    int sig;
197
    int ret = 1;
198
    dprintf("Decoding signaling message...");
199
    c_set = decodeChunkSignaling(&meta, &meta_len, buff+1, buff_len-1);
200
    dprintf(" SIG_HEADER: len: %d, of which meta: %d\n", buff_len, meta_len);
201
    if (!c_set) {
202
      fprintf(stdout, "ERROR decoding signaling message\n");
203
      return -1;
204
    }
205
    signal = (struct sig_nal *) meta;
206
    sig = (int) (signal->type);
207
    dprintf("\tSignaling Type %d\n", sig);
208
    //MaxDelivery  and Trans_Id to be defined
209
    switch (sig) {
210
        case MSG_SIG_BMOFF:
211
        {
212
          int dummy;
213
          struct nodeID *ownerid = nodeid_undump(&(signal->third_peer),&dummy);
214
          bmap_received(fromid, ownerid, c_set, signal->cb_size, signal->trans_id);
215
          nodeID_free(ownerid);
216
          break;
217
        }
218
        case MSG_SIG_OFF:
219
          offer_received(fromid, c_set, signal->max_deliver, signal->trans_id);
220
          break;
221
        case MSG_SIG_ACC:
222
          accept_received(fromid, c_set, signal->max_deliver, signal->trans_id);
223
          break;
224
        default:
225
          ret = -1;
226
    }
227
    
228
    chunkID_set_free(c_set);
229
    free(meta);
230
    return ret;
231
}
232

    
233
/// ==================== ///
234
///          INIT        ///
235
/// ==================== ///
236

    
237
int sigInit(struct nodeID *myID, struct peerset *ps)
238
{
239
  localID = myID;
240
  pset = ps;
241
  return 1;
242
}