Statistics
| Branch: | Revision:

streamers / chunk_signaling.c @ 4a9d32c1

History | View | Annotate | Download (5.69 KB)

1
/*
2
 *  Copyright (c) 2009 Alessandro Russo.
3
 *
4
 *  This is free software;
5
 *  see GPL.txt
6
 *
7
 * Chunk Signaling API - Higher Abstraction
8
 *
9
 * The Chunk Signaling HA provides a set of primitives for chunks signaling negotiation with other peers, in order to collect information for the effective chunk exchange with other peers. <br>
10
 * This is a part of the Data Exchange Protocol which provides high level abstraction for chunks' negotiations, like requesting and proposing chunks.
11
 *
12
 */
13

    
14
#include <stdint.h>
15
#include <stdlib.h>
16
#include <stdio.h>
17
#include <sys/time.h>
18
#include <errno.h>
19
#include <assert.h>
20
#include <string.h>
21
#include "peer.h"
22
#include "peerset.h"
23
#include "chunkidset.h"
24
#include "trade_sig_la.h"
25
#include "chunk_signaling.h"
26
#include "msg_types.h"
27
#include "net_helper.h"
28

    
29
#include "dbg.h"
30

    
31
static struct nodeID *localID;
32
static struct peerset *pset;
33

    
34

    
35
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
36
{
37
  struct peer *p = peerset_get_peer(pset, id);
38
  if (!p) {
39
    fprintf(stderr,"warning: received message from unknown peer: %s!\n",node_addr(id));
40
    if (reg) {
41
      fprintf(stderr,"Adding %s to neighbourhood!\n", node_addr(id));
42
      peerset_add_peer(pset,id);
43
      p = peerset_get_peer(pset,id);
44
    }
45
  }
46

    
47
  return p;
48
}
49

    
50
int sendSignalling(int type, const struct nodeID *to_id, const struct nodeID *owner_id, struct chunkID_set *cset, int max_deliver, int cb_size, int trans_id)
51
{
52
    int buff_len, meta_len, msg_len, ret;
53
    uint8_t *buff;
54
    struct sig_nal *sigmex;
55
    uint8_t *meta;
56

    
57
    meta = malloc(1024);
58

    
59
    sigmex = (struct sig_nal*) meta;
60
    sigmex->type = type;
61
    sigmex->max_deliver = max_deliver;
62
    sigmex->cb_size = cb_size;
63
    sigmex->trans_id = trans_id;
64
    meta_len = sizeof(*sigmex)-1;
65
      sigmex->third_peer = 0;
66
    if (owner_id) {
67
      meta_len += nodeid_dump(&sigmex->third_peer, owner_id);
68
    }
69

    
70
    buff_len = 1 + chunkID_set_size(cset) * 4 + 12 + meta_len; // this should be enough
71
    buff = malloc(buff_len);
72
    if (!buff) {
73
      fprintf(stderr, "Error allocating buffer\n");
74
      return -1;
75
    }
76

    
77
    buff[0] = MSG_TYPE_SIGNALLING;
78
    msg_len = 1 + encodeChunkSignaling(cset, meta, meta_len, buff+1, buff_len-1);
79
    free(meta);
80
    if (msg_len < 0) {
81
      fprintf(stderr, "Error in encoding chunk set for sending a buffermap\n");
82
      ret = -1;
83
    } else {
84
      send_to_peer(localID, to_id, buff, msg_len);
85
    }
86
    ret = 1;
87
    free(buff);
88
    return ret;
89
}
90

    
91
/**
92
 * Send a BufferMap to a Peer.
93
 *
94
 * Send (our own or some other peer's) BufferMap to a third Peer.
95
 *
96
 * @param[in] to PeerID.
97
 * @param[in] owner Owner of the BufferMap to send.
98
 * @param[in] bmap the BufferMap to send.
99
 * @param[in] trans_id transaction number associated with this send
100
 * @return 0 on success, <0 on error
101
 */
102
int sendBufferMap(const struct nodeID *to_id, const struct nodeID *owner_id, struct chunkID_set *bmap, int cb_size, int trans_id) {
103
  return sendSignalling(MSG_SIG_BMOFF, to_id, owner_id, bmap, 0, cb_size, trans_id);
104
}
105

    
106
int sendMyBufferMap(const struct nodeID *to_id, struct chunkID_set *bmap, int cb_size, int trans_id)
107
{
108
  return sendBufferMap(to_id, localID, bmap, cb_size, trans_id);
109
}
110

    
111
int offerChunks(const struct nodeID *to_id, struct chunkID_set *cset, int max_deliver, int trans_id) {
112
  return sendSignalling(MSG_SIG_OFF, to_id, NULL, cset, max_deliver, -1, trans_id);
113
}
114

    
115
int acceptChunks(const struct nodeID *to_id, struct chunkID_set *cset, int max_deliver, int trans_id) {
116
  return sendSignalling(MSG_SIG_ACC, to_id, NULL, cset, max_deliver, -1, trans_id);
117
}
118

    
119

    
120
/// ==================== ///
121
///        RECEIVE       ///
122
/// ==================== ///
123

    
124
void bmap_received(const struct nodeID *fromid, const struct nodeID *ownerid, struct chunkID_set *c_set, int cb_size, int trans_id) {
125
  struct peer *owner = nodeid_to_peer(ownerid,1);
126
  if (owner) {        //now we have it almost sure
127
    chunkID_set_union(owner->bmap,c_set);        //don't send it back
128
    owner->cb_size = cb_size;
129
    gettimeofday(&owner->bmap_timestamp, NULL);
130
  }
131
}
132

    
133
 /**
134
 * Dispatcher for signaling messages.
135
 *
136
 * This method decodes the signaling messages, retrieving the set of chunk and the signaling
137
 * message, invoking the corresponding method.
138
 *
139
 * @param[in] buff buffer which contains the signaling message
140
 * @param[in] buff_len length of the buffer
141
 * @param[in] msgtype type of message in the buffer
142
 * @param[in] max_deliver deliver at most this number of Chunks
143
 * @param[in] arg parameters associated to the signaling message
144
 * @return 0 on success, <0 on error
145
 */
146

    
147
int sigParseData(const struct nodeID *fromid, uint8_t *buff, int buff_len) {
148
    struct chunkID_set *c_set;
149
    void *meta;
150
    int meta_len;
151
    struct sig_nal *signal;
152
    int sig;
153
    int ret = 1;
154
    dprintf("Decoding signaling message...");
155
    c_set = decodeChunkSignaling(&meta, &meta_len, buff+1, buff_len-1);
156
    dprintf(" SIG_HEADER: len: %d, of which meta: %d\n", buff_len, meta_len);
157
    if (!c_set) {
158
      fprintf(stdout, "ERROR decoding signaling message\n");
159
      return -1;
160
    }
161
    signal = (struct sig_nal *) meta;
162
    sig = (int) (signal->type);
163
    dprintf("\tSignaling Type %d\n", sig);
164
    //MaxDelivery  and Trans_Id to be defined
165
    switch (sig) {
166
        case MSG_SIG_BMOFF:
167
        {
168
          int dummy;
169
          struct nodeID *ownerid = nodeid_undump(&(signal->third_peer),&dummy);
170
          bmap_received(fromid, ownerid, c_set, signal->cb_size, signal->trans_id);
171
          free(ownerid);
172
          break;
173
        }
174
        default:
175
          ret = -1;
176
    }
177
    
178
    chunkID_set_clear(c_set,0);
179
    free(c_set);
180
    free(meta);
181
    return ret;
182
}
183

    
184
/// ==================== ///
185
///          INIT        ///
186
/// ==================== ///
187

    
188
int sigInit(struct nodeID *myID, struct peerset *ps)
189
{
190
  localID = myID;
191
  pset = ps;
192
  return 1;
193
}