Statistics
| Branch: | Revision:

napa-baselibs / tests / peer / init.c @ 507372bb

History | View | Annotate | Download (4.34 KB)

1
/*
2
 * Initialization and configuration processing for the peer
3
 */
4

    
5
#include        "peer.h"
6

    
7
char LocalPeerID[64] = "Uninitialized";
8

    
9
void init_messaging(cfg_t *ml_config);
10
void init_monitoring(cfg_t *mon_config);
11
void init_repoclient(cfg_t *repo_config);
12
void init_source(cfg_t *source_config);
13

    
14
void setup_http_injector(const char *url);
15
void setup_http_ejector(const char *url,int delay);
16

    
17
cfg_opt_t cfg_stun[] = {
18
        CFG_STR("server", "stun.ekiga.net", CFGF_NONE),
19
        CFG_INT("port", 3478, CFGF_NONE),
20
        CFG_END()
21
};
22

    
23
cfg_opt_t cfg_ml[] = {
24
        CFG_INT("port", 1234, CFGF_NONE),
25
        CFG_INT("timeout", 3, CFGF_NONE),
26
        CFG_STR("address", NULL, CFGF_NONE),
27
        CFG_SEC("stun", cfg_stun, CFGF_NONE),
28
        CFG_END()
29
};
30

    
31
cfg_opt_t cfg_rep[] = {
32
        CFG_STR("server", NULL, CFGF_NONE),
33
        CFG_END()
34
};
35

    
36
cfg_opt_t cfg_source[] = {
37
        CFG_STR("stream", NULL, CFGF_NONE),
38
        CFG_FLOAT("chunk_duration", 1.0, CFGF_NONE),
39
        CFG_END()
40
};
41

    
42
cfg_opt_t cfg_main[] = {
43
        CFG_SEC("network", cfg_ml, CFGF_NONE),
44
        CFG_SEC("repository", cfg_rep, CFGF_NONE),
45
        CFG_SEC("source", cfg_source, CFGF_NONE),
46
        CFG_END()
47
};
48

    
49
int messaging_ready = 0;
50

    
51
void new_chunk(struct chunk_buffer *cb, void *cbarg, const struct chunk *c) {
52
        info("New chunk in buffer");
53
}
54

    
55

    
56
Peer *peer_init(const char *configfile) {
57
        peer = calloc(sizeof(Peer),1);
58

    
59
        napaInit(event_base_new());
60

    
61
        cfg_t *main_config = cfg_init(cfg_main, CFGF_NONE);
62
        if(cfg_parse(main_config, configfile) == CFG_PARSE_ERROR)
63
                fatal("Unable to parse config file %s", configfile);
64

    
65
        cfg_t *repoclient_cfg = cfg_getsec(main_config, "repository");
66
        if (repoclient_cfg) init_repoclient(repoclient_cfg);
67

    
68
        cfg_t *network_cfg = cfg_getsec(main_config, "network");
69
        init_messaging(network_cfg);
70

    
71
        init_monitoring(NULL);
72

    
73
        cfg_t *source_cfg = cfg_getsec(main_config, "source");
74
        if (source_cfg) init_source(source_cfg);
75

    
76
        /* cfg_free(main_config); */
77
        return peer;
78
}
79

    
80
void recv_localsocketID_cb(socketID_handle localID, int errorstatus) {
81
        if (errorstatus) fatal("Error %d on initializing Messaging Layer", errorstatus);
82
        mlSocketIDToString(localID, LocalPeerID, sizeof(LocalPeerID));
83
        if (errorstatus == 0) messaging_ready = 1;
84
}
85

    
86
void init_messaging(cfg_t *ml_config) {
87
        if (!ml_config) return;
88
        struct timeval timeout = { 0,0 };
89
        timeout.tv_sec = cfg_getint(ml_config, "timeout");
90

    
91
        cfg_t *stun = cfg_getsec(ml_config, "stun");
92
        char *stun_server = cfg_getstr(stun, "server");
93
        int stun_port = cfg_getint(stun, "port");
94

    
95
        char *address = cfg_getstr(ml_config, "address");
96
        if (!address) address = strdup(mlAutodetectIPAddress());
97
        int port = cfg_getint(ml_config, "port");
98

    
99
        debug("Calling init_messaging_layer with: %d, %u.%u, (%s, %d), stun(%s,%d)", 
100
                1, timeout.tv_sec, timeout.tv_usec, address, port, 
101
                stun_server, stun_port);
102
        mlInit(1, timeout, port, address, stun_port, stun_server, 
103
                recv_localsocketID_cb, eventbase);
104

    
105
        while (!messaging_ready) {
106
                napaYield();
107
        }
108
        info("ML has been initialized, local addess is %s", LocalPeerID);
109
}
110

    
111
void init_monitoring(cfg_t *mon_config) { 
112
#if 0
113
        if (monInit()) fatal("Unable to init monitoring layer");
114
#endif
115
}
116

    
117
void init_repoclient(cfg_t *repo_config) { 
118
        repInit("");
119
        peer->repository = repOpen(cfg_getstr(repo_config, "server"),0);
120
        if (peer->repository == NULL) fatal("Unable to initialize repoclient");
121
}
122

    
123

    
124
void init_source(cfg_t *source_cfg) {
125

    
126
        char *stream = cfg_getstr(source_cfg, "stream");
127
        if (!stream) return;
128
        double chunk_duration = cfg_getfloat(source_cfg,"chunk_duration");
129

    
130
        info("Initializing SOURCE mode: tuning to stream %s, sampling rate is %lf Hz", 
131
                stream, (double)1.0/chunk_duration);
132
         init_source_ul(stream, chunk_duration) ;
133

    
134
#if 0
135
        mlRegisterRecvConnectionCb(&receive_conn_cb);
136
        mlRegisterErrorConnectionCb(&conn_fail_cb);
137
#endif
138
}
139

    
140
void publish_callback(HANDLE client, HANDLE id, void *cbarg, int result) {
141
        info("PeerID publised");
142
        if (result) fatal("Unable to publish PeerID");
143
}
144

    
145
void client_init(cfg_t *client_config) {
146
#if 0
147
        info("Initializing CLIENT mode: publishing local PeerID");
148
        MeasurementRecord mr;
149
        memset(&mr, 0, sizeof(mr));
150
        mr.published_name = "PeerID";
151
        mr.originator = mr.targetA = mr.targetB = LocalPeerID;
152
        repPublish(peer->repository, publish_callback, NULL, &mr);
153

154
        info("Initializing CLIENT mode: starting http ejector at %s", peer->ejector_url);
155
        setup_http_ejector(peer->ejector_url,0);
156

157
        mlRegisterRecvDataCb(&recv_data_from_peer_cb,12);
158
        mlRegisterRecvDataCb(&recv_chunk_from_peer_cb,15);
159
#endif
160
}
161