Statistics
| Branch: | Revision:

napa-baselibs / rep / publish.c @ 5f3adef4

History | View | Annotate | Download (5.68 KB)

1
/*
2
 * % vim: syntax=c ts=4 tw=76 cindent shiftwidth=4
3
 *
4
 */
5

    
6
#define LOG_MODULE "[rep] "
7
#include        "repoclient_impl.h"
8

    
9
struct streambuffer publish_streambuffer = { NULL, NULL, 0};
10

    
11
const char *encode_measurementrecord(const MeasurementRecord *r) {
12
        rewind(publish_streambuffer.stream);
13

    
14
        fprintf(publish_streambuffer.stream, "originator=%s&", r->originator);
15
        if (r->targetA)
16
                fprintf(publish_streambuffer.stream, "targetA=%s&", r->targetA);
17
        if (r->targetB)
18
                fprintf(publish_streambuffer.stream, "targetB=%s&", r->targetB);
19
        fprintf(publish_streambuffer.stream, "published_name=%s", r->published_name);
20

    
21
        if (r->string_value)
22
                fprintf(publish_streambuffer.stream, "&string_value=%s", r->string_value);
23
        else if (r->value != (0.0/0.0)) 
24
                fprintf(publish_streambuffer.stream, "&value=%lf", r->value);
25

    
26
        if (r->channel) 
27
                fprintf(publish_streambuffer.stream, "&channel=%s", r->channel);
28

    
29
        if (r->timestamp.tv_sec + r->timestamp.tv_usec != 0) {
30
                fprintf(publish_streambuffer.stream, "&timestamp=%s",
31
                                timeval2str(&(r->timestamp)));
32
        }
33

    
34
        fflush(publish_streambuffer.stream);
35
        /*debug("result: %s", publish_streambuffer.buffer);*/
36
        return publish_streambuffer.buffer;
37
}
38

    
39
void _publish_callback(struct evhttp_request *req,void *arg) {
40
        if (req == NULL || arg == NULL) return;
41
        request_data *cbdata = (request_data *)arg;
42
        struct reposerver *server = cbdata->server;
43
        cb_repPublish user_cb = cbdata->cb;
44
        HANDLE id = cbdata->id;
45
        free(cbdata);
46
        
47
        if (req->response_code != HTTP_OK) {
48
                warn("Failed PUBLISH operation (server %s:%hu,id %p, error is %d %s): %s", 
49
                        server->address, server->port, id, req->response_code, req->response_code_line, req->uri);
50
                size_t response_len = evbuffer_get_length(req->input_buffer);
51
                if (response_len) {
52
                        char *response = malloc(response_len + 1);
53
                        evbuffer_remove(req->input_buffer, response, response_len);
54
                        response[response_len] = 0;
55
                        debug("Response string (len %d): %s", response_len, response);
56
                        free(response);
57
                }
58
                if (user_cb) user_cb((HANDLE)server, id, cbdata->cbarg, req->response_code ? req->response_code : -1);
59
                return;
60
        }
61
        if (user_cb) user_cb((HANDLE)server, id, cbdata->cbarg,0);
62
}
63

    
64
HANDLE repPublish(HANDLE h, cb_repPublish cb, void *cbarg, MeasurementRecord *r) {
65
        static int id = 1;
66
        if (!check_handle(h, __FUNCTION__)) return NULL;
67
        struct reposerver *rep = (struct reposerver *)h;
68
        if (!r) {
69
                warn("Attempting to publish a NULL MeasurementRecord!");
70
                return 0;
71
        }
72
        if (isnan(r->value) && r->string_value == NULL) {
73
                warn("Attempting to publish an EMPTY record");
74
                return 0;
75
        }
76

    
77
        fprintf(stderr,"abouttopublish,%s,%s,%s,%s,%lf,%s,%s,%s\n", 
78
                r->originator, r->targetA, r->targetB, r->published_name, r->value, 
79
                r->string_value, r->channel, timeval2str(&(r->timestamp)));
80
        char uri[1024];
81
        char buf[1024];
82
        const char *encoded = encode_measurementrecord(r);
83

    
84
        request_data *rd = (request_data *)malloc(sizeof(request_data));
85
        if (!rd) return NULL;
86
        rd->id = (void *)rd;
87
        rd->cb = cb;
88
        rd->cbarg = cbarg;
89
        rd->server = (struct reposerver *)rep;
90

    
91
        if (rep->publish_delay) {
92
                if (rep->publish_buffer_entries >= PUBLISH_BUFFER_SIZE) {
93
                        warn("Publish buffer overflow!");
94
                        return;
95
                }
96
                int p = rep->publish_buffer_entries++;
97
                copy_measurementrecord(&(rep->publish_buffer[p].r), r);
98
                rep->publish_buffer[p].requestdata = rd;
99
        }
100
        else {
101
                sprintf(uri, "/Publish?%s", encode_measurementrecord(r));
102
                make_request(uri, _publish_callback, (void *)rd);
103
        }
104
        return (HANDLE)(rd);
105
}
106

    
107
/** libevent callback for deferred publishing */
108
void _batch_publish_callback(struct evhttp_request *req,void *arg) {
109
        if (req == NULL || arg == NULL) return;
110
        struct reposerver *rep = (struct reposerver *)arg;
111
        int response = req->response_code;
112

    
113
        if (response != HTTP_OK) {
114
                warn("Failed BATCH PUBLISH operation (server %s:%hu, error is %d %s)", 
115
                        rep->address, rep->port, req->response_code, req->response_code_line);
116
                size_t response_len = evbuffer_get_length(req->input_buffer);
117
                if (response_len) {
118
                        char *response = malloc(response_len + 1);
119
                        evbuffer_remove(req->input_buffer, response, response_len);
120
                        response[response_len] = 0;
121
                        debug("Response string (len %d): %s", response_len, response);
122
                        free(response);
123
                }
124
        }
125

    
126
        int i;
127
        for (i = 0; i != rep->in_transit_entries; i++) {
128
                request_data *cbdata = rep->in_transit[i].requestdata;
129
                cb_repPublish user_cb = cbdata->cb;
130
                HANDLE id = cbdata->id;
131
                free(cbdata);
132
                if (user_cb) user_cb((HANDLE)rep, id, cbdata->cbarg, response == 200 ? 0 : response);
133
                free_measurementrecord(&(rep->in_transit[i].r));
134
        }
135
        debug("Freeing up %d in-transit entries", rep->in_transit_entries);
136
        free(rep->in_transit);
137
        rep->in_transit = NULL;
138
        rep->in_transit_entries = 0;
139
}
140

    
141
/** Batch publish callback */
142
void deferred_publish_cb(evutil_socket_t fd, short what, void *arg) {
143
        struct reposerver *rep = (struct reposerver *)arg;
144

    
145
        if (rep->publish_buffer_entries) {
146
                debug("Deferred publish callback: %d entries to publish",
147
                                rep->publish_buffer_entries);
148
                char *post_data = malloc(rep->publish_buffer_entries * 256);
149
                if (!post_data) fatal("Out of memory!");
150
                post_data[0] = 0;
151

    
152
                int i;
153
                for (i = 0; i != rep->publish_buffer_entries; i++) {
154
                        strcat(post_data,
155
                                        encode_measurementrecord(&(rep->publish_buffer[i].r)));
156
                        strcat(post_data, "\n");
157
                }
158

    
159
                make_post_request("/BatchPublish", post_data,
160
                                _batch_publish_callback, rep);
161

    
162
                rep->in_transit = rep->publish_buffer;
163
                rep->in_transit_entries = rep->publish_buffer_entries;
164
                rep->publish_buffer_entries = 0;
165
                rep->publish_buffer = calloc(sizeof(struct deferred_publish),
166
                                PUBLISH_BUFFER_SIZE);
167
                if (!rep->publish_buffer) fatal("Out of memory");
168

    
169
                free(post_data);
170

    
171
        }
172

    
173
        if (rep->publish_delay) {
174
                struct timeval t = { rep->publish_delay, 0 };
175
                event_base_once(eventbase, -1, EV_TIMEOUT, deferred_publish_cb, rep, &t); 
176
        }
177
}
178