Statistics
| Branch: | Revision:

napa-baselibs / rep / repoclient.c @ eddebd3e

History | View | Annotate | Download (6.53 KB)

1
#define        LOG_MODULE "[rep] "
2
#include        "repoclient_impl.h"
3

    
4
/** Parse a server specification for repOpen */
5
int parse_serverspec(const char *server, char **addr, unsigned short *port);
6
/** Batch publish callback */
7
void deferred_publish_cb(evutil_socket_t fd, short what, void *arg);
8

    
9
void repInit(const char *config) {
10
#ifndef WIN32
11
        if ((publish_streambuffer.stream = open_memstream(&publish_streambuffer.buffer, &publish_streambuffer.len)) 
12
                == NULL) fatal("Unable to initilize repoclient, file %s, line %d", __FILE__, __LINE__);
13
#endif
14
        debug("The Repository client library is ready");
15
}
16

    
17
/** Initialize the repoclient instance by parsing the server spec string and setting up things */
18
HANDLE repOpen(const char *server, int publish_delay) { 
19
        struct reposerver *rep = malloc(sizeof(struct reposerver));
20
        if (!rep) fatal("Out of memory while initializing repository client for %s", server);
21
        rep->magic=REPOSERVER_MAGIC;
22
        rep->publish_buffer = NULL;
23
        rep->publish_buffer_entries = 0;
24
        rep->publish_delay = publish_delay;
25
        rep->in_transit = NULL;
26
        rep->in_transit_entries = 0;
27
        parse_serverspec(server, &(rep->address), &(rep->port));
28

    
29
        info("Opening repository client %p to http://%s:%d", rep, rep->address,  rep->port);
30

    
31
        if ((rep->evhttp_conn = evhttp_connection_base_new(eventbase,  rep->address,  rep->port)) == NULL) 
32
                fatal("Unable to establish connection to %s:%d", rep->address, rep->port);
33

    
34
        if (publish_delay) {
35
                rep->publish_buffer = calloc(sizeof(struct deferred_publish), PUBLISH_BUFFER_SIZE);
36
                rep->publish_buffer_entries = 0;
37
                /* Schedule the batch publisher */
38
                struct timeval t = { publish_delay, 0 };
39
                event_base_once(eventbase, -1, EV_TIMEOUT, deferred_publish_cb, rep, &t);
40
        }
41

    
42
    debug(stderr,"X.repOpen cclosing\n");
43
       return rep;
44
}
45

    
46
/** Close the repoclient instance and free resources */
47
void repClose(HANDLE h) {
48
        if (!check_handle(h, __FUNCTION__)) return;
49
        struct reposerver *rep = (struct reposerver *)h;
50

    
51
        debug("Closing repository client %p to %s:%hu", h, rep->address, rep->port);
52
        evhttp_connection_free(rep->evhttp_conn);
53
        rep->magic=0;
54
        if (rep->publish_buffer_entries && rep->publish_buffer) {
55
                while (rep->publish_buffer_entries) {
56
                        free_measurementrecord(&(rep->publish_buffer[--rep->publish_buffer_entries].r));
57
                }
58
        }
59
        if (rep->publish_buffer) free(rep->publish_buffer);
60
        if (rep->in_transit_entries && rep->in_transit) {
61
                while (rep->in_transit_entries) {
62
                        free_measurementrecord(&(rep->in_transit[--rep->in_transit_entries].r));
63
                }
64
        }
65
        if (rep->in_transit) free(rep->in_transit);
66
        free(rep);
67
}
68

    
69
HANDLE repListMeasurementNames(HANDLE rep, cb_repListMeasurementNames cb, void *cbarg, int maxResults, const char *ch) {
70
        static int id = 1;
71
        if (!check_handle(rep, __FUNCTION__)) return NULL;
72
        debug("About to call listMeasurementIds with maxResults %d", maxResults);
73

    
74
        char uri[1024];
75
        request_data *rd = (request_data *)malloc(sizeof(request_data));
76
        if (!rd) return NULL;
77
        rd->id = (void *)rd;
78
        rd->server = (struct reposerver *)rep;
79
        rd->cb = cb;
80
        rd->data = maxResults;
81

    
82
        sprintf(uri, "/ListMeasurementNames?maxResults=%d",  maxResults);
83
        make_request(uri, _stringlist_callback, (void *)rd);
84
        return (HANDLE)(rd);
85
}
86

    
87
void make_request(const char *uri, void (*callback)(struct evhttp_request *, void *), void *cb_arg) {
88
        struct evhttp_request *req = evhttp_request_new(callback, cb_arg);
89
        request_data *rd = (request_data *)cb_arg;
90

    
91
        if (!req) {
92
                error("Failed to create request object");
93
                return;
94
        }
95
        evhttp_add_header(req->output_headers, "Connection", "close");
96

    
97
        if (evhttp_make_request(rd->server->evhttp_conn, req, EVHTTP_REQ_GET, uri)) {
98
                warn("evhttp_make_request failed");
99
        }
100
}
101

    
102
int make_post_request(const char *uri, const char *data, void (*callback)(struct evhttp_request *, void *), void *cb_arg) {
103
        struct evhttp_request *req = evhttp_request_new(callback, cb_arg);
104
        struct reposerver *rep = (struct reposerver *)cb_arg;
105
        
106
        if (!req) {
107
                error("Failed to create request object");
108
                return -1;
109
        }
110

    
111
        if (evhttp_add_header(req->output_headers, "Connection", "close") < 0) {
112
                error("Failed to add header");
113
                return -2;
114
        }
115

    
116
        if (evbuffer_add(req->output_buffer, data, strlen(data) + 1) < 0) {
117
                error("Failed to add data to request");
118
                return -3;
119
        }
120

    
121
        if (evhttp_make_request(rep->evhttp_conn, req, EVHTTP_REQ_POST, uri) < 0) {
122
                warn("evhttp_make_request failed");
123
                return -4;
124
        }
125
        return 0;
126
}
127

    
128
/** Parse a server specification of the form addr:port */
129
int parse_serverspec(const char *rep, char **addr, unsigned short *port) {
130
        int p = 0;
131
        if (!rep) fatal("NULL repository is provided to repoclient, unable to continue");
132

    
133
        /* Find the address part first */
134
        while (rep[p] != ':' && rep[p] != 0) p++;
135
        *addr = strdup(rep);
136
        (*addr)[p] = 0;
137
        /* Parse the port spec, 80 if not specified */
138
        *port  = 80;
139
        if (rep[p] != 0 && sscanf(rep + p + 1, "%hu", port) != 1) {
140
                fatal("Unable to parse repository server specification \"%s\"", rep);
141
        }
142
        return 0;
143
}
144

    
145
#if 0
146

147
void _countPeers_callback(struct evhttp_request *req,void *arg) {
148
        if (req == NULL || arg == NULL) return;
149
        request_data *cbdata = (request_data *)arg;
150

151
        void (*user_cb)(HANDLE rep, HANDLE id, int result) = cbdata->cb;        
152
        HANDLE id = cbdata->id;
153
        free(cbdata);
154

155
        int result = -1;
156
        if (req->response_code != HTTP_OK) {
157
                warn("Failed repository operation (id %p, error is %d %s): %s", 
158
                        id, req->response_code, req->response_code_line, req->uri);
159
                size_t response_len = evbuffer_get_length(req->input_buffer);
160
                if (response_len) {
161
                        char *response = malloc(response_len + 1);
162
                        evbuffer_remove(req->input_buffer, response, response_len);
163
                        response[response_len] = 0;
164
                        debug("Response string (len %d): %s", response_len, response);
165
                        free(response);
166
                }
167
                if (user_cb) user_cb(client, id, result);
168
                return;
169
        }
170

171
        char *line = evbuffer_readln(req->input_buffer, NULL, EVBUFFER_EOL_ANY);
172
        if (line) sscanf(line, "%d", &result);
173
        if (user_cb && result >= 0) user_cb(client, id, result);
174
}
175

176
HANDLE countPeers(HANDLE rep, cb_countPeers cb, Constraint *cons, int clen) {
177

178
        static int id = 1;
179
        if (!check_handle(rep, __FUNCTION__)) return NULL;
180
        debug("About to call countPeers with  constaints %s", constraints2str(cons, clen, 0));
181

182
        char uri[10240];
183
        request_data *rd = (request_data *)malloc(sizeof(request_data));
184
        if (!rd) return NULL;
185
        rd->id = (void *)id;
186
        rd->cb = cb;
187

188
        sprintf(uri, "/CountPeers?%s",  constraints2str(cons, clen, 0));
189
        debug("Making countPeers request with URI %s", uri);
190
        make_request(uri, _stringlist_callback, (void *)rd);
191
        return (HANDLE)(id++);
192
}
193

194
#endif
195