napa-baselibs / rep / publish.c @ b58c58c3
History | View | Annotate | Download (6.7 KB)
1 |
/*
|
---|---|
2 |
* % vim: syntax=c ts=4 tw=76 cindent shiftwidth=4
|
3 |
*
|
4 |
*/
|
5 |
|
6 |
#define LOG_MODULE "[rep] " |
7 |
#include "repoclient_impl.h" |
8 |
|
9 |
struct streambuffer publish_streambuffer = { NULL, NULL, 0}; |
10 |
|
11 |
void bprintf(struct streambuffer *sb, const char *txt, ...) { |
12 |
va_list str_args; |
13 |
va_start(str_args, txt); |
14 |
#if !_WIN32 && !MAC_OS
|
15 |
vfprintf(sb->stream, txt, str_args); |
16 |
#else
|
17 |
int space = sb->buffsize - sb->len;
|
18 |
while(sb->buffer) {
|
19 |
int written = vsnprintf(sb->buffer+sb->len, space, txt, str_args);
|
20 |
char *newbuf;
|
21 |
if(written < space -1) { |
22 |
sb->len += written; |
23 |
break;
|
24 |
} |
25 |
newbuf = (char *)realloc(sb->buffer, sb->buffsize +
|
26 |
SB_INCREMENT); |
27 |
if(newbuf == 0) { |
28 |
fprintf(stderr, "Out of memory on memstream buffer extend (to %d bytes)",
|
29 |
sb->buffsize + SB_INCREMENT); |
30 |
return;
|
31 |
} |
32 |
sb->buffer = newbuf; |
33 |
sb->buffsize += SB_INCREMENT; |
34 |
} |
35 |
#endif
|
36 |
va_end(str_args); |
37 |
} |
38 |
|
39 |
void brewind(struct streambuffer *sb) { |
40 |
#if !_WIN32 && !MAC_OS
|
41 |
rewind(sb->stream); |
42 |
#else
|
43 |
if(sb->buffer != NULL) *sb->buffer = 0; |
44 |
sb->len = 0;
|
45 |
#endif
|
46 |
} |
47 |
|
48 |
void bflush(struct streambuffer *sb) { |
49 |
#if !_WIN32 && !MAC_OS
|
50 |
fflush(sb->stream); |
51 |
#endif
|
52 |
} |
53 |
|
54 |
|
55 |
|
56 |
const char *encode_measurementrecord(const MeasurementRecord *r) { |
57 |
brewind(&publish_streambuffer); |
58 |
|
59 |
bprintf(&publish_streambuffer, "originator=%s&", r->originator);
|
60 |
if (r->targetA)
|
61 |
bprintf(&publish_streambuffer, "targetA=%s&", r->targetA);
|
62 |
if (r->targetB)
|
63 |
bprintf(&publish_streambuffer, "targetB=%s&", r->targetB);
|
64 |
bprintf(&publish_streambuffer, "published_name=%s", r->published_name);
|
65 |
|
66 |
if (r->string_value)
|
67 |
bprintf(&publish_streambuffer, "&string_value=%s", r->string_value);
|
68 |
else if (r->value != (0.0/0.0)) |
69 |
bprintf(&publish_streambuffer, "&value=%f", r->value);
|
70 |
|
71 |
if (r->channel)
|
72 |
bprintf(&publish_streambuffer, "&channel=%s", r->channel);
|
73 |
|
74 |
if (r->timestamp.tv_sec + r->timestamp.tv_usec != 0) { |
75 |
bprintf(&publish_streambuffer, "×tamp=%s",
|
76 |
timeval2str(&(r->timestamp))); |
77 |
} |
78 |
|
79 |
bflush(&publish_streambuffer); |
80 |
/*debug("result: %s", publish_streambuffer.buffer);*/
|
81 |
return publish_streambuffer.buffer;
|
82 |
} |
83 |
|
84 |
void _publish_callback(struct evhttp_request *req,void *arg) { |
85 |
if (req == NULL || arg == NULL) return; |
86 |
request_data *cbdata = (request_data *)arg; |
87 |
struct reposerver *server = cbdata->server;
|
88 |
cb_repPublish user_cb = cbdata->cb; |
89 |
HANDLE id = cbdata->id; |
90 |
free(cbdata); |
91 |
|
92 |
if (req->response_code != HTTP_OK) {
|
93 |
warn("Failed PUBLISH operation (server %s:%hu,id %p, error is %d %s): %s",
|
94 |
server->address, server->port, id, req->response_code, req->response_code_line, req->uri); |
95 |
size_t response_len = evbuffer_get_length(req->input_buffer); |
96 |
if (response_len) {
|
97 |
char *response = malloc(response_len + 1); |
98 |
evbuffer_remove(req->input_buffer, response, response_len); |
99 |
response[response_len] = 0;
|
100 |
debug("Response string (len %d): %s", response_len, response);
|
101 |
free(response); |
102 |
} |
103 |
if (user_cb) user_cb((HANDLE)server, id, cbdata->cbarg, req->response_code ? req->response_code : -1); |
104 |
return;
|
105 |
} |
106 |
if (user_cb) user_cb((HANDLE)server, id, cbdata->cbarg,0); |
107 |
} |
108 |
|
109 |
HANDLE repPublish(HANDLE h, cb_repPublish cb, void *cbarg, MeasurementRecord *r) {
|
110 |
static int id = 1; |
111 |
if (!check_handle(h, __FUNCTION__)) return NULL; |
112 |
struct reposerver *rep = (struct reposerver *)h; |
113 |
if (!r) {
|
114 |
warn("Attempting to publish a NULL MeasurementRecord!");
|
115 |
return 0; |
116 |
} |
117 |
if (isnan(r->value) && r->string_value == NULL) { |
118 |
warn("Attempting to publish an EMPTY record");
|
119 |
return 0; |
120 |
} |
121 |
|
122 |
//info("abouttopublish,%s,%s,%s,%s,%f,%s,%s,%s\n",
|
123 |
// r->originator, r->targetA, r->targetB, r->published_name, r->value,
|
124 |
// r->string_value, r->channel, timeval2str(&(r->timestamp)));
|
125 |
char uri[1024]; |
126 |
char buf[1024]; |
127 |
|
128 |
request_data *rd = (request_data *)malloc(sizeof(request_data));
|
129 |
if (!rd) return NULL; |
130 |
rd->id = (void *)rd;
|
131 |
rd->cb = cb; |
132 |
rd->cbarg = cbarg; |
133 |
rd->server = (struct reposerver *)rep;
|
134 |
|
135 |
if (rep->publish_delay) {
|
136 |
if (rep->publish_buffer_entries >= PUBLISH_BUFFER_SIZE) {
|
137 |
warn("Publish buffer overflow!");
|
138 |
return;
|
139 |
} |
140 |
int p = rep->publish_buffer_entries++;
|
141 |
copy_measurementrecord(&(rep->publish_buffer[p].r), r); |
142 |
rep->publish_buffer[p].requestdata = rd; |
143 |
} |
144 |
else {
|
145 |
sprintf(uri, "/Publish?%s", encode_measurementrecord(r));
|
146 |
make_request(uri, _publish_callback, (void *)rd);
|
147 |
} |
148 |
return (HANDLE)(rd);
|
149 |
} |
150 |
|
151 |
/** libevent callback for deferred publishing */
|
152 |
void _batch_publish_callback(struct evhttp_request *req,void *arg) { |
153 |
if (req == NULL || arg == NULL) return; |
154 |
struct reposerver *rep = (struct reposerver *)arg; |
155 |
int response = req->response_code;
|
156 |
|
157 |
if (response != HTTP_OK) {
|
158 |
warn("Failed BATCH PUBLISH operation (server %s:%hu, error is %d %s)",
|
159 |
rep->address, rep->port, req->response_code, req->response_code_line); |
160 |
size_t response_len = evbuffer_get_length(req->input_buffer); |
161 |
if (response_len) {
|
162 |
char *response = malloc(response_len + 1); |
163 |
evbuffer_remove(req->input_buffer, response, response_len); |
164 |
response[response_len] = 0;
|
165 |
debug("Response string (len %d): %s", response_len, response);
|
166 |
free(response); |
167 |
} |
168 |
} |
169 |
|
170 |
int i;
|
171 |
for (i = 0; i != rep->in_transit_entries; i++) { |
172 |
request_data *cbdata = rep->in_transit[i].requestdata; |
173 |
cb_repPublish user_cb = cbdata->cb; |
174 |
HANDLE id = cbdata->id; |
175 |
free(cbdata); |
176 |
if (user_cb) user_cb((HANDLE)rep, id, cbdata->cbarg, response == 200 ? 0 : response); |
177 |
free_measurementrecord(&(rep->in_transit[i].r)); |
178 |
} |
179 |
debug("Freeing up %d in-transit entries", rep->in_transit_entries);
|
180 |
free(rep->in_transit); |
181 |
rep->in_transit = NULL;
|
182 |
rep->in_transit_entries = 0;
|
183 |
} |
184 |
|
185 |
#define REPO_RECORD_LEN 300 |
186 |
/** Batch publish callback */
|
187 |
void deferred_publish_cb(evutil_socket_t fd, short what, void *arg) { |
188 |
struct reposerver *rep = (struct reposerver *)arg; |
189 |
|
190 |
if (rep->publish_buffer_entries) {
|
191 |
debug("Deferred publish callback: %d entries to publish",
|
192 |
rep->publish_buffer_entries); |
193 |
char *post_data = malloc(rep->publish_buffer_entries *
|
194 |
REPO_RECORD_LEN + 10);
|
195 |
if (!post_data) fatal("Out of memory!"); |
196 |
post_data[0] = 0; |
197 |
|
198 |
int i;
|
199 |
for (i = 0; i != rep->publish_buffer_entries; i++) { |
200 |
const char *enc; |
201 |
enc = encode_measurementrecord(&(rep->publish_buffer[i].r)); |
202 |
if(strlen(enc) >= REPO_RECORD_LEN) {
|
203 |
warn("Skipping publish of HUGE record of %d bytes", strlen(enc));
|
204 |
} |
205 |
else {
|
206 |
strcat(post_data,enc); |
207 |
strcat(post_data, "\n");
|
208 |
} |
209 |
} |
210 |
|
211 |
make_post_request("/BatchPublish", post_data,
|
212 |
_batch_publish_callback, rep); |
213 |
|
214 |
rep->in_transit = rep->publish_buffer; |
215 |
rep->in_transit_entries = rep->publish_buffer_entries; |
216 |
rep->publish_buffer_entries = 0;
|
217 |
rep->publish_buffer = calloc(sizeof(struct deferred_publish), |
218 |
PUBLISH_BUFFER_SIZE); |
219 |
if (!rep->publish_buffer) fatal("Out of memory"); |
220 |
|
221 |
free(post_data); |
222 |
|
223 |
} |
224 |
|
225 |
if (rep->publish_delay) {
|
226 |
struct timeval t = { rep->publish_delay, 0 }; |
227 |
event_base_once(eventbase, -1, EV_TIMEOUT, deferred_publish_cb, rep, &t);
|
228 |
} |
229 |
} |