Statistics
| Branch: | Revision:

napa-baselibs / rep / repoclient.c @ f88f3c9d

History | View | Annotate | Download (7 KB)

1
#define        LOG_MODULE "[rep] "
2
#include        "repoclient_impl.h"
3

    
4
/** Parse a server specification for repOpen */
5
int parse_serverspec(const char *server, char **addr, unsigned short *port);
6
/** Batch publish callback */
7
void deferred_publish_cb(evutil_socket_t fd, short what, void *arg);
8

    
9
void repInit(const char *config) {
10
#if !WIN32 && !MAC_OS
11
        if ((publish_streambuffer.stream = open_memstream(&publish_streambuffer.buffer, &publish_streambuffer.len)) 
12
                == NULL) fatal("Unable to initilize repoclient, file %s, line %d", __FILE__, __LINE__);
13
#else
14
        publish_streambuffer.buffsize = PUBLISH_BUFFER_SIZE;
15
        publish_streambuffer.buffer = malloc(publish_streambuffer.buffsize);
16
        if(publish_streambuffer.buffer == NULL) {
17
                publish_streambuffer.buffsize = 0;
18
                fatal("Unable to initilize repoclient, file %s, line %d", __FILE__, __LINE__);
19
        }
20
#endif
21
        else debug("The Repository client library is ready");
22
}
23

    
24
/** Initialize the repoclient instance by parsing the server spec string and setting up things */
25
HANDLE repOpen(const char *server, int publish_delay) { 
26
        struct reposerver *rep;
27

    
28
        if(server == NULL || strlen(server) == 0 || strcmp(server, "-") == 0) {
29
                warn("Repository publishing is disabled");
30
                return NULL;
31
        }
32

    
33
        rep = malloc(sizeof(struct reposerver));
34
        if (!rep) fatal("Out of memory while initializing repository client for %s", server);
35
        rep->magic=REPOSERVER_MAGIC;
36
        rep->publish_buffer = NULL;
37
        rep->publish_buffer_entries = 0;
38
        rep->publish_delay = publish_delay;
39
        rep->in_transit = NULL;
40
        rep->in_transit_entries = 0;
41
        parse_serverspec(server, &(rep->address), &(rep->port));
42

    
43
        info("Opening repository client %p to http://%s:%d", rep, rep->address,  rep->port);
44

    
45
        if ((rep->evhttp_conn = evhttp_connection_base_new(eventbase,  rep->address,  rep->port)) == NULL) 
46
                fatal("Unable to establish connection to %s:%d", rep->address, rep->port);
47

    
48
        if (publish_delay) {
49
                rep->publish_buffer = calloc(sizeof(struct deferred_publish), PUBLISH_BUFFER_SIZE);
50
                rep->publish_buffer_entries = 0;
51
                /* Schedule the batch publisher */
52
                struct timeval t = { publish_delay, 0 };
53
                event_base_once(eventbase, -1, EV_TIMEOUT, deferred_publish_cb, rep, &t);
54
        }
55

    
56
        debug("X.repOpen cclosing\n");
57
        return rep;
58
}
59

    
60
/** Close the repoclient instance and free resources */
61
void repClose(HANDLE h) {
62
        if (!check_handle(h, __FUNCTION__)) return;
63
        struct reposerver *rep = (struct reposerver *)h;
64

    
65
        debug("Closing repository client %p to %s:%hu", h, rep->address, rep->port);
66
        evhttp_connection_free(rep->evhttp_conn);
67
        rep->magic=0;
68
        if (rep->publish_buffer_entries && rep->publish_buffer) {
69
                while (rep->publish_buffer_entries) {
70
                        free_measurementrecord(&(rep->publish_buffer[--rep->publish_buffer_entries].r));
71
                }
72
        }
73
        if (rep->publish_buffer) free(rep->publish_buffer);
74
        if (rep->in_transit_entries && rep->in_transit) {
75
                while (rep->in_transit_entries) {
76
                        free_measurementrecord(&(rep->in_transit[--rep->in_transit_entries].r));
77
                }
78
        }
79
        if (rep->in_transit) free(rep->in_transit);
80
        free(rep);
81
}
82

    
83
HANDLE repListMeasurementNames(HANDLE rep, cb_repListMeasurementNames cb, void *cbarg, int maxResults, const char *ch) {
84
        static int id = 1;
85
        if (!check_handle(rep, __FUNCTION__)) return NULL;
86
        debug("About to call listMeasurementIds with maxResults %d", maxResults);
87

    
88
        char uri[1024];
89
        request_data *rd = (request_data *)malloc(sizeof(request_data));
90
        if (!rd) return NULL;
91
        rd->id = (void *)rd;
92
        rd->server = (struct reposerver *)rep;
93
        rd->cb = cb;
94
        rd->data = maxResults;
95

    
96
        sprintf(uri, "/ListMeasurementNames?maxResults=%d",  maxResults);
97
        make_request(uri, _stringlist_callback, (void *)rd);
98
        return (HANDLE)(rd);
99
}
100

    
101
void make_request(const char *uri, void (*callback)(struct evhttp_request *, void *), void *cb_arg) {
102
        struct evhttp_request *req = evhttp_request_new(callback, cb_arg);
103
        request_data *rd = (request_data *)cb_arg;
104

    
105
        if (!req) {
106
                error("Failed to create request object");
107
                return;
108
        }
109
        evhttp_add_header(req->output_headers, "Connection", "close");
110

    
111
        if (evhttp_make_request(rd->server->evhttp_conn, req, EVHTTP_REQ_GET, uri)) {
112
                warn("evhttp_make_request failed");
113
        }
114
}
115

    
116
int make_post_request(const char *uri, const char *data, void (*callback)(struct evhttp_request *, void *), void *cb_arg) {
117
        struct evhttp_request *req = evhttp_request_new(callback, cb_arg);
118
        struct reposerver *rep = (struct reposerver *)cb_arg;
119
        
120
        if (!req) {
121
                error("Failed to create request object");
122
                return -1;
123
        }
124

    
125
        if (evhttp_add_header(req->output_headers, "Connection", "close") < 0) {
126
                error("Failed to add header");
127
                return -2;
128
        }
129

    
130
        if (evbuffer_add(req->output_buffer, data, strlen(data) + 1) < 0) {
131
                error("Failed to add data to request");
132
                return -3;
133
        }
134

    
135
        if (evhttp_make_request(rep->evhttp_conn, req, EVHTTP_REQ_POST, uri) < 0) {
136
                warn("evhttp_make_request failed");
137
                return -4;
138
        }
139
        return 0;
140
}
141

    
142
/** Parse a server specification of the form addr:port */
143
int parse_serverspec(const char *rep, char **addr, unsigned short *port) {
144
        int p = 0;
145
        if (!rep) fatal("NULL repository is provided to repoclient, unable to continue");
146

    
147
        /* Find the address part first */
148
        while (rep[p] != ':' && rep[p] != 0) p++;
149
        *addr = strdup(rep);
150
        (*addr)[p] = 0;
151
        /* Parse the port spec, 80 if not specified */
152
        *port  = 80;
153
        if (rep[p] != 0 && sscanf(rep + p + 1, "%hu", port) != 1) {
154
                fatal("Unable to parse repository server specification \"%s\"", rep);
155
        }
156
        return 0;
157
}
158

    
159
#if 0
160

161
void _countPeers_callback(struct evhttp_request *req,void *arg) {
162
        if (req == NULL || arg == NULL) return;
163
        request_data *cbdata = (request_data *)arg;
164

165
        void (*user_cb)(HANDLE rep, HANDLE id, int result) = cbdata->cb;        
166
        HANDLE id = cbdata->id;
167
        free(cbdata);
168

169
        int result = -1;
170
        if (req->response_code != HTTP_OK) {
171
                warn("Failed repository operation (id %p, error is %d %s): %s", 
172
                        id, req->response_code, req->response_code_line, req->uri);
173
                size_t response_len = evbuffer_get_length(req->input_buffer);
174
                if (response_len) {
175
                        char *response = malloc(response_len + 1);
176
                        evbuffer_remove(req->input_buffer, response, response_len);
177
                        response[response_len] = 0;
178
                        debug("Response string (len %d): %s", response_len, response);
179
                        free(response);
180
                }
181
                if (user_cb) user_cb(client, id, result);
182
                return;
183
        }
184

185
        char *line = evbuffer_readln(req->input_buffer, NULL, EVBUFFER_EOL_ANY);
186
        if (line) sscanf(line, "%d", &result);
187
        if (user_cb && result >= 0) user_cb(client, id, result);
188
}
189

190
HANDLE countPeers(HANDLE rep, cb_countPeers cb, Constraint *cons, int clen) {
191

192
        static int id = 1;
193
        if (!check_handle(rep, __FUNCTION__)) return NULL;
194
        debug("About to call countPeers with  constaints %s", constraints2str(cons, clen, 0));
195

196
        char uri[10240];
197
        request_data *rd = (request_data *)malloc(sizeof(request_data));
198
        if (!rd) return NULL;
199
        rd->id = (void *)id;
200
        rd->cb = cb;
201

202
        sprintf(uri, "/CountPeers?%s",  constraints2str(cons, clen, 0));
203
        debug("Making countPeers request with URI %s", uri);
204
        make_request(uri, _stringlist_callback, (void *)rd);
205
        return (HANDLE)(id++);
206
}
207

208
#endif
209