pstreamer / src / chunk_signaling.c @ fe735c05
History | View | Annotate | Download (6.45 KB)
1 |
/*
|
---|---|
2 |
* Copyright (c) 2009 Alessandro Russo
|
3 |
* Copyright (c) 2009 Csaba Kiraly
|
4 |
* Copyright (c) 2013-2017 Luca Baldesi
|
5 |
*
|
6 |
* This file is part of PeerStreamer.
|
7 |
*
|
8 |
* PeerStreamer is free software: you can redistribute it and/or
|
9 |
* modify it under the terms of the GNU Affero General Public License as
|
10 |
* published by the Free Software Foundation, either version 3 of the
|
11 |
* License, or (at your option) any later version.
|
12 |
*
|
13 |
* PeerStreamer is distributed in the hope that it will be useful,
|
14 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
15 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
|
16 |
* General Public License for more details.
|
17 |
*
|
18 |
* You should have received a copy of the GNU Affero General Public License
|
19 |
* along with PeerStreamer. If not, see <http://www.gnu.org/licenses/>.
|
20 |
*
|
21 |
*/
|
22 |
/*
|
23 |
* Chunk Signaling API - Higher Abstraction
|
24 |
*
|
25 |
* The Chunk Signaling HA provides a set of primitives for chunks signaling negotiation with other peers, in order to collect information for the effective chunk exchange with other peers. <br>
|
26 |
* This is a part of the Data Exchange Protocol which provides high level abstraction for chunks' negotiations, like requesting and proposing chunks.
|
27 |
*
|
28 |
*/
|
29 |
#include <stdint.h> |
30 |
#include <stdlib.h> |
31 |
#include <stdio.h> |
32 |
#include <sys/time.h> |
33 |
#include <errno.h> |
34 |
#include <assert.h> |
35 |
#include <string.h> |
36 |
#include "peer.h" |
37 |
#include "peerset.h" |
38 |
#include "chunkidset.h" |
39 |
#include "trade_sig_la.h" |
40 |
#include "chunk_signaling.h" |
41 |
#include "net_helper.h" |
42 |
#include <trade_sig_ha.h> |
43 |
|
44 |
#include "streaming.h" |
45 |
#include "topology.h" |
46 |
// #include "ratecontrol.h"
|
47 |
#include "dbg.h" |
48 |
#include "net_helpers.h" |
49 |
|
50 |
#define neigh_on_sign_recv false |
51 |
|
52 |
void ack_received(const struct psinstance * ps, struct nodeID *fromid, struct chunkID_set *cset, int max_deliver, uint16_t trans_id) { |
53 |
struct peer *from = nodeid_to_peer(psinstance_topology(ps), fromid,0); //verify that we have really sent, 0 at least garantees that we've known the peer before |
54 |
dprintf("The peer %s acked our chunk, max deliver %d, trans_id %d.\n", nodeid_static_str(fromid), max_deliver, trans_id);
|
55 |
|
56 |
if (from) {
|
57 |
chunkID_set_clear(from->bmap,0); //TODO: some better solution might be needed to keep info about chunks we sent in flight. |
58 |
chunkID_set_union(from->bmap,cset); |
59 |
gettimeofday(&from->bmap_timestamp, NULL);
|
60 |
} |
61 |
|
62 |
// rc_reg_ack(trans_id);
|
63 |
} |
64 |
|
65 |
void bmap_received(const struct psinstance * ps, struct nodeID *fromid, struct nodeID *ownerid, struct chunkID_set *c_set, int cb_size, uint16_t trans_id) { |
66 |
struct peer *owner;
|
67 |
if (nodeid_equal(fromid, ownerid)) {
|
68 |
owner = nodeid_to_peer(psinstance_topology(ps), ownerid, neigh_on_sign_recv); |
69 |
} else {
|
70 |
dprintf("%s might be behind ",nodeid_static_str(ownerid));
|
71 |
dprintf("NAT:%s\n",nodeid_static_str(fromid));
|
72 |
owner = nodeid_to_peer(psinstance_topology(ps), fromid, neigh_on_sign_recv); |
73 |
} |
74 |
|
75 |
if (owner) { //now we have it almost sure |
76 |
chunkID_set_clear(owner->bmap,0); //TODO: some better solution might be needed to keep info about chunks we sent in flight. |
77 |
chunkID_set_union(owner->bmap,c_set); |
78 |
owner->cb_size = cb_size; |
79 |
gettimeofday(&owner->bmap_timestamp, NULL);
|
80 |
} |
81 |
} |
82 |
|
83 |
void offer_received(const struct psinstance * ps, struct nodeID *fromid, struct chunkID_set *cset, int max_deliver, uint16_t trans_id) { |
84 |
struct chunkID_set *cset_acc;
|
85 |
|
86 |
struct peer *from = nodeid_to_peer(psinstance_topology(ps), fromid, neigh_on_sign_recv);
|
87 |
dprintf("The peer %s offers %d chunks, max deliver %d.\n", nodeid_static_str(fromid), chunkID_set_size(cset), max_deliver);
|
88 |
|
89 |
if (from) {
|
90 |
//register these chunks in the buffermap. Warning: this should be changed when offers become selective.
|
91 |
chunkID_set_clear(from->bmap,0); //TODO: some better solution might be needed to keep info about chunks we sent in flight. |
92 |
chunkID_set_union(from->bmap,cset); |
93 |
gettimeofday(&from->bmap_timestamp, NULL);
|
94 |
} |
95 |
|
96 |
//decide what to accept
|
97 |
cset_acc = get_chunks_to_accept(psinstance_streaming(ps), fromid, cset, max_deliver, trans_id); |
98 |
|
99 |
//send accept message
|
100 |
dprintf("\t accept %d chunks from peer %s, trans_id %d\n", chunkID_set_size(cset_acc), nodeid_static_str(fromid), trans_id);
|
101 |
acceptChunks(psinstance_nodeid(ps), fromid, cset_acc, trans_id); |
102 |
|
103 |
chunkID_set_free(cset_acc); |
104 |
} |
105 |
|
106 |
void accept_received(const struct psinstance * ps, struct nodeID *fromid, struct chunkID_set *cset, int max_deliver, uint16_t trans_id) { |
107 |
struct peer *from = nodeid_to_peer(psinstance_topology(ps), fromid,0); //verify that we have really offered, 0 at least garantees that we've known the peer before |
108 |
|
109 |
dprintf("The peer %s accepted our offer for %d chunks, max deliver %d.\n", nodeid_static_str(fromid), chunkID_set_size(cset), max_deliver);
|
110 |
|
111 |
if (from) {
|
112 |
gettimeofday(&from->bmap_timestamp, NULL);
|
113 |
} |
114 |
|
115 |
// rc_reg_accept(trans_id, chunkID_set_size(cset));
|
116 |
|
117 |
send_accepted_chunks(psinstance_streaming(ps), fromid, cset, max_deliver, trans_id); |
118 |
} |
119 |
|
120 |
|
121 |
/**
|
122 |
* Dispatcher for signaling messages.
|
123 |
*
|
124 |
* This method decodes the signaling messages, retrieving the set of chunk and the signaling
|
125 |
* message, invoking the corresponding method.
|
126 |
*
|
127 |
* @param[in] buff buffer which contains the signaling message
|
128 |
* @param[in] buff_len length of the buffer
|
129 |
* @return 0 on success, <0 on error
|
130 |
*/
|
131 |
|
132 |
int sigParseData(const struct psinstance * ps, struct nodeID *fromid, uint8_t *buff, int buff_len) { |
133 |
struct chunkID_set *c_set;
|
134 |
struct nodeID *ownerid;
|
135 |
enum signaling_type sig_type;
|
136 |
int max_deliver = 0; |
137 |
uint16_t trans_id = 0;
|
138 |
int ret = 1; |
139 |
dprintf("Decoding signaling message...\n");
|
140 |
|
141 |
ret = parseSignaling(buff + 1, buff_len-1, &ownerid, &c_set, &max_deliver, &trans_id, &sig_type); |
142 |
#ifdef LOG_SIGNAL
|
143 |
log_signal(fromid,psinstance_nodeid(ps),chunkID_set_size(c_set),trans_id,sig_type,"RECEIVED");
|
144 |
#endif
|
145 |
|
146 |
if (ret < 0) { |
147 |
fprintf(stdout, "ERROR parsing signaling message\n");
|
148 |
return -1; |
149 |
} |
150 |
switch (sig_type) {
|
151 |
case sig_send_buffermap:
|
152 |
bmap_received(ps, fromid, ownerid, c_set, max_deliver, trans_id); //FIXME: cb_size has gone from signaling
|
153 |
break;
|
154 |
case sig_offer:
|
155 |
offer_received(ps, fromid, c_set, max_deliver, trans_id); |
156 |
break;
|
157 |
case sig_accept:
|
158 |
accept_received(ps, fromid, c_set, chunkID_set_size(c_set), trans_id); |
159 |
break;
|
160 |
case sig_ack:
|
161 |
ack_received(ps, fromid, c_set, chunkID_set_size(c_set), trans_id); |
162 |
break;
|
163 |
default:
|
164 |
ret = -1;
|
165 |
} |
166 |
chunkID_set_free(c_set); |
167 |
nodeid_free(ownerid); |
168 |
return ret;
|
169 |
} |