Statistics
| Branch: | Revision:

grapes / src / CloudSupport / mysql_delegate_helper.c @ 98b047dd

History | View | Annotate | Download (13.9 KB)

1
/*
2
 *  Copyright (c) 2011 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 *
6
 *
7
 *  Delegate cloud_handler for MySql based on the MySql C Connector library.
8
 *  Supported parameters (*required):
9
 *
10
 *  - mysql_host*:  the mysql server host
11
 *  - mysql_user*:  the mysql user
12
 *  - mysql_pass*:  the mysql password
13
 *  - mysql_db*:    the mysql database to use
14
 */
15

    
16
#include <stdlib.h>
17
#include <stdio.h>
18
#include <stdint.h>
19
#include <unistd.h>
20
#include <string.h>
21
#include <sys/time.h>
22

    
23
#include <mysql.h>
24

    
25
#include "net_helper.h"
26
#include "request_handler.h"
27
#include "cloud_helper_iface.h"
28
#include "config.h"
29

    
30

    
31
#define CLOUD_NODE_ADDR "0.0.0.0"
32

    
33
/***********************************************************************
34
 * Interface prototype for cloud_helper_delegate
35
 ***********************************************************************/
36
struct delegate_iface {
37
  void* (*cloud_helper_init)(struct nodeID *local, const char *config);
38
  int (*get_from_cloud)(void *context, const char *key, uint8_t *header_ptr,
39
                        int header_size, int free_header);
40
  int (*get_from_cloud_default)(void *context, const char *key,
41
                                uint8_t *header_ptr, int header_size, int free_header,
42
                                uint8_t *defval_ptr, int defval_size, int free_defval);
43
  int (*put_on_cloud)(void *context, const char *key, uint8_t *buffer_ptr,
44
                      int buffer_size, int free_buffer);
45
  struct nodeID* (*get_cloud_node)(void *context, uint8_t variant);
46
  time_t (*timestamp_cloud)(void *context);
47
  int (*is_cloud_node)(void *context, struct nodeID* node);
48
  int (*wait4cloud)(void *context, struct timeval *tout);
49
  int (*recv_from_cloud)(void *context, uint8_t *buffer_ptr, int buffer_size);
50
};
51

    
52

    
53
struct mysql_cloud_context {
54
  struct req_handler_ctx *req_handler;
55
  MYSQL *mysql;
56
  time_t last_rsp_timestamp;
57
};
58

    
59

    
60
/* Definition of request structure */
61
enum mysql_helper_operation {PUT=0, GET=1};
62
struct mysql_request {
63
  enum mysql_helper_operation op;
64
  char *key;
65

    
66
  /* For GET operations this point to the header.
67
     For PUT this is the pointer to the actual data */
68
  uint8_t *data;
69
  int data_length;
70
  int free_data;
71

    
72
  uint8_t *default_value;
73
  int default_value_length;
74
  int free_default_value;
75
  struct mysql_cloud_context *helper_ctx;
76
};
77
typedef struct mysql_request mysql_request_t;
78

    
79

    
80
/* Definition of response structure */
81
enum mysql_helper_status {SUCCESS=0, ERROR=1};
82
struct mysql_get_response {
83
  enum mysql_helper_status status;
84
  uint8_t *data;
85
  uint8_t *current_byte;
86

    
87
  int data_length;
88
  int read_bytes;
89
  time_t last_timestamp;
90
};
91
typedef struct mysql_get_response mysql_get_response_t;
92

    
93

    
94

    
95
static const char* parse_required_param(struct tag *cfg_tags, const char *name)
96
{
97
  const char *arg;
98
  arg = config_value_str(cfg_tags, name);
99
  if (!arg) {
100
    fprintf(stderr, "mysql_delegate_helper: missing required parameter " \
101
            "'%s'\n", name);
102
    return NULL;
103
  }
104
  return arg;
105
}
106

    
107

    
108
static MYSQL* init_mysql(MYSQL **mysql) {
109
  /* initialize library */
110
  if (mysql_library_init(0, NULL, NULL)) {
111
    fprintf(stderr,
112
            "mysql_delegate_helper: could not initialize MySQL library\n");
113
    return NULL;
114
  }
115

    
116
  /* initialize mysql object */
117
  *mysql = NULL;
118
  *mysql = mysql_init(*mysql);
119

    
120
  return *mysql;
121
}
122

    
123
static int init_database(MYSQL *mysql)
124
{
125
  char query[256];
126
  int error;
127
  snprintf(query, 256, "CREATE TABLE IF NOT EXISTS cloud ("   \
128
           "  cloud_key VARCHAR(255),"\
129
           "  cloud_value BLOB," \
130
           "  timestamp INT UNSIGNED," \
131
           "  PRIMARY KEY (cloud_key))");
132
  error = mysql_query(mysql, query);
133
  if (error) {
134
    fprintf(stderr,
135
            "mysql_delegate_helper: error creating table: %s\n",
136
            mysql_error(mysql));
137
    return 1;
138
  }
139

    
140
  return 0;
141
}
142

    
143

    
144
static void deallocate_context(struct mysql_cloud_context *ctx)
145
{
146
  req_handler_destroy(ctx->req_handler);
147
  mysql_close(ctx->mysql);
148
  mysql_library_end();
149
}
150

    
151
static void free_request(void *req_ptr)
152
{
153
  mysql_request_t *req;
154
  req = (mysql_request_t *) req_ptr;
155
  if (req->free_data) free(req->data);
156

    
157
  if (req->free_default_value > 0)
158
    free(req->default_value);
159

    
160
  free(req);
161
  return;
162
}
163

    
164
static void free_response(mysql_get_response_t *rsp)
165
{
166
  if (rsp->data) free(rsp->data);
167

    
168
  free(rsp);
169
}
170

    
171
/***********************************************************************
172
 * Implementation of request processing
173
 ***********************************************************************/
174
int process_get_operation(void *req_data, void **rsp_data)
175
{
176
  MYSQL_RES *result;
177
  MYSQL_ROW row;
178
  mysql_request_t *req;
179
  mysql_get_response_t *rsp;
180
  char query[256];
181
  unsigned long *field_len;
182
  int err;
183

    
184
  req = (mysql_request_t *) req_data;
185

    
186
  snprintf(query, 127, "SELECT cloud_value, timestamp FROM cloud WHERE " \
187
           "cloud_key='%s'", req->key);
188
  err = mysql_query(req->helper_ctx->mysql, query);
189
  if (err) {
190
    fprintf(stderr,
191
            "mysql_delegate_helper: error retrieving key: %s\n",
192
            mysql_error(req->helper_ctx->mysql));
193
    return -1;
194
  }
195

    
196
  rsp = malloc(sizeof(mysql_get_response_t));
197
  if (!rsp) return -1;
198

    
199
  /* initialize response */
200
  *rsp_data = rsp;
201
  rsp->status = ERROR;
202
  rsp->data = NULL;
203
  rsp->current_byte = NULL;
204
  rsp->data_length = 0;
205
  rsp->read_bytes = 0;
206
  rsp->last_timestamp = -1;
207

    
208
  result = mysql_store_result(req->helper_ctx->mysql);
209
  if (result) {
210
    row = mysql_fetch_row(result);
211
    if (row) {
212
      char ts_str[21];
213
      char *value;
214
      int value_len;
215

    
216
      field_len = mysql_fetch_lengths(result);
217
      value_len = field_len[0];
218
      value = row[0];
219

    
220
      /* reserve space for value and header */
221
      rsp->data_length = value_len + req->data_length;
222
      rsp->data = calloc(rsp->data_length, sizeof(char));
223
      rsp->current_byte = rsp->data;
224

    
225
      if (req->data_length > 0)
226
        memcpy(rsp->data, req->data, req->data_length);
227
      memcpy(rsp->data + req->data_length, value, value_len);
228
      memcpy(ts_str, row[1], field_len[1]);
229
      ts_str[field_len[1]] = '\0';
230
      rsp->last_timestamp = strtol(ts_str, NULL, 10);
231
      rsp->status = SUCCESS;
232
    } else {
233
      /* Since there was no value for the specified key. If the caller specified a
234
         default value, use that */
235
      if (req->default_value) {
236
        /* reserve space for value and header */
237
        rsp->data_length = req->default_value_length + req->data_length;
238
        rsp->data = calloc(rsp->data_length, sizeof(char));
239
        rsp->current_byte = rsp->data;
240

    
241
        if (req->data_length > 0)
242
          memcpy(rsp->data, req->data, req->data_length);
243

    
244
        memcpy(rsp->data + req->data_length,
245
               req->default_value, req->default_value_length);
246

    
247
        rsp->last_timestamp = 0;
248
        rsp->status = SUCCESS;
249
      }
250
    }
251
    mysql_free_result(result);
252
  }
253

    
254
  return 0;
255
}
256

    
257
int process_put_operation(struct mysql_cloud_context *ctx,
258
                          mysql_request_t *req)
259
{
260
  char raw_stmt[] = "INSERT INTO cloud(cloud_key,cloud_value,timestamp)" \
261
    "VALUES('%s', '%s', %ld) ON DUPLICATE KEY UPDATE "                  \
262
    "cloud_value='%s', timestamp=%ld";
263
  char *stmt;
264
  char *escaped_value;
265
  int stmt_length;
266
  int escaped_length;
267
  int err;
268
  time_t now;
269

    
270
  escaped_value = calloc(req->data_length*2+1, sizeof(char));
271

    
272
  escaped_length = mysql_real_escape_string(ctx->mysql,
273
                                            escaped_value,
274
                                            req->data,
275
                                            req->data_length);
276

    
277

    
278
  /* reserve space for the statement: len(value/ts)+len(key)+len(SQL_cmd) */
279
  stmt_length = 2*escaped_length + 2*20 + strlen(req->key) + strlen(raw_stmt);
280
  stmt = calloc(stmt_length, sizeof(char));
281

    
282
  now = time(NULL);
283
  stmt_length = snprintf(stmt, stmt_length, raw_stmt, req->key, escaped_value,
284
                         now, escaped_value, now);
285

    
286
  err = mysql_real_query(ctx->mysql, stmt, stmt_length);
287
  if (err) {
288
    fprintf(stderr,
289
            "mysql_delegate_helper: error setting key: %s\n",
290
            mysql_error(ctx->mysql));
291
    return 1;
292
  }
293

    
294
  return 0;
295
}
296

    
297

    
298
/***********************************************************************
299
 * Implementation of interface delegate_iface
300
 ***********************************************************************/
301
void* cloud_helper_init(struct nodeID *local, const char *config)
302
{
303
  struct mysql_cloud_context *ctx;
304
  struct tag *cfg_tags;
305

    
306
  const char *mysql_host;
307
  const char *mysql_user;
308
  const char *mysql_pass;
309
  const char *mysql_db;
310

    
311

    
312
  ctx = malloc(sizeof(struct mysql_cloud_context));
313
  memset(ctx, 0, sizeof(struct mysql_cloud_context));
314
  cfg_tags = config_parse(config);
315

    
316
  /* Parse fundametal parameters */
317
  if (!(mysql_host=parse_required_param(cfg_tags, "mysql_host"))) {
318
    deallocate_context(ctx);
319
    return NULL;
320
  }
321

    
322
  if (!(mysql_user=parse_required_param(cfg_tags, "mysql_user"))) {
323
    deallocate_context(ctx);
324
    return NULL;
325
  }
326

    
327
  if (!(mysql_pass=parse_required_param(cfg_tags, "mysql_pass"))) {
328
    deallocate_context(ctx);
329
    return NULL;
330
  }
331

    
332
  if (!(mysql_db=parse_required_param(cfg_tags, "mysql_db"))) {
333
    deallocate_context(ctx);
334
    return NULL;
335
  }
336

    
337
  ctx->mysql = init_mysql(&ctx->mysql);
338
  if (!ctx->mysql) {
339
    deallocate_context(ctx);
340
    return NULL;
341
  }
342

    
343
  if (!mysql_real_connect(ctx->mysql,
344
                          mysql_host,
345
                          mysql_user,
346
                          mysql_pass,
347
                          mysql_db,
348
                          0, NULL, 0)) {
349
    fprintf(stderr,"mysql_delegate_helper: error connecting to db: %s\n",
350
            mysql_error(ctx->mysql));
351
    deallocate_context(ctx);
352
    return NULL;
353
  }
354

    
355
  if (init_database(ctx->mysql) != 0) {
356
    deallocate_context(ctx);
357
    return NULL;
358
  }
359

    
360
  ctx->req_handler = req_handler_init();
361
  if (!ctx->req_handler) {
362
    deallocate_context(ctx);
363
    return NULL;
364
  }
365

    
366
  return ctx;
367
}
368

    
369
int get_from_cloud_default(void *context, const char *key,
370
                           uint8_t *header_ptr, int header_size, int free_header,
371
                           uint8_t *defval_ptr, int defval_size, int free_defval)
372
{
373
  struct mysql_cloud_context *ctx;
374
  mysql_request_t *request;
375
  int err;
376

    
377
  ctx = (struct mysql_cloud_context *) context;
378
  request = malloc(sizeof(mysql_request_t));
379

    
380
  if (!request) return 1;
381

    
382
  request->op = GET;
383
  request->key = strdup(key);
384
  request->data = header_ptr;
385
  request->data_length = header_size;
386
  request->free_data = free_header;
387
  request->default_value = defval_ptr;
388
  request->default_value_length = defval_size;
389
  request->free_default_value = free_defval;
390
  request->helper_ctx = ctx;
391

    
392
  err = req_handler_add_request(ctx->req_handler, &process_get_operation,
393
                                request, &free_request);
394
  if (err) free_request(request);
395

    
396
  return err;
397
}
398

    
399
int get_from_cloud(void *context, const char *key, uint8_t *header_ptr,
400
                   int header_size, int free_header)
401
{
402
  return get_from_cloud_default(context, key, header_ptr, header_size,
403
                                free_header, NULL, 0, 0);
404
}
405

    
406

    
407
int put_on_cloud(void *context, const char *key, uint8_t *buffer_ptr,
408
                 int buffer_size, int free_buffer)
409
{
410
  struct mysql_cloud_context *ctx;
411
  mysql_request_t *request;
412
  int res;
413

    
414
  ctx = (struct mysql_cloud_context *) context;
415
  request = malloc(sizeof(mysql_request_t));
416

    
417
  if (!request) return 1;
418

    
419
  request->op = PUT;
420
  request->key = strdup(key);
421
  request->data = buffer_ptr;
422
  request->data_length = buffer_size;
423
  request->free_data = free_buffer;
424
  request->default_value = NULL;
425
  request->default_value_length = 0;
426
  request->free_default_value = 0;
427
  res = process_put_operation(ctx, request);
428
  free_request(request);
429
  return res;
430
}
431

    
432
struct nodeID* get_cloud_node(void *context, uint8_t variant)
433
{
434
  return create_node(CLOUD_NODE_ADDR, variant);
435
}
436

    
437
time_t timestamp_cloud(void *context)
438
{
439
  struct mysql_cloud_context *ctx;
440
  ctx = (struct mysql_cloud_context *) context;
441

    
442
  return ctx->last_rsp_timestamp;
443
}
444

    
445
int is_cloud_node(void *context, struct nodeID* node)
446
{
447
  char ipaddr[96];
448
  node_ip(node, ipaddr, 96);
449
  return strcmp(ipaddr, CLOUD_NODE_ADDR) == 0;
450
}
451

    
452
int wait4cloud(void *context, struct timeval *tout)
453
{
454
  struct mysql_cloud_context *ctx;
455
  mysql_get_response_t *rsp;
456

    
457
  ctx = (struct mysql_cloud_context *) context;
458

    
459
  rsp = req_handler_wait4response(ctx->req_handler, tout);
460

    
461
  if (rsp) {
462
    if (rsp->status == SUCCESS) {
463
      ctx->last_rsp_timestamp = rsp->last_timestamp;
464
      return 1;
465
    } else {
466
      /* there was some error with the request */
467
      req_handler_remove_response(ctx->req_handler);
468
      free_response(rsp);
469
      return -1;
470
    }
471
  } else {
472
    return 0;
473
  }
474
}
475

    
476
int recv_from_cloud(void *context, uint8_t *buffer_ptr, int buffer_size)
477
{
478
  struct mysql_cloud_context *ctx;
479
  mysql_get_response_t *rsp;
480
  int remaining;
481
  int toread;
482

    
483
  ctx = (struct mysql_cloud_context *) context;
484

    
485
  rsp = (mysql_get_response_t *) req_handler_get_response(ctx->req_handler);
486
  if (!rsp) return -1;
487

    
488
  /* If do not have further data just remove the request */
489
  if (rsp->read_bytes == rsp->data_length) {
490
    req_handler_remove_response(ctx->req_handler);
491
    free_response(rsp);
492
    return 0;
493
  }
494

    
495
  remaining = rsp->data_length - rsp->read_bytes;
496
  toread = (remaining <= buffer_size)? remaining : buffer_size;
497

    
498
  memcpy(buffer_ptr, rsp->current_byte, toread);
499
  rsp->current_byte += toread;
500
  rsp->read_bytes += toread;
501

    
502
  /* remove the response only if the read bytes are less the the allocated
503
     buuffer otherwise the client can't know when a single response finished */
504
  if (rsp->read_bytes == rsp->data_length && rsp->read_bytes < buffer_size){
505
    free_response(rsp);
506
    req_handler_remove_response(ctx->req_handler);
507
  }
508

    
509
  return toread;
510
}
511

    
512
struct delegate_iface delegate_impl = {
513
  .cloud_helper_init = &cloud_helper_init,
514
  .get_from_cloud = &get_from_cloud,
515
  .get_from_cloud_default = &get_from_cloud_default,
516
  .put_on_cloud = &put_on_cloud,
517
  .get_cloud_node = &get_cloud_node,
518
  .timestamp_cloud = &timestamp_cloud,
519
  .is_cloud_node = &is_cloud_node,
520
  .wait4cloud = &wait4cloud,
521
  .recv_from_cloud = &recv_from_cloud
522
};