napa-baselibs / tests / Broadcaster / som.c @ 507372bb
History | View | Annotate | Download (5.82 KB)
1 |
/*
|
---|---|
2 |
* % vim: syntax=c ts=4 tw=76 cindent shiftwidth=4
|
3 |
*
|
4 |
* Simplistic "SOM" module for the Broadcaster demo application
|
5 |
*
|
6 |
*/
|
7 |
|
8 |
#include "peer.h" |
9 |
|
10 |
#include <ml.h> |
11 |
#include <neighborlist.h> |
12 |
#include <chunk.h> |
13 |
#include <chunkbuffer.h> |
14 |
#include <repoclient.h> |
15 |
|
16 |
/** Configuration options for the NeighborList */
|
17 |
cfg_opt_t cfg_nlist[] = { |
18 |
CFG_INT("refreshPeriod", 30, CFGF_NONE), |
19 |
CFG_INT("size", 10, CFGF_NONE), |
20 |
CFG_INT("peerAge", 300, CFGF_NONE), |
21 |
CFG_END() |
22 |
}; |
23 |
|
24 |
/** Configuration options for this "SOM" */
|
25 |
cfg_opt_t cfg_som[] = { |
26 |
CFG_STR("protocol", "passive", CFGF_NONE), |
27 |
CFG_STR("channel", NULL, CFGF_NONE), |
28 |
CFG_INT("publishPeerIDPeriod", 0, CFGF_NONE), |
29 |
CFG_SEC("neighborlist", cfg_nlist, CFGF_NONE),
|
30 |
CFG_END() |
31 |
}; |
32 |
|
33 |
HANDLE neighborlist = NULL;
|
34 |
char *channel = NULL; |
35 |
|
36 |
NeighborListEntry *neighbors = NULL;
|
37 |
int neighbors_size = 0; |
38 |
int num_neighbors = 0; |
39 |
|
40 |
struct chunk_buffer *chunkbuffer = NULL; |
41 |
|
42 |
void findServer(int fd, short event, void *arg); |
43 |
|
44 |
/** Gets called by the ML on the establishment of a connection (regardless
|
45 |
* of who initiated it
|
46 |
*/
|
47 |
void connection_cb(int connectionID, void *arg) { |
48 |
debug("A connection to %s has been established", arg);
|
49 |
|
50 |
char remote_socket[SOCKETID_SIZE];
|
51 |
socketID_handle remsocketID = (void *)remote_socket;
|
52 |
mlStringToSocketID((char *)arg, remsocketID);
|
53 |
|
54 |
activateMeasurements(remsocketID); |
55 |
} |
56 |
|
57 |
/** NeighborList callback: gets called when the neighborlist changes */
|
58 |
void neighborlist_cb(HANDLE nlist, void *arg) { |
59 |
num_neighbors = neighbors_size; |
60 |
int error = neighborlist_query(nlist, neighbors, &num_neighbors);
|
61 |
if (error) {
|
62 |
error("neighborlist_query returned error %d", error);
|
63 |
return;
|
64 |
} |
65 |
|
66 |
/* Open a connection to all the neighbors */
|
67 |
int i;
|
68 |
socketID_handle remsocketID = malloc(SOCKETID_SIZE); |
69 |
for (i = 0; i != num_neighbors; i++) { |
70 |
char *neighbor = strdup(neighbors[i].peer);
|
71 |
|
72 |
mlStringToSocketID(neighbor, remsocketID); |
73 |
|
74 |
if (mlConnectionExist(remsocketID, false) < 0 && |
75 |
strcmp(neighbors[i].peer, LocalPeerID) != 0) {
|
76 |
debug("Opening connection to %s", neighbors[i].peer);
|
77 |
mlOpenConnection(remsocketID, connection_cb, neighbors[i].peer, |
78 |
sendParams); |
79 |
} |
80 |
free(neighbor); |
81 |
} |
82 |
free(remsocketID); |
83 |
} |
84 |
|
85 |
/** This function gets called periodically and arranges publishing out own
|
86 |
* PeerID to the Repository
|
87 |
*/
|
88 |
void static publish_PeerID(HANDLE h, void *arg) { |
89 |
/* Publish our own PeerID */
|
90 |
MeasurementRecord mr; |
91 |
memset(&mr, 0, sizeof(mr)); |
92 |
mr.published_name = "PeerID";
|
93 |
mr.channel = channel; |
94 |
mr.originator = mr.targetA = mr.targetB = LocalPeerID; |
95 |
publishMeasurementRecord(&mr, "Our own PeerID");
|
96 |
} |
97 |
|
98 |
/** ChunkBuffer notifier: gets called whenever a new Chunk is inserted into
|
99 |
* the buffer
|
100 |
*/
|
101 |
void chunkbuffer_notifier(struct chunk_buffer *cb, void* cbarg, const struct |
102 |
chunk *c) { |
103 |
/* Send the good stuff to all the neighbors */
|
104 |
int i;
|
105 |
for (i = 0; i != num_neighbors; i++) { |
106 |
if (strcmp(neighbors[i].peer, LocalPeerID)) {
|
107 |
debug("Sending chunk %u to %s", chunkGetId(c), neighbors[i].peer);
|
108 |
sendChunk(neighbors[i].peer, c); |
109 |
} |
110 |
} |
111 |
|
112 |
playout_chunk(c); |
113 |
} |
114 |
|
115 |
/** This function gets called when the "source locator" finishes.
|
116 |
* Each "client" tries to find the PeerID of the source (a Peer with the
|
117 |
* SrcLag measurement 0.0) and open a Connection towards it.
|
118 |
* This function is needed only because a ML deficiency (sometimes a
|
119 |
* Connection can be opened from A to B only after B has initiated to
|
120 |
* connection to A).
|
121 |
*/
|
122 |
void findServer_cb(HANDLE rep, HANDLE id, void *cbarg, char **result, int |
123 |
nResults) { |
124 |
if (nResults != 1) { |
125 |
/* We couldn't find the server properly, try again in 30 secs. */
|
126 |
struct timeval t = { 30, 0 }; |
127 |
event_base_once(eventbase, -1, EV_TIMEOUT, &findServer, NULL, &t); |
128 |
return;
|
129 |
} |
130 |
|
131 |
info("The server is at %s", result[0]); |
132 |
|
133 |
static char server[64]; |
134 |
strcpy(server,result[0]);
|
135 |
socketID_handle remsocketID = malloc(SOCKETID_SIZE); |
136 |
|
137 |
mlStringToSocketID(server, remsocketID); |
138 |
mlOpenConnection(remsocketID, connection_cb, server, sendParams); |
139 |
free(remsocketID); |
140 |
free(result[0]);
|
141 |
free(result); |
142 |
} |
143 |
|
144 |
/** This function is the "source locator", that makes the repository query.
|
145 |
* Each "client" tries to find the PeerID of the source (a Peer with the
|
146 |
* SrcLag measurement 0.0) and open a Connection towards it in the
|
147 |
* callback function.
|
148 |
*/
|
149 |
void findServer(int fd, short event, void *arg) { |
150 |
Constraint cons[1];
|
151 |
|
152 |
cons[0].published_name = "SrcLag"; |
153 |
cons[0].strValue = NULL; |
154 |
cons[0].minValue = cons[0].maxValue = 0.0; |
155 |
repGetPeers(repository, findServer_cb, NULL, 1, cons, 1, NULL, 0, |
156 |
channel); |
157 |
} |
158 |
|
159 |
/** Initializer for this "SOM" */
|
160 |
void som_init(cfg_t *som_config) {
|
161 |
if (!som_config) return; |
162 |
|
163 |
const char *protocol = cfg_getstr(som_config, "protocol"); |
164 |
channel = cfg_getstr(som_config, "channel");
|
165 |
|
166 |
info("Scheduler is initializing protocol [ \"%s\" ] on channel %s",
|
167 |
protocol,channel); |
168 |
|
169 |
chunkbuffer = chbInit("size=256,time=now");
|
170 |
if (!chunkbuffer)
|
171 |
fatal("Error initialising the Chunk Buffer");
|
172 |
chbRegisterNotifier(chunkbuffer, chunkbuffer_notifier, NULL);
|
173 |
|
174 |
int publishPeerIDPeriod = cfg_getint(som_config,
|
175 |
"publishPeerIDPeriod");
|
176 |
if (publishPeerIDPeriod)
|
177 |
napaSchedulePeriodic(NULL, 1.0/(float)publishPeerIDPeriod, |
178 |
publish_PeerID, NULL);
|
179 |
|
180 |
/* If we are passive, we need to figure out who is the server, and send
|
181 |
* a message to it for ML to be able to work... Sigh... */
|
182 |
if (!strcmp(protocol, "passive")) { |
183 |
struct timeval t = { 0, 0 }; |
184 |
event_base_once(eventbase, -1, EV_TIMEOUT, &findServer, NULL, &t); |
185 |
} |
186 |
|
187 |
/* If the string "neighborlist" is present in the protocol name, we
|
188 |
* launch a neighborlist instance */
|
189 |
if (strstr(protocol, "neighborlist")) { |
190 |
cfg_t *nlist_cfg = cfg_getsec(som_config, "neighborlist");
|
191 |
neighbors_size = cfg_getint(nlist_cfg, "size");
|
192 |
|
193 |
neighbors = calloc(sizeof(NeighborListEntry), neighbors_size);
|
194 |
|
195 |
neighborlist = neighborlist_init(repository, |
196 |
neighbors_size, |
197 |
cfg_getint(nlist_cfg, "refreshPeriod"),
|
198 |
channel, |
199 |
neighborlist_cb, NULL);
|
200 |
} |
201 |
} |
202 |
|
203 |
|