Statistics
| Branch: | Revision:

streamers / chunk_signaling.c @ 913146ae

History | View | Annotate | Download (7.42 KB)

1 fa655fab Csaba Kiraly
/*
2 7f591208 Csaba Kiraly
 *  Copyright (c) 2009 Alessandro Russo
3
 *  Copyright (c) 2009 Csaba Kiraly
4 fa655fab Csaba Kiraly
 *
5 7f591208 Csaba Kiraly
 *  This is free software; see gpl-3.0.txt
6 fa655fab Csaba Kiraly
 *
7
 * Chunk Signaling API - Higher Abstraction
8
 *
9
 * The Chunk Signaling HA provides a set of primitives for chunks signaling negotiation with other peers, in order to collect information for the effective chunk exchange with other peers. <br>
10
 * This is a part of the Data Exchange Protocol which provides high level abstraction for chunks' negotiations, like requesting and proposing chunks.
11
 *
12
 */
13 7d19f599 Csaba Kiraly
#include <stdint.h>
14
#include <stdlib.h>
15
#include <stdio.h>
16 c6575994 Csaba Kiraly
#include <sys/time.h>
17 fa655fab Csaba Kiraly
#include <errno.h>
18
#include <assert.h>
19 637f87e0 Csaba Kiraly
#include <string.h>
20 23442a5b ArpadBakay
21 fa655fab Csaba Kiraly
#include "peer.h"
22 7d19f599 Csaba Kiraly
#include "peerset.h"
23 fa655fab Csaba Kiraly
#include "chunkidset.h"
24 23442a5b ArpadBakay
#include "trade_sig_ha.h"
25 7d19f599 Csaba Kiraly
#include "trade_sig_la.h"
26 fa655fab Csaba Kiraly
#include "msg_types.h"
27 7d19f599 Csaba Kiraly
#include "net_helper.h"
28 fa655fab Csaba Kiraly
29 23442a5b ArpadBakay
#include "chunk_signaling.h"
30
31 2067b4af Csaba Kiraly
#include "streaming.h"
32 fcb5c29b Csaba Kiraly
#include "topology.h"
33 c1f8de1c Csaba Kiraly
#include "dbg.h"
34
35 7d19f599 Csaba Kiraly
static struct nodeID *localID;
36 b865a917 Alessandro Russo
/*
37 1bb397ce Csaba Kiraly

38 4a9d32c1 Csaba Kiraly
int sendSignalling(int type, const struct nodeID *to_id, const struct nodeID *owner_id, struct chunkID_set *cset, int max_deliver, int cb_size, int trans_id)
39 1942bd59 Csaba Kiraly
{
40 637f87e0 Csaba Kiraly
    int buff_len, meta_len, msg_len, ret;
41 1942bd59 Csaba Kiraly
    uint8_t *buff;
42 637f87e0 Csaba Kiraly
    struct sig_nal *sigmex;
43
    uint8_t *meta;
44

45
    meta = malloc(1024);
46

47
    sigmex = (struct sig_nal*) meta;
48
    sigmex->type = type;
49
    sigmex->max_deliver = max_deliver;
50 4a9d32c1 Csaba Kiraly
    sigmex->cb_size = cb_size;
51 637f87e0 Csaba Kiraly
    sigmex->trans_id = trans_id;
52
    meta_len = sizeof(*sigmex)-1;
53
      sigmex->third_peer = 0;
54 1942bd59 Csaba Kiraly
    if (owner_id) {
55 637f87e0 Csaba Kiraly
      meta_len += nodeid_dump(&sigmex->third_peer, owner_id);
56 1942bd59 Csaba Kiraly
    }
57 637f87e0 Csaba Kiraly

58 ed76b342 Csaba Kiraly
    buff_len = 1 + chunkID_set_size(cset) * 4 + 16 + meta_len; // this should be enough
59 1942bd59 Csaba Kiraly
    buff = malloc(buff_len);
60
    if (!buff) {
61 637f87e0 Csaba Kiraly
      fprintf(stderr, "Error allocating buffer\n");
62 1942bd59 Csaba Kiraly
      return -1;
63
    }
64 637f87e0 Csaba Kiraly

65 1942bd59 Csaba Kiraly
    buff[0] = MSG_TYPE_SIGNALLING;
66 637f87e0 Csaba Kiraly
    msg_len = 1 + encodeChunkSignaling(cset, meta, meta_len, buff+1, buff_len-1);
67
    free(meta);
68 d35ceae7 Csaba Kiraly
    if (msg_len <= 0) {
69 1942bd59 Csaba Kiraly
      fprintf(stderr, "Error in encoding chunk set for sending a buffermap\n");
70
      ret = -1;
71
    } else {
72
      send_to_peer(localID, to_id, buff, msg_len);
73
    }
74
    ret = 1;
75
    free(buff);
76
    return ret;
77 b865a917 Alessandro Russo
}*/
78 1942bd59 Csaba Kiraly
79
/**
80
 * Send a BufferMap to a Peer.
81
 *
82
 * Send (our own or some other peer's) BufferMap to a third Peer.
83
 *
84
 * @param[in] to PeerID.
85
 * @param[in] owner Owner of the BufferMap to send.
86
 * @param[in] bmap the BufferMap to send.
87
 * @param[in] trans_id transaction number associated with this send
88
 * @return 0 on success, <0 on error
89
 */
90 b865a917 Alessandro Russo
/*
91 4a9d32c1 Csaba Kiraly
int sendBufferMap(const struct nodeID *to_id, const struct nodeID *owner_id, struct chunkID_set *bmap, int cb_size, int trans_id) {
92
  return sendSignalling(MSG_SIG_BMOFF, to_id, owner_id, bmap, 0, cb_size, trans_id);
93 1942bd59 Csaba Kiraly
}
94

95 4a9d32c1 Csaba Kiraly
int sendMyBufferMap(const struct nodeID *to_id, struct chunkID_set *bmap, int cb_size, int trans_id)
96 1942bd59 Csaba Kiraly
{
97 4a9d32c1 Csaba Kiraly
  return sendBufferMap(to_id, localID, bmap, cb_size, trans_id);
98 1942bd59 Csaba Kiraly
}
99 b865a917 Alessandro Russo
*/
100 1942bd59 Csaba Kiraly
101 62e3c01d Alessandro Russo
/*
102 1942bd59 Csaba Kiraly
int offerChunks(const struct nodeID *to_id, struct chunkID_set *cset, int max_deliver, int trans_id) {
103 4a9d32c1 Csaba Kiraly
  return sendSignalling(MSG_SIG_OFF, to_id, NULL, cset, max_deliver, -1, trans_id);
104 1942bd59 Csaba Kiraly
}
105 62e3c01d Alessandro Russo
*/
106 d029edd1 Alessandro Russo
/*
107 1942bd59 Csaba Kiraly
int acceptChunks(const struct nodeID *to_id, struct chunkID_set *cset, int max_deliver, int trans_id) {
108 4a9d32c1 Csaba Kiraly
  return sendSignalling(MSG_SIG_ACC, to_id, NULL, cset, max_deliver, -1, trans_id);
109 1942bd59 Csaba Kiraly
}
110 d029edd1 Alessandro Russo
*/
111 1942bd59 Csaba Kiraly
112
/// ==================== ///
113
///        RECEIVE       ///
114
/// ==================== ///
115
116 4a9d32c1 Csaba Kiraly
void bmap_received(const struct nodeID *fromid, const struct nodeID *ownerid, struct chunkID_set *c_set, int cb_size, int trans_id) {
117 242021e6 Csaba Kiraly
  struct peer *owner;
118
  if (nodeid_equal(fromid, ownerid)) {
119
    owner = nodeid_to_peer(ownerid,1);
120
  } else {
121
    dprintf("%s might be behind ",node_addr(ownerid));
122
    dprintf("NAT:%s\n",node_addr(fromid));
123
    owner = nodeid_to_peer(fromid,1);
124
  }
125
  
126 1bb397ce Csaba Kiraly
  if (owner) {        //now we have it almost sure
127 718a8f29 Csaba Kiraly
    chunkID_set_clear(owner->bmap,cb_size+5);        //TODO: some better solution might be needed to keep info about chunks we sent in flight.
128
    chunkID_set_union(owner->bmap,c_set);
129 4a9d32c1 Csaba Kiraly
    owner->cb_size = cb_size;
130 1bb397ce Csaba Kiraly
    gettimeofday(&owner->bmap_timestamp, NULL);
131
  }
132
}
133
134 2067b4af Csaba Kiraly
void offer_received(const struct nodeID *fromid, struct chunkID_set *cset, int max_deliver, int trans_id) {
135
  struct peer *from = nodeid_to_peer(fromid,1);
136
  dprintf("The peer %s offers %d chunks, max deliver %d.\n", node_addr(fromid), chunkID_set_size(cset), max_deliver);
137
138
  if (from) {
139
    struct chunkID_set *cset_acc;
140
    int max_deliver2;
141
142 526c3729 Csaba Kiraly
    //register these chunks in the buffermap. Warning: this should be changed when offers become selective.
143
    chunkID_set_clear(from->bmap,0);        //TODO: some better solution might be needed to keep info about chunks we sent in flight.
144 2067b4af Csaba Kiraly
    chunkID_set_union(from->bmap,cset);
145
    gettimeofday(&from->bmap_timestamp, NULL);
146
147
    //decide what to accept
148 b5a5780a Csaba Kiraly
    cset_acc = get_chunks_to_accept(from, cset, max_deliver, trans_id);
149 2067b4af Csaba Kiraly
150
    //send accept message
151 652fb405 Csaba Kiraly
    dprintf("\t accept %d chunks from peer %s, trans_id %d\n", chunkID_set_size(cset_acc), node_addr(from->id), trans_id);
152 2067b4af Csaba Kiraly
    max_deliver2 = chunkID_set_size(cset_acc);
153
    acceptChunks(fromid, cset_acc, max_deliver2, trans_id);
154
155 ddedf85f Csaba Kiraly
    chunkID_set_free(cset_acc);
156 2067b4af Csaba Kiraly
  }
157
}
158
159
void accept_received(const struct nodeID *fromid, struct chunkID_set *cset, int max_deliver, int trans_id) {
160
  struct peer *from = nodeid_to_peer(fromid,0);   //verify that we have really offered, 0 at least garantees that we've known the peer before
161
  dprintf("The peer %s accepted our offer for %d chunks, max deliver %d.\n", node_addr(fromid), chunkID_set_size(cset), max_deliver);
162
163
  if (from) {
164 b5a5780a Csaba Kiraly
    send_accepted_chunks(from, cset, max_deliver, trans_id);
165 2067b4af Csaba Kiraly
  }
166
}
167
168
169 fa655fab Csaba Kiraly
 /**
170
 * Dispatcher for signaling messages.
171
 *
172
 * This method decodes the signaling messages, retrieving the set of chunk and the signaling
173
 * message, invoking the corresponding method.
174
 *
175
 * @param[in] buff buffer which contains the signaling message
176
 * @param[in] buff_len length of the buffer
177
 * @param[in] msgtype type of message in the buffer
178
 * @param[in] max_deliver deliver at most this number of Chunks
179
 * @param[in] arg parameters associated to the signaling message
180
 * @return 0 on success, <0 on error
181
 */
182
183 1bb397ce Csaba Kiraly
int sigParseData(const struct nodeID *fromid, uint8_t *buff, int buff_len) {
184 fa655fab Csaba Kiraly
    struct chunkID_set *c_set;
185
    void *meta;
186
    int meta_len;
187 7d19f599 Csaba Kiraly
    struct sig_nal *signal;
188
    int sig;
189 0c3f7376 Csaba Kiraly
    int ret = 1;
190 13d85fc6 Csaba Kiraly
    dprintf("Decoding signaling message...");
191 c1f8de1c Csaba Kiraly
    c_set = decodeChunkSignaling(&meta, &meta_len, buff+1, buff_len-1);
192 13d85fc6 Csaba Kiraly
    dprintf(" SIG_HEADER: len: %d, of which meta: %d\n", buff_len, meta_len);
193 c1f8de1c Csaba Kiraly
    if (!c_set) {
194
      fprintf(stdout, "ERROR decoding signaling message\n");
195
      return -1;
196
    }
197 7ad9ec5e Csaba Kiraly
    signal = (struct sig_nal *) meta;
198
    sig = (int) (signal->type);
199 993f958a Csaba Kiraly
    dprintf("\tSignaling Type %d\n", sig);
200 fa655fab Csaba Kiraly
    //MaxDelivery  and Trans_Id to be defined
201
    switch (sig) {
202
        case MSG_SIG_BMOFF:
203 b7374def Csaba Kiraly
        {
204 7d19f599 Csaba Kiraly
          int dummy;
205 23442a5b ArpadBakay
          const struct nodeID *ownerid = nodeid_undump(&(signal->third_peer),&dummy);
206 4a9d32c1 Csaba Kiraly
          bmap_received(fromid, ownerid, c_set, signal->cb_size, signal->trans_id);
207 4888d47e Luca Abeni
          nodeid_free(ownerid);
208 b7374def Csaba Kiraly
          break;
209 fa655fab Csaba Kiraly
        }
210 2067b4af Csaba Kiraly
        case MSG_SIG_OFF:
211
          offer_received(fromid, c_set, signal->max_deliver, signal->trans_id);
212
          break;
213
        case MSG_SIG_ACC:
214
          accept_received(fromid, c_set, signal->max_deliver, signal->trans_id);
215
          break;
216 fa655fab Csaba Kiraly
        default:
217 0c3f7376 Csaba Kiraly
          ret = -1;
218 fa655fab Csaba Kiraly
    }
219 0c3f7376 Csaba Kiraly
    
220 c8c4c779 Csaba Kiraly
    chunkID_set_free(c_set);
221 0c3f7376 Csaba Kiraly
    free(meta);
222
    return ret;
223 fa655fab Csaba Kiraly
}
224
225 1942bd59 Csaba Kiraly
/// ==================== ///
226
///          INIT        ///
227
/// ==================== ///
228 a0192c66 Csaba Kiraly
229 fcb5c29b Csaba Kiraly
int sigInit(struct nodeID *myID)
230 7d19f599 Csaba Kiraly
{
231
  localID = myID;
232
  return 1;
233 fa655fab Csaba Kiraly
}