Statistics
| Branch: | Revision:

napa-baselibs / tests / Broadcaster / som.c @ 507372bb

History | View | Annotate | Download (5.82 KB)

1
/*
2
 * % vim: syntax=c ts=4 tw=76 cindent shiftwidth=4
3
 *
4
 * Simplistic "SOM" module for the Broadcaster demo application
5
 *
6
 */
7

    
8
#include "peer.h"
9

    
10
#include <ml.h>
11
#include <neighborlist.h>
12
#include <chunk.h>
13
#include <chunkbuffer.h>
14
#include <repoclient.h>
15

    
16
/** Configuration options for the NeighborList */
17
cfg_opt_t cfg_nlist[] = {
18
        CFG_INT("refreshPeriod", 30, CFGF_NONE),
19
        CFG_INT("size", 10, CFGF_NONE),
20
        CFG_INT("peerAge", 300, CFGF_NONE),
21
        CFG_END()
22
};
23

    
24
/** Configuration options for this "SOM" */
25
cfg_opt_t cfg_som[] = {
26
        CFG_STR("protocol", "passive", CFGF_NONE),
27
        CFG_STR("channel", NULL, CFGF_NONE),
28
        CFG_INT("publishPeerIDPeriod", 0, CFGF_NONE),
29
        CFG_SEC("neighborlist", cfg_nlist, CFGF_NONE),
30
        CFG_END()
31
};
32

    
33
HANDLE neighborlist = NULL;
34
char *channel = NULL;
35

    
36
NeighborListEntry *neighbors = NULL;
37
int neighbors_size = 0;
38
int num_neighbors = 0;
39

    
40
struct chunk_buffer *chunkbuffer = NULL;
41

    
42
void findServer(int fd, short event, void *arg);
43

    
44
/** Gets called by the ML on the establishment of a connection (regardless
45
 * of who initiated it 
46
 */
47
void connection_cb(int connectionID, void *arg) {
48
        debug("A connection to  %s has been established", arg);
49

    
50
        char remote_socket[SOCKETID_SIZE];
51
        socketID_handle remsocketID = (void *)remote_socket;
52
        mlStringToSocketID((char *)arg, remsocketID);
53

    
54
        activateMeasurements(remsocketID);
55
}
56

    
57
/** NeighborList callback: gets called when the neighborlist changes */
58
void neighborlist_cb(HANDLE nlist, void *arg) {
59
        num_neighbors = neighbors_size;
60
        int error = neighborlist_query(nlist, neighbors, &num_neighbors);
61
        if (error) {
62
                error("neighborlist_query returned error %d", error);
63
                return;
64
        }
65

    
66
        /* Open a connection to all the neighbors */
67
        int i;
68
        socketID_handle remsocketID = malloc(SOCKETID_SIZE);
69
        for (i = 0; i != num_neighbors; i++) {
70
                char *neighbor = strdup(neighbors[i].peer);
71

    
72
                mlStringToSocketID(neighbor, remsocketID);
73

    
74
                if (mlConnectionExist(remsocketID, false) < 0 && 
75
                                strcmp(neighbors[i].peer, LocalPeerID) != 0) {
76
                        debug("Opening connection to %s", neighbors[i].peer);
77
                        mlOpenConnection(remsocketID, connection_cb, neighbors[i].peer,
78
                                        sendParams);
79
                }
80
                free(neighbor);
81
        }
82
        free(remsocketID);
83
}
84

    
85
/** This function gets called periodically and arranges publishing out own
86
 * PeerID to the Repository 
87
 */
88
void static publish_PeerID(HANDLE h, void *arg) {
89
                /* Publish our own PeerID */
90
                MeasurementRecord mr;
91
                memset(&mr, 0, sizeof(mr));
92
                mr.published_name = "PeerID";
93
                mr.channel = channel;
94
                mr.originator = mr.targetA = mr.targetB = LocalPeerID;
95
                publishMeasurementRecord(&mr, "Our own PeerID");
96
}
97

    
98
/** ChunkBuffer notifier: gets called whenever a new Chunk is inserted into
99
 * the buffer 
100
 */
101
void chunkbuffer_notifier(struct chunk_buffer *cb, void* cbarg, const struct
102
                chunk *c) {
103
        /* Send the good stuff to all the neighbors */
104
        int i;
105
        for (i = 0; i != num_neighbors; i++) {
106
                if (strcmp(neighbors[i].peer, LocalPeerID)) {
107
                        debug("Sending chunk %u to %s",  chunkGetId(c), neighbors[i].peer);
108
                        sendChunk(neighbors[i].peer, c);
109
                }
110
        }
111

    
112
        playout_chunk(c);
113
}
114

    
115
/** This function gets called when the "source locator" finishes.
116
  * Each "client" tries to find the PeerID of the source (a Peer with the
117
  * SrcLag measurement 0.0) and open a Connection towards it.
118
  * This function is needed only because a ML deficiency (sometimes a
119
  * Connection can be opened from A to B only after B has initiated to
120
  * connection to A).
121
  */
122
void findServer_cb(HANDLE rep, HANDLE id, void *cbarg, char **result, int
123
                nResults) {
124
        if (nResults != 1) {
125
                /* We couldn't find the server properly, try again in 30 secs. */
126
                struct timeval t = { 30, 0 };
127
                event_base_once(eventbase, -1, EV_TIMEOUT, &findServer, NULL, &t);
128
                return;
129
        }
130

    
131
        info("The server is at %s", result[0]);
132

    
133
        static char server[64];
134
    strcpy(server,result[0]);
135
        socketID_handle remsocketID = malloc(SOCKETID_SIZE);
136

    
137
        mlStringToSocketID(server, remsocketID);
138
        mlOpenConnection(remsocketID, connection_cb, server, sendParams);
139
        free(remsocketID);
140
        free(result[0]);
141
        free(result);
142
}
143

    
144
/** This function is the "source locator", that makes the repository query.
145
  * Each "client" tries to find the PeerID of the source (a Peer with the
146
  * SrcLag measurement 0.0) and open a Connection towards it in the
147
  * callback function.
148
  */
149
void findServer(int fd, short event, void *arg) {
150
        Constraint cons[1];
151

    
152
        cons[0].published_name = "SrcLag";
153
        cons[0].strValue = NULL;
154
        cons[0].minValue = cons[0].maxValue = 0.0;
155
        repGetPeers(repository, findServer_cb, NULL, 1, cons, 1, NULL, 0,
156
                        channel);
157
}
158

    
159
/** Initializer for this "SOM" */
160
void som_init(cfg_t *som_config) {
161
        if (!som_config) return;
162

    
163
        const char *protocol = cfg_getstr(som_config, "protocol");
164
        channel = cfg_getstr(som_config, "channel");
165

    
166
        info("Scheduler is initializing protocol [ \"%s\" ] on channel %s",
167
                        protocol,channel);
168

    
169
        chunkbuffer = chbInit("size=256,time=now");
170
        if (!chunkbuffer) 
171
                fatal("Error initialising the Chunk Buffer");
172
        chbRegisterNotifier(chunkbuffer, chunkbuffer_notifier, NULL);
173

    
174
        int publishPeerIDPeriod =  cfg_getint(som_config,
175
                        "publishPeerIDPeriod");
176
        if (publishPeerIDPeriod) 
177
                napaSchedulePeriodic(NULL, 1.0/(float)publishPeerIDPeriod,
178
                                publish_PeerID, NULL);
179

    
180
        /* If we are passive, we need to figure out who is the server, and send
181
         * a message to it for ML to be able to work... Sigh... */
182
        if (!strcmp(protocol, "passive")) {
183
                struct timeval t = { 0, 0 };
184
                event_base_once(eventbase, -1, EV_TIMEOUT, &findServer, NULL, &t);
185
        }
186

    
187
        /* If the string "neighborlist" is present in the protocol name, we
188
         * launch a neighborlist instance */
189
        if (strstr(protocol, "neighborlist")) {
190
                cfg_t *nlist_cfg = cfg_getsec(som_config, "neighborlist");
191
                neighbors_size = cfg_getint(nlist_cfg, "size");
192

    
193
                neighbors = calloc(sizeof(NeighborListEntry), neighbors_size);
194

    
195
                neighborlist = neighborlist_init(repository, 
196
                        neighbors_size,
197
                        cfg_getint(nlist_cfg, "refreshPeriod"),
198
                        channel,
199
                        neighborlist_cb, NULL);
200
        }
201
}
202

    
203