Revision d116c3e0

View differences:

src/CloudSupport/Makefile
8 8
UTILS_DIR = ../Utils
9 9

  
10 10
OBJS = cloud_helper.o cloud_helper_utils.o cloud_helper_delegate.o
11
DELEGATE_HELPERS = libs3_delegate_helper.so
11
DELEGATE_HELPERS = libs3_delegate_helper.so mysql_delegate_helper.so
12 12

  
13 13
CFLAGS += -I$(UTILS_DIR)
14 14

  
......
26 26

  
27 27
libs3_delegate_helper.o: CFLAGS += -fPIC
28 28

  
29
mysql_delegate_helper.so: ../net_helper$(NH_INCARNATION).o $(UTILS_DIR)/fifo_queue.o $(UTILS_DIR)/request_handler.o ../config.o
30
mysql_delegate_helper.so: CFLAGS += -shared -pthread
31
mysql_delegate_helper.so: LDFLAGS += -lmysqlclient
32
mysql_delegate_helper.so: mysql_delegate_helper.o
33
	$(CC) $(CFLAGS) -o $@ $^ $(LDFLAGS)
34

  
35
mysql_delegate_helper.o: CFLAGS += -fPIC
36

  
37

  
38

  
29 39
clean::
30 40
	rm -f *.so
src/CloudSupport/mysql_delegate_helper.c
1
/*
2
 *  Copyright (c) 2011 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 *
6
 *
7
 *  Delegate cloud_handler for MySql based on the MySql C Connector library.
8
 *  Supported parameters (*required):
9
 *
10
 *  - mysql_host*:  the mysql server host
11
 *  - mysql_user*:  the mysql user
12
 *  - mysql_pass*:  the mysql password
13
 *  - mysql_db*:    the mysql database to use
14
 */
15

  
16
#include <stdlib.h>
17
#include <stdio.h>
18
#include <stdint.h>
19
#include <unistd.h>
20
#include <string.h>
21
#include <sys/time.h>
22

  
23
#include <mysql.h>
24

  
25
#include "net_helper.h"
26
#include "request_handler.h"
27
#include "cloud_helper_iface.h"
28
#include "config.h"
29

  
30

  
31
#define CLOUD_NODE_ADDR "0.0.0.0"
32

  
33
/***********************************************************************
34
 * Interface prototype for cloud_helper_delegate
35
 ***********************************************************************/
36
struct delegate_iface {
37
  void* (*cloud_helper_init)(struct nodeID *local, const char *config);
38
  int (*get_from_cloud)(void *context, const char *key, uint8_t *header_ptr,
39
                        int header_size, int free_header);
40
  int (*put_on_cloud)(void *context, const char *key, uint8_t *buffer_ptr,
41
                      int buffer_size, int free_buffer);
42
  struct nodeID* (*get_cloud_node)(void *context, uint8_t variant);
43
  time_t (*timestamp_cloud)(void *context);
44
  int (*is_cloud_node)(void *context, struct nodeID* node);
45
  int (*wait4cloud)(void *context, struct timeval *tout);
46
  int (*recv_from_cloud)(void *context, uint8_t *buffer_ptr, int buffer_size);
47
};
48

  
49

  
50
struct mysql_cloud_context {
51
  struct req_handler_ctx *req_handler;
52
  MYSQL *mysql;
53
  time_t last_rsp_timestamp;
54
};
55

  
56

  
57
/* Definition of request structure */
58
enum mysql_helper_operation {PUT=0, GET=1};
59
struct mysql_request {
60
  enum mysql_helper_operation op;
61
  char *key;
62

  
63
  /* For GET operations this point to the header.
64
     For PUT this is the pointer to the actual data */
65
  uint8_t *data;
66
  int data_length;
67
  int free_data;
68

  
69
  struct mysql_cloud_context *helper_ctx;
70
};
71
typedef struct mysql_request mysql_request_t;
72

  
73

  
74
/* Definition of response structure */
75
enum mysql_helper_status {SUCCESS=0, ERROR=1};
76
struct mysql_get_response {
77
  enum mysql_helper_status status;
78
  uint8_t *data;
79
  uint8_t *current_byte;
80

  
81
  int data_length;
82
  int read_bytes;
83
  time_t last_timestamp;
84
};
85
typedef struct mysql_get_response mysql_get_response_t;
86

  
87

  
88

  
89
static const char* parse_required_param(struct tag *cfg_tags, const char *name)
90
{
91
  const char *arg;
92
  arg = config_value_str(cfg_tags, name);
93
  if (!arg) {
94
    fprintf(stderr, "mysql_delegate_helper: missing required parameter " \
95
            "'%s'\n", name);
96
    return NULL;
97
  }
98
  return arg;
99
}
100

  
101

  
102
static MYSQL* init_mysql(MYSQL **mysql) {
103
  /* initialize library */
104
  if (mysql_library_init(0, NULL, NULL)) {
105
    fprintf(stderr,
106
            "mysql_delegate_helper: could not initialize MySQL library\n");
107
    return NULL;
108
  }
109

  
110
  /* initialize mysql object */
111
  *mysql = NULL;
112
  *mysql = mysql_init(*mysql);
113

  
114
  return *mysql;
115
}
116

  
117
static int init_database(MYSQL *mysql)
118
{
119
  char query[256];
120
  int error;
121
  snprintf(query, 256, "CREATE TABLE IF NOT EXISTS cloud ("   \
122
           "  cloud_key VARCHAR(255),"\
123
           "  cloud_value BLOB," \
124
           "  timestamp INT UNSIGNED," \
125
           "  PRIMARY KEY (cloud_key))");
126
  error = mysql_query(mysql, query);
127
  if (error) {
128
    fprintf(stderr,
129
            "mysql_delegate_helper: error creating table: %s\n",
130
            mysql_error(mysql));
131
    return 1;
132
  }
133

  
134
  return 0;
135
}
136

  
137

  
138
static void deallocate_context(struct mysql_cloud_context *ctx)
139
{
140
  req_handler_destroy(ctx->req_handler);
141
  mysql_close(ctx->mysql);
142
  mysql_library_end();
143
}
144

  
145
static void free_request(void *req_ptr)
146
{
147
  mysql_request_t *req;
148
  req = (mysql_request_t *) req_ptr;
149
  if (req->free_data) free(req->data);
150

  
151
  free(req);
152
  return;
153
}
154

  
155
static void free_response(mysql_get_response_t *rsp)
156
{
157
  if (rsp->data) free(rsp->data);
158

  
159
  free(rsp);
160
}
161

  
162
/***********************************************************************
163
 * Implementation of request processing
164
 ***********************************************************************/
165
int process_get_operation(void *req_data, void **rsp_data)
166
{
167
  MYSQL_RES *result;
168
  MYSQL_ROW row;
169
  mysql_request_t *req;
170
  mysql_get_response_t *rsp;
171
  char query[256];
172
  unsigned long *field_len;
173
  int err;
174

  
175
  req = (mysql_request_t *) req_data;
176

  
177
  snprintf(query, 127, "SELECT cloud_value, timestamp FROM cloud WHERE " \
178
           "cloud_key='%s'", req->key);
179
  err = mysql_query(req->helper_ctx->mysql, query);
180
  if (err) {
181
    fprintf(stderr,
182
            "mysql_delegate_helper: error retrieving key: %s\n",
183
            mysql_error(req->helper_ctx->mysql));
184
    return -1;
185
  }
186

  
187
  rsp = malloc(sizeof(mysql_get_response_t));
188
  if (!rsp) return -1;
189

  
190
  /* initialize response */
191
  *rsp_data = rsp;
192
  rsp->status = ERROR;
193
  rsp->data = NULL;
194
  rsp->current_byte = NULL;
195
  rsp->data_length = 0;
196
  rsp->read_bytes = 0;
197
  rsp->last_timestamp = -1;
198

  
199
  result = mysql_store_result(req->helper_ctx->mysql);
200
  if (result) {
201
    row = mysql_fetch_row(result);
202
    if (row) {
203
      char ts_str[21];
204
      char *value;
205
      int value_len;
206

  
207
      field_len = mysql_fetch_lengths(result);
208
      value_len = field_len[0];
209
      value = row[0];
210

  
211
      /* reserve space for value and header */
212
      rsp->data_length = value_len + req->data_length;
213
      rsp->data = calloc(rsp->data_length, sizeof(char));
214
      rsp->current_byte = rsp->data;
215

  
216
      if (req->data_length > 0)
217
        memcpy(rsp->data, req->data, req->data_length);
218
      memcpy(rsp->data + req->data_length, value, value_len);
219
      memcpy(ts_str, row[1], field_len[1]);
220
      ts_str[field_len[1]] = '\0';
221
      rsp->last_timestamp = strtol(ts_str, NULL, 10);
222
      rsp->status = SUCCESS;
223
    }
224
    mysql_free_result(result);
225
  }
226

  
227
  return 0;
228
}
229

  
230
int process_put_operation(struct mysql_cloud_context *ctx,
231
                          mysql_request_t *req)
232
{
233
  char raw_stmt[] = "INSERT INTO cloud(cloud_key,cloud_value,timestamp)" \
234
    "VALUES('%s', '%s', %ld) ON DUPLICATE KEY UPDATE "                  \
235
    "cloud_value='%s', timestamp=%ld";
236
  char *stmt;
237
  char *escaped_value;
238
  int stmt_length;
239
  int escaped_length;
240
  int err;
241
  time_t now;
242

  
243
  escaped_value = calloc(req->data_length*2+1, sizeof(char));
244

  
245
  escaped_length = mysql_real_escape_string(ctx->mysql,
246
                                            escaped_value,
247
                                            req->data,
248
                                            req->data_length);
249

  
250

  
251
  /* reserve space for the statement: len(value/ts)+len(key)+len(SQL_cmd) */
252
  stmt_length = 2*escaped_length + 2*20 + strlen(req->key) + strlen(raw_stmt);
253
  stmt = calloc(stmt_length, sizeof(char));
254

  
255
  now = time(NULL);
256
  stmt_length = snprintf(stmt, stmt_length, raw_stmt, req->key, escaped_value,
257
                         now, escaped_value, now);
258

  
259
  err = mysql_real_query(ctx->mysql, stmt, stmt_length);
260
  if (err) {
261
    fprintf(stderr,
262
            "mysql_delegate_helper: error setting key: %s\n",
263
            mysql_error(ctx->mysql));
264
    return 1;
265
  }
266

  
267
  return 0;
268
}
269

  
270

  
271
/***********************************************************************
272
 * Implementation of interface delegate_iface
273
 ***********************************************************************/
274
void* cloud_helper_init(struct nodeID *local, const char *config)
275
{
276
  struct mysql_cloud_context *ctx;
277
  struct tag *cfg_tags;
278

  
279
  const char *mysql_host;
280
  const char *mysql_user;
281
  const char *mysql_pass;
282
  const char *mysql_db;
283

  
284

  
285
  ctx = malloc(sizeof(struct mysql_cloud_context));
286
  memset(ctx, 0, sizeof(struct mysql_cloud_context));
287
  cfg_tags = config_parse(config);
288

  
289
  /* Parse fundametal parameters */
290
  if (!(mysql_host=parse_required_param(cfg_tags, "mysql_host"))) {
291
    deallocate_context(ctx);
292
    return NULL;
293
  }
294

  
295
  if (!(mysql_user=parse_required_param(cfg_tags, "mysql_user"))) {
296
    deallocate_context(ctx);
297
    return NULL;
298
  }
299

  
300
  if (!(mysql_pass=parse_required_param(cfg_tags, "mysql_pass"))) {
301
    deallocate_context(ctx);
302
    return NULL;
303
  }
304

  
305
  if (!(mysql_db=parse_required_param(cfg_tags, "mysql_db"))) {
306
    deallocate_context(ctx);
307
    return NULL;
308
  }
309

  
310
  ctx->mysql = init_mysql(&ctx->mysql);
311
  if (!ctx->mysql) {
312
    deallocate_context(ctx);
313
    return NULL;
314
  }
315

  
316
  if (!mysql_real_connect(ctx->mysql,
317
                          mysql_host,
318
                          mysql_user,
319
                          mysql_pass,
320
                          mysql_db,
321
                          0, NULL, 0)) {
322
    fprintf(stderr,"mysql_delegate_helper: error connecting to db: %s\n",
323
            mysql_error(ctx->mysql));
324
    deallocate_context(ctx);
325
    return NULL;
326
  }
327

  
328
  if (init_database(ctx->mysql) != 0) {
329
    deallocate_context(ctx);
330
    return NULL;
331
  }
332

  
333
  ctx->req_handler = req_handler_init();
334
  if (!ctx->req_handler) {
335
    deallocate_context(ctx);
336
    return NULL;
337
  }
338

  
339
  return ctx;
340
}
341

  
342
int get_from_cloud(void *context, const char *key, uint8_t *header_ptr,
343
                   int header_size, int free_header)
344
{
345
  struct mysql_cloud_context *ctx;
346
  mysql_request_t *request;
347
  int err;
348

  
349
  ctx = (struct mysql_cloud_context *) context;
350
  request = malloc(sizeof(mysql_request_t));
351

  
352
  if (!request) return 1;
353

  
354
  request->op = GET;
355
  request->key = strdup(key);
356
  request->data = header_ptr;
357
  request->data_length = header_size;
358
  request->free_data = free_header;
359
  request->helper_ctx = ctx;
360

  
361
  err = req_handler_add_request(ctx->req_handler, &process_get_operation,
362
                                request, &free_request);
363
  if (err) free_request(request);
364

  
365
  return err;
366
}
367

  
368

  
369
int put_on_cloud(void *context, const char *key, uint8_t *buffer_ptr,
370
                 int buffer_size, int free_buffer)
371
{
372
  struct mysql_cloud_context *ctx;
373
  mysql_request_t *request;
374
  int res;
375

  
376
  ctx = (struct mysql_cloud_context *) context;
377
  request = malloc(sizeof(mysql_request_t));
378

  
379
  if (!request) return 1;
380

  
381
  request->op = PUT;
382
  request->key = strdup(key);
383
  request->data = buffer_ptr;
384
  request->data_length = buffer_size;
385
  request->free_data = free_buffer;
386

  
387
  res = process_put_operation(ctx, request);
388
  free_request(request);
389
  return res;
390
}
391

  
392
struct nodeID* get_cloud_node(void *context, uint8_t variant)
393
{
394
  return create_node(CLOUD_NODE_ADDR, variant);
395
}
396

  
397
time_t timestamp_cloud(void *context)
398
{
399
  struct mysql_cloud_context *ctx;
400
  ctx = (struct mysql_cloud_context *) context;
401

  
402
  return ctx->last_rsp_timestamp;
403
}
404

  
405
int is_cloud_node(void *context, struct nodeID* node)
406
{
407
  return strcmp(node_ip(node), CLOUD_NODE_ADDR) == 0;
408
}
409

  
410
int wait4cloud(void *context, struct timeval *tout)
411
{
412
  struct mysql_cloud_context *ctx;
413
  mysql_get_response_t *rsp;
414

  
415
  ctx = (struct mysql_cloud_context *) context;
416

  
417
  rsp = req_handler_wait4response(ctx->req_handler, tout);
418

  
419
  if (rsp) {
420
    if (rsp->status == SUCCESS) {
421
      ctx->last_rsp_timestamp = rsp->last_timestamp;
422
      return 1;
423
    } else {
424
      /* there was some error with the request */
425
      req_handler_remove_response(ctx->req_handler);
426
      free_response(rsp);
427
      return -1;
428
    }
429
  } else {
430
    return 0;
431
  }
432
}
433

  
434
int recv_from_cloud(void *context, uint8_t *buffer_ptr, int buffer_size)
435
{
436
  struct mysql_cloud_context *ctx;
437
  mysql_get_response_t *rsp;
438
  int toread;
439

  
440
  ctx = (struct mysql_cloud_context *) context;
441

  
442
  rsp = (mysql_get_response_t *) req_handler_get_response(ctx->req_handler);
443
  if (!rsp) return -1;
444

  
445
  if (rsp->read_bytes == rsp->data_length){
446
    free_response(rsp);
447
    req_handler_remove_response(ctx->req_handler);
448
    return 0;
449
  }
450

  
451
  toread = (rsp->data_length <= buffer_size)? rsp->data_length : buffer_size;
452

  
453
  memcpy(buffer_ptr, rsp->current_byte, toread);
454
  rsp->current_byte += toread;
455
  rsp->read_bytes += toread;
456

  
457
  return toread;
458
}
459

  
460
struct delegate_iface delegate_impl = {
461
  .cloud_helper_init = &cloud_helper_init,
462
  .get_from_cloud = &get_from_cloud,
463
  .put_on_cloud = &put_on_cloud,
464
  .get_cloud_node = &get_cloud_node,
465
  .timestamp_cloud = &timestamp_cloud,
466
  .is_cloud_node = &is_cloud_node,
467
  .wait4cloud = &wait4cloud,
468
  .recv_from_cloud = &recv_from_cloud
469
};

Also available in: Unified diff