Statistics
| Branch: | Revision:

napa-baselibs / rep / repoclient.c @ 5f3adef4

History | View | Annotate | Download (6.47 KB)

1
#define        LOG_MODULE "[rep] "
2
#include        "repoclient_impl.h"
3

    
4
/** Parse a server specification for repOpen */
5
int parse_serverspec(const char *server, char **addr, unsigned short *port);
6
/** Batch publish callback */
7
void deferred_publish_cb(evutil_socket_t fd, short what, void *arg);
8

    
9
void repInit(const char *config) {
10
        if ((publish_streambuffer.stream = open_memstream(&publish_streambuffer.buffer, &publish_streambuffer.len)) 
11
                == NULL) fatal("Unable to initilize repoclient, file %s, line %d", __FILE__, __LINE__);
12
        debug("The Repository client library is ready");
13
}
14

    
15
/** Initialize the repoclient instance by parsing the server spec string and setting up things */
16
HANDLE repOpen(const char *server, int publish_delay) {
17
        struct reposerver *rep = malloc(sizeof(struct reposerver));
18
        if (!rep) fatal("Out of memory while initializing repository client for %s", server);
19
        rep->magic=REPOSERVER_MAGIC;
20
        rep->publish_buffer = NULL;
21
        rep->publish_buffer_entries = 0;
22
        rep->publish_delay = publish_delay;
23
        rep->in_transit = NULL;
24
        rep->in_transit_entries = 0;
25
        parse_serverspec(server, &(rep->address), &(rep->port));
26

    
27
        info("Opening repository client %p to http://%s:%d", rep, rep->address,  rep->port);
28

    
29
        if ((rep->evhttp_conn = evhttp_connection_base_new(eventbase,  rep->address,  rep->port)) == NULL) 
30
                fatal("Unable to establish connection to %s:%d", rep->address, rep->port);
31

    
32
        if (publish_delay) {
33
                rep->publish_buffer = calloc(sizeof(struct deferred_publish), PUBLISH_BUFFER_SIZE);
34
                rep->publish_buffer_entries = 0;
35
                /* Schedule the batch publisher */
36
                struct timeval t = { publish_delay, 0 };
37
                event_base_once(eventbase, -1, EV_TIMEOUT, deferred_publish_cb, rep, &t);
38
        }
39

    
40
        return (HANDLE *)rep;
41
}
42

    
43
/** Close the repoclient instance and free resources */
44
void repClose(HANDLE h) {
45
        if (!check_handle(h, __FUNCTION__)) return;
46
        struct reposerver *rep = (struct reposerver *)h;
47

    
48
        debug("Closing repository client %p to %s:%hu", h, rep->address, rep->port);
49
        evhttp_connection_free(rep->evhttp_conn);
50
        rep->magic=0;
51
        if (rep->publish_buffer_entries && rep->publish_buffer) {
52
                while (rep->publish_buffer_entries) {
53
                        free_measurementrecord(&(rep->publish_buffer[--rep->publish_buffer_entries].r));
54
                }
55
        }
56
        if (rep->publish_buffer) free(rep->publish_buffer);
57
        if (rep->in_transit_entries && rep->in_transit) {
58
                while (rep->in_transit_entries) {
59
                        free_measurementrecord(&(rep->in_transit[--rep->in_transit_entries].r));
60
                }
61
        }
62
        if (rep->in_transit) free(rep->in_transit);
63
        free(rep);
64
}
65

    
66
HANDLE repListMeasurementNames(HANDLE rep, cb_repListMeasurementNames cb, void *cbarg, int maxResults, const char *ch) {
67
        static int id = 1;
68
        if (!check_handle(rep, __FUNCTION__)) return NULL;
69
        debug("About to call listMeasurementIds with maxResults %d", maxResults);
70

    
71
        char uri[1024];
72
        request_data *rd = (request_data *)malloc(sizeof(request_data));
73
        if (!rd) return NULL;
74
        rd->id = (void *)rd;
75
        rd->server = (struct reposerver *)rep;
76
        rd->cb = cb;
77
        rd->data = maxResults;
78

    
79
        sprintf(uri, "/ListMeasurementNames?maxResults=%d",  maxResults);
80
        make_request(uri, _stringlist_callback, (void *)rd);
81
        return (HANDLE)(rd);
82
}
83

    
84
void make_request(const char *uri, void (*callback)(struct evhttp_request *, void *), void *cb_arg) {
85
        struct evhttp_request *req = evhttp_request_new(callback, cb_arg);
86
        request_data *rd = (request_data *)cb_arg;
87

    
88
        if (!req) {
89
                error("Failed to create request object");
90
                return;
91
        }
92
        evhttp_add_header(req->output_headers, "Connection", "close");
93

    
94
        if (evhttp_make_request(rd->server->evhttp_conn, req, EVHTTP_REQ_GET, uri)) {
95
                warn("evhttp_make_request failed");
96
        }
97
}
98

    
99
int make_post_request(const char *uri, const char *data, void (*callback)(struct evhttp_request *, void *), void *cb_arg) {
100
        struct evhttp_request *req = evhttp_request_new(callback, cb_arg);
101
        struct reposerver *rep = (struct reposerver *)cb_arg;
102
        
103
        if (!req) {
104
                error("Failed to create request object");
105
                return -1;
106
        }
107

    
108
        if (evhttp_add_header(req->output_headers, "Connection", "close") < 0) {
109
                error("Failed to add header");
110
                return -2;
111
        }
112

    
113
        if (evbuffer_add(req->output_buffer, data, strlen(data) + 1) < 0) {
114
                error("Failed to add data to request");
115
                return -3;
116
        }
117

    
118
        if (evhttp_make_request(rep->evhttp_conn, req, EVHTTP_REQ_POST, uri) < 0) {
119
                warn("evhttp_make_request failed");
120
                return -4;
121
        }
122
        return 0;
123
}
124

    
125
/** Parse a server specification of the form addr:port */
126
int parse_serverspec(const char *rep, char **addr, unsigned short *port) {
127
        int p = 0;
128
        if (!rep) fatal("NULL repository is provided to repoclient, unable to continue");
129

    
130
        /* Find the address part first */
131
        while (rep[p] != ':' && rep[p] != 0) p++;
132
        *addr = strdup(rep);
133
        (*addr)[p] = 0;
134
        /* Parse the port spec, 80 if not specified */
135
        *port  = 80;
136
        if (rep[p] != 0 && sscanf(rep + p + 1, "%hu", port) != 1) {
137
                fatal("Unable to parse repository server specification \"%s\"", rep);
138
        }
139
        return 0;
140
}
141

    
142
#if 0
143

144
void _countPeers_callback(struct evhttp_request *req,void *arg) {
145
        if (req == NULL || arg == NULL) return;
146
        request_data *cbdata = (request_data *)arg;
147

148
        void (*user_cb)(HANDLE rep, HANDLE id, int result) = cbdata->cb;        
149
        HANDLE id = cbdata->id;
150
        free(cbdata);
151

152
        int result = -1;
153
        if (req->response_code != HTTP_OK) {
154
                warn("Failed repository operation (id %p, error is %d %s): %s", 
155
                        id, req->response_code, req->response_code_line, req->uri);
156
                size_t response_len = evbuffer_get_length(req->input_buffer);
157
                if (response_len) {
158
                        char *response = malloc(response_len + 1);
159
                        evbuffer_remove(req->input_buffer, response, response_len);
160
                        response[response_len] = 0;
161
                        debug("Response string (len %d): %s", response_len, response);
162
                        free(response);
163
                }
164
                if (user_cb) user_cb(client, id, result);
165
                return;
166
        }
167

168
        char *line = evbuffer_readln(req->input_buffer, NULL, EVBUFFER_EOL_ANY);
169
        if (line) sscanf(line, "%d", &result);
170
        if (user_cb && result >= 0) user_cb(client, id, result);
171
}
172

173
HANDLE countPeers(HANDLE rep, cb_countPeers cb, Constraint *cons, int clen) {
174

175
        static int id = 1;
176
        if (!check_handle(rep, __FUNCTION__)) return NULL;
177
        debug("About to call countPeers with  constaints %s", constraints2str(cons, clen, 0));
178

179
        char uri[10240];
180
        request_data *rd = (request_data *)malloc(sizeof(request_data));
181
        if (!rd) return NULL;
182
        rd->id = (void *)id;
183
        rd->cb = cb;
184

185
        sprintf(uri, "/CountPeers?%s",  constraints2str(cons, clen, 0));
186
        debug("Making countPeers request with URI %s", uri);
187
        make_request(uri, _stringlist_callback, (void *)rd);
188
        return (HANDLE)(id++);
189
}
190

191
#endif
192