napa-baselibs / rep / repoclient.c @ 5f3adef4
History | View | Annotate | Download (6.47 KB)
1 |
#define LOG_MODULE "[rep] " |
---|---|
2 |
#include "repoclient_impl.h" |
3 |
|
4 |
/** Parse a server specification for repOpen */
|
5 |
int parse_serverspec(const char *server, char **addr, unsigned short *port); |
6 |
/** Batch publish callback */
|
7 |
void deferred_publish_cb(evutil_socket_t fd, short what, void *arg); |
8 |
|
9 |
void repInit(const char *config) { |
10 |
if ((publish_streambuffer.stream = open_memstream(&publish_streambuffer.buffer, &publish_streambuffer.len))
|
11 |
== NULL) fatal("Unable to initilize repoclient, file %s, line %d", __FILE__, __LINE__); |
12 |
debug("The Repository client library is ready");
|
13 |
} |
14 |
|
15 |
/** Initialize the repoclient instance by parsing the server spec string and setting up things */
|
16 |
HANDLE repOpen(const char *server, int publish_delay) { |
17 |
struct reposerver *rep = malloc(sizeof(struct reposerver)); |
18 |
if (!rep) fatal("Out of memory while initializing repository client for %s", server); |
19 |
rep->magic=REPOSERVER_MAGIC; |
20 |
rep->publish_buffer = NULL;
|
21 |
rep->publish_buffer_entries = 0;
|
22 |
rep->publish_delay = publish_delay; |
23 |
rep->in_transit = NULL;
|
24 |
rep->in_transit_entries = 0;
|
25 |
parse_serverspec(server, &(rep->address), &(rep->port)); |
26 |
|
27 |
info("Opening repository client %p to http://%s:%d", rep, rep->address, rep->port);
|
28 |
|
29 |
if ((rep->evhttp_conn = evhttp_connection_base_new(eventbase, rep->address, rep->port)) == NULL) |
30 |
fatal("Unable to establish connection to %s:%d", rep->address, rep->port);
|
31 |
|
32 |
if (publish_delay) {
|
33 |
rep->publish_buffer = calloc(sizeof(struct deferred_publish), PUBLISH_BUFFER_SIZE); |
34 |
rep->publish_buffer_entries = 0;
|
35 |
/* Schedule the batch publisher */
|
36 |
struct timeval t = { publish_delay, 0 }; |
37 |
event_base_once(eventbase, -1, EV_TIMEOUT, deferred_publish_cb, rep, &t);
|
38 |
} |
39 |
|
40 |
return (HANDLE *)rep;
|
41 |
} |
42 |
|
43 |
/** Close the repoclient instance and free resources */
|
44 |
void repClose(HANDLE h) {
|
45 |
if (!check_handle(h, __FUNCTION__)) return; |
46 |
struct reposerver *rep = (struct reposerver *)h; |
47 |
|
48 |
debug("Closing repository client %p to %s:%hu", h, rep->address, rep->port);
|
49 |
evhttp_connection_free(rep->evhttp_conn); |
50 |
rep->magic=0;
|
51 |
if (rep->publish_buffer_entries && rep->publish_buffer) {
|
52 |
while (rep->publish_buffer_entries) {
|
53 |
free_measurementrecord(&(rep->publish_buffer[--rep->publish_buffer_entries].r)); |
54 |
} |
55 |
} |
56 |
if (rep->publish_buffer) free(rep->publish_buffer);
|
57 |
if (rep->in_transit_entries && rep->in_transit) {
|
58 |
while (rep->in_transit_entries) {
|
59 |
free_measurementrecord(&(rep->in_transit[--rep->in_transit_entries].r)); |
60 |
} |
61 |
} |
62 |
if (rep->in_transit) free(rep->in_transit);
|
63 |
free(rep); |
64 |
} |
65 |
|
66 |
HANDLE repListMeasurementNames(HANDLE rep, cb_repListMeasurementNames cb, void *cbarg, int maxResults, const char *ch) { |
67 |
static int id = 1; |
68 |
if (!check_handle(rep, __FUNCTION__)) return NULL; |
69 |
debug("About to call listMeasurementIds with maxResults %d", maxResults);
|
70 |
|
71 |
char uri[1024]; |
72 |
request_data *rd = (request_data *)malloc(sizeof(request_data));
|
73 |
if (!rd) return NULL; |
74 |
rd->id = (void *)rd;
|
75 |
rd->server = (struct reposerver *)rep;
|
76 |
rd->cb = cb; |
77 |
rd->data = maxResults; |
78 |
|
79 |
sprintf(uri, "/ListMeasurementNames?maxResults=%d", maxResults);
|
80 |
make_request(uri, _stringlist_callback, (void *)rd);
|
81 |
return (HANDLE)(rd);
|
82 |
} |
83 |
|
84 |
void make_request(const char *uri, void (*callback)(struct evhttp_request *, void *), void *cb_arg) { |
85 |
struct evhttp_request *req = evhttp_request_new(callback, cb_arg);
|
86 |
request_data *rd = (request_data *)cb_arg; |
87 |
|
88 |
if (!req) {
|
89 |
error("Failed to create request object");
|
90 |
return;
|
91 |
} |
92 |
evhttp_add_header(req->output_headers, "Connection", "close"); |
93 |
|
94 |
if (evhttp_make_request(rd->server->evhttp_conn, req, EVHTTP_REQ_GET, uri)) {
|
95 |
warn("evhttp_make_request failed");
|
96 |
} |
97 |
} |
98 |
|
99 |
int make_post_request(const char *uri, const char *data, void (*callback)(struct evhttp_request *, void *), void *cb_arg) { |
100 |
struct evhttp_request *req = evhttp_request_new(callback, cb_arg);
|
101 |
struct reposerver *rep = (struct reposerver *)cb_arg; |
102 |
|
103 |
if (!req) {
|
104 |
error("Failed to create request object");
|
105 |
return -1; |
106 |
} |
107 |
|
108 |
if (evhttp_add_header(req->output_headers, "Connection", "close") < 0) { |
109 |
error("Failed to add header");
|
110 |
return -2; |
111 |
} |
112 |
|
113 |
if (evbuffer_add(req->output_buffer, data, strlen(data) + 1) < 0) { |
114 |
error("Failed to add data to request");
|
115 |
return -3; |
116 |
} |
117 |
|
118 |
if (evhttp_make_request(rep->evhttp_conn, req, EVHTTP_REQ_POST, uri) < 0) { |
119 |
warn("evhttp_make_request failed");
|
120 |
return -4; |
121 |
} |
122 |
return 0; |
123 |
} |
124 |
|
125 |
/** Parse a server specification of the form addr:port */
|
126 |
int parse_serverspec(const char *rep, char **addr, unsigned short *port) { |
127 |
int p = 0; |
128 |
if (!rep) fatal("NULL repository is provided to repoclient, unable to continue"); |
129 |
|
130 |
/* Find the address part first */
|
131 |
while (rep[p] != ':' && rep[p] != 0) p++; |
132 |
*addr = strdup(rep); |
133 |
(*addr)[p] = 0;
|
134 |
/* Parse the port spec, 80 if not specified */
|
135 |
*port = 80;
|
136 |
if (rep[p] != 0 && sscanf(rep + p + 1, "%hu", port) != 1) { |
137 |
fatal("Unable to parse repository server specification \"%s\"", rep);
|
138 |
} |
139 |
return 0; |
140 |
} |
141 |
|
142 |
#if 0
|
143 |
|
144 |
void _countPeers_callback(struct evhttp_request *req,void *arg) {
|
145 |
if (req == NULL || arg == NULL) return;
|
146 |
request_data *cbdata = (request_data *)arg;
|
147 |
|
148 |
void (*user_cb)(HANDLE rep, HANDLE id, int result) = cbdata->cb;
|
149 |
HANDLE id = cbdata->id;
|
150 |
free(cbdata);
|
151 |
|
152 |
int result = -1;
|
153 |
if (req->response_code != HTTP_OK) {
|
154 |
warn("Failed repository operation (id %p, error is %d %s): %s",
|
155 |
id, req->response_code, req->response_code_line, req->uri);
|
156 |
size_t response_len = evbuffer_get_length(req->input_buffer);
|
157 |
if (response_len) {
|
158 |
char *response = malloc(response_len + 1);
|
159 |
evbuffer_remove(req->input_buffer, response, response_len);
|
160 |
response[response_len] = 0;
|
161 |
debug("Response string (len %d): %s", response_len, response);
|
162 |
free(response);
|
163 |
}
|
164 |
if (user_cb) user_cb(client, id, result);
|
165 |
return;
|
166 |
}
|
167 |
|
168 |
char *line = evbuffer_readln(req->input_buffer, NULL, EVBUFFER_EOL_ANY);
|
169 |
if (line) sscanf(line, "%d", &result);
|
170 |
if (user_cb && result >= 0) user_cb(client, id, result);
|
171 |
}
|
172 |
|
173 |
HANDLE countPeers(HANDLE rep, cb_countPeers cb, Constraint *cons, int clen) {
|
174 |
|
175 |
static int id = 1;
|
176 |
if (!check_handle(rep, __FUNCTION__)) return NULL;
|
177 |
debug("About to call countPeers with constaints %s", constraints2str(cons, clen, 0));
|
178 |
|
179 |
char uri[10240];
|
180 |
request_data *rd = (request_data *)malloc(sizeof(request_data));
|
181 |
if (!rd) return NULL;
|
182 |
rd->id = (void *)id;
|
183 |
rd->cb = cb;
|
184 |
|
185 |
sprintf(uri, "/CountPeers?%s", constraints2str(cons, clen, 0));
|
186 |
debug("Making countPeers request with URI %s", uri);
|
187 |
make_request(uri, _stringlist_callback, (void *)rd);
|
188 |
return (HANDLE)(id++);
|
189 |
}
|
190 |
|
191 |
#endif
|
192 |
|