Statistics
| Branch: | Revision:

streamers / chunk_signaling.c @ 1942bd59

History | View | Annotate | Download (5.27 KB)

1
/*
2
 *  Copyright (c) 2009 Alessandro Russo.
3
 *
4
 *  This is free software;
5
 *  see GPL.txt
6
 *
7
 * Chunk Signaling API - Higher Abstraction
8
 *
9
 * The Chunk Signaling HA provides a set of primitives for chunks signaling negotiation with other peers, in order to collect information for the effective chunk exchange with other peers. <br>
10
 * This is a part of the Data Exchange Protocol which provides high level abstraction for chunks' negotiations, like requesting and proposing chunks.
11
 *
12
 */
13

    
14
#include <stdint.h>
15
#include <stdlib.h>
16
#include <stdio.h>
17
#include <sys/time.h>
18
#include <errno.h>
19
#include <assert.h>
20
#include "peer.h"
21
#include "peerset.h"
22
#include "chunkidset.h"
23
#include "trade_sig_la.h"
24
#include "chunk_signaling.h"
25
#include "msg_types.h"
26
#include "net_helper.h"
27

    
28
#include "dbg.h"
29

    
30
static struct nodeID *localID;
31
static struct peerset *pset;
32

    
33

    
34
struct peer *nodeid_to_peer(const struct nodeID* id, int reg)
35
{
36
  struct peer *p = peerset_get_peer(pset, id);
37
  if (!p) {
38
    fprintf(stderr,"warning: received message from unknown peer: %s!\n",node_addr(id));
39
    if (reg) {
40
      fprintf(stderr,"Adding %s to neighbourhood!\n", node_addr(id));
41
      peerset_add_peer(pset,id);
42
      p = peerset_get_peer(pset,id);
43
    }
44
  }
45

    
46
  return p;
47
}
48

    
49
int sendSignalling(int type, const struct nodeID *to_id, const struct nodeID *owner_id, struct chunkID_set *cset, int max_deliver, int trans_id)
50
{
51
    int buff_len, id_len, msg_len, ret;
52
    uint8_t *buff;
53
    struct sig_nal sigmex;
54
    sigmex.type = type;
55
    sigmex.max_deliver = max_deliver;
56
    sigmex.trans_id = trans_id;
57
    if (owner_id) {
58
      id_len = nodeid_dump(&sigmex.third_peer, owner_id);
59
    } else {
60
      id_len = 1;
61
    }
62
    buff_len = 1 + chunkID_set_size(cset) * 4 + 12 + sizeof(sigmex)-1 + id_len; // this should be enough
63
    buff = malloc(buff_len);
64
    if (!buff) {
65
      return -1;
66
    }
67
    buff[0] = MSG_TYPE_SIGNALLING;
68
    msg_len = 1 + encodeChunkSignaling(cset, &sigmex, sizeof(sigmex)-1 + id_len, buff+1, buff_len-1);
69
    if (msg_len < 0) {
70
      fprintf(stderr, "Error in encoding chunk set for sending a buffermap\n");
71
      ret = -1;
72
    } else {
73
      send_to_peer(localID, to_id, buff, msg_len);
74
    }
75
    ret = 1;
76
    free(buff);
77
    return ret;
78
}
79

    
80
/**
81
 * Send a BufferMap to a Peer.
82
 *
83
 * Send (our own or some other peer's) BufferMap to a third Peer.
84
 *
85
 * @param[in] to PeerID.
86
 * @param[in] owner Owner of the BufferMap to send.
87
 * @param[in] bmap the BufferMap to send.
88
 * @param[in] trans_id transaction number associated with this send
89
 * @return 0 on success, <0 on error
90
 */
91
int sendBufferMap(const struct nodeID *to_id, const struct nodeID *owner_id, struct chunkID_set *bmap, int trans_id) {
92
  return sendSignalling(MSG_SIG_BMOFF, to_id, owner_id, bmap, 0, trans_id);
93
}
94

    
95
int sendMyBufferMap(const struct nodeID *to_id, struct chunkID_set *bmap, int trans_id)
96
{
97
  return sendBufferMap(to_id, localID, bmap, trans_id);
98
}
99

    
100
int offerChunks(const struct nodeID *to_id, struct chunkID_set *cset, int max_deliver, int trans_id) {
101
  return sendSignalling(MSG_SIG_OFF, to_id, NULL, cset, max_deliver, trans_id);
102
}
103

    
104
int acceptChunks(const struct nodeID *to_id, struct chunkID_set *cset, int max_deliver, int trans_id) {
105
  return sendSignalling(MSG_SIG_ACC, to_id, NULL, cset, max_deliver, trans_id);
106
}
107

    
108

    
109
/// ==================== ///
110
///        RECEIVE       ///
111
/// ==================== ///
112

    
113
void bmap_received(const struct nodeID *fromid, const struct nodeID *ownerid, struct chunkID_set *c_set, int trans_id) {
114
  struct peer *owner = nodeid_to_peer(ownerid,1);
115
  if (owner) {        //now we have it almost sure
116
    chunkID_set_union(owner->bmap,c_set);        //don't send it back
117
    gettimeofday(&owner->bmap_timestamp, NULL);
118
  }
119
}
120

    
121
 /**
122
 * Dispatcher for signaling messages.
123
 *
124
 * This method decodes the signaling messages, retrieving the set of chunk and the signaling
125
 * message, invoking the corresponding method.
126
 *
127
 * @param[in] buff buffer which contains the signaling message
128
 * @param[in] buff_len length of the buffer
129
 * @param[in] msgtype type of message in the buffer
130
 * @param[in] max_deliver deliver at most this number of Chunks
131
 * @param[in] arg parameters associated to the signaling message
132
 * @return 0 on success, <0 on error
133
 */
134

    
135
int sigParseData(const struct nodeID *fromid, uint8_t *buff, int buff_len) {
136
    struct chunkID_set *c_set;
137
    void *meta;
138
    int meta_len;
139
    struct sig_nal *signal;
140
    int sig;
141
    dprintf("\tDecoding signaling message...");
142
    c_set = decodeChunkSignaling(&meta, &meta_len, buff+1, buff_len-1);
143
    dprintf("SIG_HEADER: len: %d, of which meta: %d\n", buff_len, meta_len);
144
    if (!c_set) {
145
      fprintf(stdout, "ERROR decoding signaling message\n");
146
      return -1;
147
    }
148
    dprintf("done\n");
149
    signal = (struct sig_nal *) meta;
150
    sig = (int) (signal->type);
151
    dprintf("\tSignaling Type %d\n", sig);
152
    //MaxDelivery  and Trans_Id to be defined
153
    switch (sig) {
154
        case MSG_SIG_BMOFF:
155
        {
156
          int dummy;
157
          struct nodeID *ownerid = nodeid_undump(&(signal->third_peer),&dummy);
158
          bmap_received(fromid, ownerid, c_set, signal->trans_id);
159
          break;
160
        }
161
        default:
162
          return -1;
163
    }
164
    return 1;
165
}
166

    
167
/// ==================== ///
168
///          INIT        ///
169
/// ==================== ///
170

    
171
int sigInit(struct nodeID *myID, struct peerset *ps)
172
{
173
  localID = myID;
174
  pset = ps;
175
  return 1;
176
}