Statistics
| Branch: | Revision:

napa-baselibs / rep / publish.c @ 1e08ea16

History | View | Annotate | Download (6.7 KB)

1
/*
2
 * % vim: syntax=c ts=4 tw=76 cindent shiftwidth=4
3
 *
4
 */
5

    
6
#define LOG_MODULE "[rep] "
7
#include        "repoclient_impl.h"
8

    
9
struct streambuffer publish_streambuffer = { 0, NULL, 0};
10

    
11
void bprintf(struct streambuffer *sb, const char *txt, ...) {
12
        va_list str_args;
13
        va_start(str_args, txt);
14
#if !_WIN32 && !MAC_OS
15
        vfprintf(sb->stream, txt, str_args);
16
#else
17
        int space = sb->buffsize - sb->len;
18
        while(sb->buffer) { 
19
                int written = vsnprintf(sb->buffer+sb->len, space, txt, str_args);
20
                char *newbuf;
21
        if(written < space -1) {
22
                        sb->len += written;
23
                        break;
24
                }
25
                newbuf = (char *)realloc(sb->buffer, sb->buffsize +
26
                                SB_INCREMENT);
27
                if(newbuf == 0) {
28
                        fprintf(stderr, "Out of memory on memstream buffer extend (to %d bytes)",
29
                                        sb->buffsize + SB_INCREMENT); 
30
                        return;
31
                }
32
                sb->buffer = newbuf;
33
                sb->buffsize += SB_INCREMENT;
34
        }        
35
#endif
36
        va_end(str_args);
37
}
38

    
39
void brewind(struct streambuffer *sb) {
40
#if !_WIN32 && !MAC_OS
41
        rewind(sb->stream);
42
#else
43
        if(sb->buffer != NULL) *sb->buffer = 0;
44
        sb->len = 0;
45
#endif
46
}
47

    
48
void bflush(struct streambuffer *sb) {
49
#if !_WIN32 && !MAC_OS
50
            fflush(sb->stream);
51
#endif
52
}
53

    
54

    
55

    
56
const char *encode_measurementrecord(const MeasurementRecord *r) {
57
        brewind(&publish_streambuffer);
58

    
59
        bprintf(&publish_streambuffer, "originator=%s&", r->originator);
60
        if (r->targetA)
61
                bprintf(&publish_streambuffer, "targetA=%s&", r->targetA);
62
        if (r->targetB)
63
                bprintf(&publish_streambuffer, "targetB=%s&", r->targetB);
64
        bprintf(&publish_streambuffer, "published_name=%s", r->published_name);
65

    
66
        if (r->string_value)
67
                bprintf(&publish_streambuffer, "&string_value=%s", r->string_value);
68
        else if (r->value != (0.0/0.0)) 
69
                bprintf(&publish_streambuffer, "&value=%f", r->value);
70

    
71
        if (r->channel) 
72
                bprintf(&publish_streambuffer, "&channel=%s", r->channel);
73

    
74
        if (r->timestamp.tv_sec + r->timestamp.tv_usec != 0) {
75
                bprintf(&publish_streambuffer, "&timestamp=%s",
76
                                timeval2str(&(r->timestamp)));
77
        }
78

    
79
        bflush(&publish_streambuffer);
80
        /*debug("result: %s", publish_streambuffer.buffer);*/
81
        return publish_streambuffer.buffer;
82
}
83

    
84
void _publish_callback(struct evhttp_request *req,void *arg) {
85
        if (req == NULL || arg == NULL) return;
86
        request_data *cbdata = (request_data *)arg;
87
        struct reposerver *server = cbdata->server;
88
        cb_repPublish user_cb = cbdata->cb;
89
        HANDLE id = cbdata->id;
90
        free(cbdata);
91
        
92
        if (req->response_code != HTTP_OK) {
93
                warn("Failed PUBLISH operation (server %s:%hu,id %p, error is %d %s): %s", 
94
                        server->address, server->port, id, req->response_code, req->response_code_line, req->uri);
95
                size_t response_len = evbuffer_get_length(req->input_buffer);
96
                if (response_len) {
97
                        char *response = malloc(response_len + 1);
98
                        evbuffer_remove(req->input_buffer, response, response_len);
99
                        response[response_len] = 0;
100
                        debug("Response string (len %d): %s", response_len, response);
101
                        free(response);
102
                }        
103
                if (user_cb) user_cb((HANDLE)server, id, cbdata->cbarg, req->response_code ? req->response_code : -1);
104
                return;
105
        }
106
        if (user_cb) user_cb((HANDLE)server, id, cbdata->cbarg,0);
107
}
108

    
109
HANDLE repPublish(HANDLE h, cb_repPublish cb, void *cbarg, MeasurementRecord *r) {
110
        static int id = 1;
111
        if (!check_handle(h, __FUNCTION__)) return NULL;
112
        struct reposerver *rep = (struct reposerver *)h;
113
        if (!r) {
114
                warn("Attempting to publish a NULL MeasurementRecord!");
115
                return 0;
116
        }
117
        if (isnan(r->value) && r->string_value == NULL) {
118
                warn("Attempting to publish an EMPTY record");
119
                return 0;
120
        }
121

    
122
        //info("abouttopublish,%s,%s,%s,%s,%f,%s,%s,%s\n",
123
        //        r->originator, r->targetA, r->targetB, r->published_name, r->value,
124
        //        r->string_value, r->channel, timeval2str(&(r->timestamp)));
125
        char uri[1024];
126
        char buf[1024];
127

    
128
        request_data *rd = (request_data *)malloc(sizeof(request_data));
129
        if (!rd) return NULL;
130
        rd->id = (void *)rd;
131
        rd->cb = cb;
132
        rd->cbarg = cbarg;
133
        rd->server = (struct reposerver *)rep;
134

    
135
        if (rep->publish_delay) {
136
                if (rep->publish_buffer_entries >= PUBLISH_BUFFER_SIZE) {
137
                        warn("Publish buffer overflow!");
138
                        return;
139
                }
140
                int p = rep->publish_buffer_entries++;
141
                copy_measurementrecord(&(rep->publish_buffer[p].r), r);
142
                rep->publish_buffer[p].requestdata = rd;
143
        }
144
        else {
145
                sprintf(uri, "/Publish?%s", encode_measurementrecord(r));
146
                make_request(uri, _publish_callback, (void *)rd);
147
        }
148
        return (HANDLE)(rd);
149
}
150

    
151
/** libevent callback for deferred publishing */
152
void _batch_publish_callback(struct evhttp_request *req,void *arg) {
153
        if (req == NULL || arg == NULL) return;
154
        struct reposerver *rep = (struct reposerver *)arg;
155
        int response = req->response_code;
156

    
157
        if (response != HTTP_OK) {
158
                warn("Failed BATCH PUBLISH operation (server %s:%hu, error is %d %s)", 
159
                        rep->address, rep->port, req->response_code, req->response_code_line);
160
                size_t response_len = evbuffer_get_length(req->input_buffer);
161
                if (response_len) {
162
                        char *response = malloc(response_len + 1);
163
                        evbuffer_remove(req->input_buffer, response, response_len);
164
                        response[response_len] = 0;
165
                        debug("Response string (len %d): %s", response_len, response);
166
                        free(response);
167
                }
168
        }
169

    
170
        int i;
171
        for (i = 0; i != rep->in_transit_entries; i++) {
172
                request_data *cbdata = rep->in_transit[i].requestdata;
173
                cb_repPublish user_cb = cbdata->cb;
174
                HANDLE id = cbdata->id;
175
                free(cbdata);
176
                if (user_cb) user_cb((HANDLE)rep, id, cbdata->cbarg, response == 200 ? 0 : response);
177
                free_measurementrecord(&(rep->in_transit[i].r));
178
        }
179
        debug("Freeing up %d in-transit entries", rep->in_transit_entries);
180
        free(rep->in_transit);
181
        rep->in_transit = NULL;
182
        rep->in_transit_entries = 0;
183
}
184

    
185
#define REPO_RECORD_LEN 300
186
/** Batch publish callback */
187
void deferred_publish_cb(evutil_socket_t fd, short what, void *arg) {
188
        struct reposerver *rep = (struct reposerver *)arg;
189

    
190
        if (rep->publish_buffer_entries) {
191
                debug("Deferred publish callback: %d entries to publish",
192
                                rep->publish_buffer_entries);
193
                char *post_data = malloc(rep->publish_buffer_entries *
194
                                REPO_RECORD_LEN + 10);
195
                if (!post_data) fatal("Out of memory!");
196
                post_data[0] = 0;
197

    
198
                int i;
199
                for (i = 0; i != rep->publish_buffer_entries; i++) {
200
                        const char *enc;
201
                           enc = encode_measurementrecord(&(rep->publish_buffer[i].r));
202
                        if(strlen(enc) >= REPO_RECORD_LEN) {
203
                         warn("Skipping publish of HUGE record of %d bytes", strlen(enc));
204
                        }
205
                        else {
206
                                strcat(post_data,enc);
207
                                strcat(post_data, "\n");
208
                        }
209
                }
210

    
211
                make_post_request("/BatchPublish", post_data,
212
                                _batch_publish_callback, rep);
213

    
214
                rep->in_transit = rep->publish_buffer;
215
                rep->in_transit_entries = rep->publish_buffer_entries;
216
                rep->publish_buffer_entries = 0;
217
                rep->publish_buffer = calloc(sizeof(struct deferred_publish),
218
                                PUBLISH_BUFFER_SIZE);
219
                if (!rep->publish_buffer) fatal("Out of memory");
220

    
221
                free(post_data);
222

    
223
        }
224

    
225
        if (rep->publish_delay) {
226
                struct timeval t = { rep->publish_delay, 0 };
227
                event_base_once(eventbase, -1, EV_TIMEOUT, deferred_publish_cb, rep, &t); 
228
        }
229
}