Statistics
| Branch: | Revision:

napa-baselibs / tests / Broadcaster / ml.c @ 507372bb

History | View | Annotate | Download (5.99 KB)

1
/*
2
 * % vim: syntax=c ts=4 tw=76 cindent shiftwidth=4
3
 *
4
 * Messaging services fror the Peer implementation for the Broadcaster demo application
5
 *
6
 */
7

    
8
#include "peer.h"
9

    
10
#include <ml.h>
11
#include <msg_types.h>
12
#include <chunk.h>
13

    
14
cfg_opt_t cfg_stun[] = {
15
        CFG_STR("server", NULL, CFGF_NONE),
16
        CFG_INT("port", 3478, CFGF_NONE),
17
        CFG_END()
18
};
19

    
20
cfg_opt_t cfg_ml[] = {
21
        CFG_INT("port", 1234, CFGF_NONE),
22
        CFG_INT("timeout", 3, CFGF_NONE),
23
        CFG_STR("address", NULL, CFGF_NONE),
24
        CFG_SEC("stun", cfg_stun, CFGF_NONE),
25
        CFG_INT("keepalive", 0, CFGF_NONE),
26
        CFG_END()
27
};
28

    
29
bool messaging_ready = false;
30
char LocalPeerID[64] = "Uninitialized";
31
send_params sendParams;
32

    
33
void recv_localsocketID_cb(socketID_handle localID, int errorstatus) {
34
        if (errorstatus) fatal("Error %d on initializing Messaging Layer", errorstatus);
35
        mlSocketIDToString(localID, LocalPeerID, sizeof(LocalPeerID));
36
        if (errorstatus == 0) messaging_ready = true;
37
}
38

    
39
/**
40
  Utility function to send a Chunk to a Peer
41

42
  @param[in] peedID send to this peer
43
  @param[in] c the chunk
44
  */
45
void sendChunk(const char *peerID, const struct chunk *c) {
46

    
47
        if (strcmp(peerID, LocalPeerID) == 0) {
48
                warn("Refusing to send a chunk to myself...");
49
                return;
50
        }
51

    
52
        socketID_handle remsocketID = NULL; // the remote
53

    
54
        remsocketID = malloc(SOCKETID_SIZE);
55
        mlStringToSocketID(peerID, remsocketID);
56

    
57
        int connid = mlConnectionExist(remsocketID, true);
58
        free(remsocketID);
59

    
60
        if (connid < 0) return;
61

    
62
        if (c->attributes_size != 0) {//chunk is encoded
63
                debug("Sending chunk %u to %s with attributes",  chunkGetId(c), peerID);
64
            /* code snipplet from the "real ml.c" */
65
                int buff_len;
66
            uint8_t *buff;
67
            buff_len = 20 + c->size + c->attributes_size;
68
            buff = malloc(buff_len + 1);
69
            if (!buff) {
70
                warn("Error: unable to allocate memory %d bytes for sending chunk %d.", (buff_len+1) , c->id);
71
                return;
72
            }
73
            int res = encodeChunk(c, buff + 1, buff_len);
74
            buff[0] = MSG_TYPE_CHUNK;
75
                mlSendData(connid, buff, buff_len + 1,
76
                                MSG_TYPE_CHUNK, NULL);
77
        }
78
        else {//chunk is raw data only
79
                debug("Sending chunk %u to %s",  chunkGetId(c), peerID);
80
                mlSendData(connid, chunkGetData(c, false), chunkGetSize(c),
81
                                MSG_TYPE_CHUNK, NULL);
82
        }
83
}
84

    
85
/**
86
 * Receiver ML_KEEPALIVE callback. 
87
 * These are just ignored now.
88
 * @param *buffer A pointer to the buffer
89
 * @param buflen The length of the buffer
90
 * @param msgtype The message type
91
 * @param *arg An argument that receives metadata about the
92
 * received data
93
 */
94
void recv_keepalive_from_peer_cb(char *buffer,int buflen,unsigned char
95
                msgtype,recv_params *params) {
96

    
97
        char peer[64] = "";
98
        if (mlSocketIDToString(params->remote_socketID, peer, sizeof(peer))) {
99
                fatal("Internal error: Unable to decode sender in "
100
                                "recv_keepalive_from_peer_cb");
101
        }
102

    
103
        if (buflen < 32 && buffer[buflen] == 0) {
104
                debug("Keepalive message received: %s from %s", buffer, peer);
105
        }
106
        else {
107
                warn("Bogus keepalive message received from %s", peer);
108
        }
109
}
110

    
111
/**
112
 * The peer receives data per callback from the messaging layer.
113
 * @param *buffer A pointer to the buffer
114
 * @param buflen The length of the buffer
115
 * @param msgtype The message type
116
 * @param *arg An argument that receives metadata about the
117
 * received data
118
 */
119
void recv_data_from_peer_cb(char *buffer,int buflen,unsigned char
120
                msgtype,recv_params *params) {
121

    
122
        char peer[64] = "";
123
        if (mlSocketIDToString(params->remote_socketID, peer, sizeof(peer))) {
124
                fatal("Internal error: Unable to decode sender in "
125
                                "recv_data_from_peer_cb");
126
        }
127

    
128
        if (strcmp(peer, LocalPeerID) == 0) {
129
                warn("Received a chunk from myself - this should not have happened");
130
                return;
131
        }
132
        static int chunkid = 0;
133
        debug("CLIENT_MODE: Received data from %s: msgtype %d buflen %d (fragments: %d)", peer, msgtype, buflen, params->recvFragments);
134

    
135
        if (params->nrMissingBytes == 0) {
136
                if (buffer[0] == MSG_TYPE_CHUNK) {//chunk is encoded
137
                        struct chunk c;
138
                        int res = decodeChunk(&c, buffer + 1, buflen);
139
                        if (res > 0) {
140
                                //CRC checking...
141
                                int packets = (c.attributes_size - 12) / ATTRIBUTES_HEADER_SIZE;
142

    
143
                                char *rcvd_crc = malloc(11);
144
                                memcpy(rcvd_crc, c.attributes + (packets * ATTRIBUTES_HEADER_SIZE), 10 );
145
                                rcvd_crc[10] = 0;
146

    
147
                                crc datacrc;
148
                                datacrc = crcFast(c.data, c.size);
149
                                char gen_crc[11];
150
                                sprintf(gen_crc, "%10X", (unsigned int)datacrc);
151

    
152
                                if (!strcmp(rcvd_crc,gen_crc)) {//CRC check passed
153
                                        int res;
154
                                        res = chbAddChunk(chunkbuffer, &c);
155
                                        if (res == -2) warn("Chunk ERROR - Received chunk(%d) duplicate", c.id);
156
                                }
157
                                else {//CRC check failed
158
                                        //... retransmission logic may be inserted here later
159
                                        warn("Chunk ERROR - CRC check failed for chunk(%d)", c.id);
160
                                }
161
                        }
162
                }
163
                else {//only the data was received
164
                        struct chunk *c;
165
                        c = chbCreateChunk(true, buffer, buflen, chunkid,
166
                                        40*chunkid);
167
                        if (c) chbAddChunk(chunkbuffer, c);
168
                }
169
                chunkid++;
170
        }
171
        else {
172
                warn("Chunk ERROR - Received message is missing %d bytes, chunk dropped (chunk duration is too high on source side)",
173
                                params->nrMissingBytes);
174
        }
175
}
176

    
177
void ml_init(cfg_t *ml_config, int override_port) {
178
        if (!ml_config) return;
179
        struct timeval timeout = { 0,0 };
180
        timeout.tv_sec = cfg_getint(ml_config, "timeout");
181

    
182
        cfg_t *stun = cfg_getsec(ml_config, "stun");
183
        char *stun_server = cfg_getstr(stun, "server");
184
        int stun_port = cfg_getint(stun, "port");
185

    
186
        char *address = cfg_getstr(ml_config, "address");
187
        if (!address) address = strdup(mlAutodetectIPAddress());
188
        int port = cfg_getint(ml_config, "port");
189
        if (override_port) port = override_port;
190

    
191
        sendParams.keepalive = cfg_getint(ml_config, "keepalive");
192

    
193
        crcInit();
194
        debug("Calling init_messaging_layer with: %d, %u.%u, (%s, %d), stun(%s,%d)", 
195
                1, timeout.tv_sec, timeout.tv_usec, address, port, 
196
                stun_server, stun_port);
197
        mlInit(1, timeout, port, address, stun_port, stun_server, 
198
                recv_localsocketID_cb, eventbase);
199

    
200
        while (!messaging_ready) {
201
                napaYield();
202
        }
203
        mlRegisterRecvDataCb(recv_data_from_peer_cb, MSG_TYPE_CHUNK);
204
        mlRegisterRecvDataCb(recv_keepalive_from_peer_cb, MSG_TYPE_ML_KEEPALIVE);
205

    
206
        info("ML has been initialized, local addess is %s", LocalPeerID);
207
}
208

    
209