Statistics
| Branch: | Revision:

grapes / src / CloudSupport / mysql_delegate_helper.c @ 176b8de8

History | View | Annotate | Download (14 KB)

1
/*
2
 *  Copyright (c) 2011 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 *
6
 *
7
 *  Delegate cloud_handler for MySql based on the MySql C Connector library.
8
 *  Supported parameters (*required):
9
 *
10
 *  - mysql_host*:  the mysql server host
11
 *  - mysql_user*:  the mysql user
12
 *  - mysql_pass*:  the mysql password
13
 *  - mysql_db*:    the mysql database to use
14
 */
15

    
16
#include <stdlib.h>
17
#include <stdio.h>
18
#include <stdint.h>
19
#include <unistd.h>
20
#include <string.h>
21
#include <sys/time.h>
22

    
23
#include <mysql.h>
24

    
25
#include "net_helper.h"
26
#include "request_handler.h"
27
#include "cloud_helper_iface.h"
28
#include "grapes_config.h"
29

    
30

    
31
#define CLOUD_NODE_ADDR "0.0.0.0"
32

    
33
/***********************************************************************
34
 * Interface prototype for cloud_helper_delegate
35
 ***********************************************************************/
36
struct delegate_iface {
37
  void* (*cloud_helper_init)(struct nodeID *local, const char *config);
38
  int (*get_from_cloud)(void *context, const char *key, uint8_t *header_ptr,
39
                        int header_size, int free_header);
40
  int (*get_from_cloud_default)(void *context, const char *key,
41
                                uint8_t *header_ptr, int header_size, int free_header,
42
                                uint8_t *defval_ptr, int defval_size, int free_defval);
43
  int (*put_on_cloud)(void *context, const char *key, uint8_t *buffer_ptr,
44
                      int buffer_size, int free_buffer);
45
  struct nodeID* (*get_cloud_node)(void *context, uint8_t variant);
46
  time_t (*timestamp_cloud)(void *context);
47
  int (*is_cloud_node)(void *context, struct nodeID* node);
48
  int (*wait4cloud)(void *context, struct timeval *tout);
49
  int (*recv_from_cloud)(void *context, uint8_t *buffer_ptr, int buffer_size);
50
};
51

    
52

    
53
struct mysql_cloud_context {
54
  struct req_handler_ctx *req_handler;
55
  MYSQL *mysql;
56
  time_t last_rsp_timestamp;
57
};
58

    
59

    
60
/* Definition of request structure */
61
enum mysql_helper_operation {PUT=0, GET=1};
62
struct mysql_request {
63
  enum mysql_helper_operation op;
64
  char *key;
65

    
66
  /* For GET operations this point to the header.
67
     For PUT this is the pointer to the actual data */
68
  uint8_t *data;
69
  int data_length;
70
  int free_data;
71

    
72
  uint8_t *default_value;
73
  int default_value_length;
74
  int free_default_value;
75
  struct mysql_cloud_context *helper_ctx;
76
};
77
typedef struct mysql_request mysql_request_t;
78

    
79

    
80
/* Definition of response structure */
81
enum mysql_helper_status {SUCCESS=0, ERROR=1};
82
struct mysql_get_response {
83
  enum mysql_helper_status status;
84
  uint8_t *data;
85
  uint8_t *current_byte;
86

    
87
  int data_length;
88
  int read_bytes;
89
  time_t last_timestamp;
90
};
91
typedef struct mysql_get_response mysql_get_response_t;
92

    
93

    
94

    
95
static const char* parse_required_param(struct tag *cfg_tags, const char *name)
96
{
97
  const char *arg;
98
  arg = grapes_config_value_str(cfg_tags, name);
99
  if (!arg) {
100
    fprintf(stderr, "mysql_delegate_helper: missing required parameter " \
101
            "'%s'\n", name);
102
    return NULL;
103
  }
104
  return arg;
105
}
106

    
107

    
108
static MYSQL* init_mysql(MYSQL **mysql) {
109
  /* initialize library */
110
  if (mysql_library_init(0, NULL, NULL)) {
111
    fprintf(stderr,
112
            "mysql_delegate_helper: could not initialize MySQL library\n");
113
    return NULL;
114
  }
115

    
116
  /* initialize mysql object */
117
  *mysql = NULL;
118
  *mysql = mysql_init(*mysql);
119

    
120
  return *mysql;
121
}
122

    
123
static int init_database(MYSQL *mysql)
124
{
125
  char query[256];
126
  int error;
127
  snprintf(query, 256, "CREATE TABLE IF NOT EXISTS cloud ("   \
128
           "  cloud_key VARCHAR(255),"\
129
           "  cloud_value BLOB," \
130
           "  timestamp INT UNSIGNED," \
131
           "  counter INT UNSIGNED," \
132
           "  PRIMARY KEY (cloud_key))");
133
  error = mysql_query(mysql, query);
134
  if (error) {
135
    fprintf(stderr,
136
            "mysql_delegate_helper: error creating table: %s\n",
137
            mysql_error(mysql));
138
    return 1;
139
  }
140

    
141
  return 0;
142
}
143

    
144

    
145
static void deallocate_context(struct mysql_cloud_context *ctx)
146
{
147
  req_handler_destroy(ctx->req_handler);
148
  mysql_close(ctx->mysql);
149
  mysql_library_end();
150
}
151

    
152
static void free_request(void *req_ptr)
153
{
154
  mysql_request_t *req;
155
  req = (mysql_request_t *) req_ptr;
156
  if (req->free_data) free(req->data);
157

    
158
  if (req->free_default_value > 0)
159
    free(req->default_value);
160

    
161
  free(req);
162
  return;
163
}
164

    
165
static void free_response(mysql_get_response_t *rsp)
166
{
167
  if (rsp->data) free(rsp->data);
168

    
169
  free(rsp);
170
}
171

    
172
/***********************************************************************
173
 * Implementation of request processing
174
 ***********************************************************************/
175
int process_get_operation(void *req_data, void **rsp_data)
176
{
177
  MYSQL_RES *result;
178
  MYSQL_ROW row;
179
  mysql_request_t *req;
180
  mysql_get_response_t *rsp;
181
  char query[256];
182
  unsigned long *field_len;
183
  int err;
184

    
185
  req = (mysql_request_t *) req_data;
186

    
187
  snprintf(query, 127, "SELECT cloud_value, timestamp FROM cloud WHERE " \
188
           "cloud_key='%s'", req->key);
189
  err = mysql_query(req->helper_ctx->mysql, query);
190
  if (err) {
191
    fprintf(stderr,
192
            "mysql_delegate_helper: error retrieving key: %s\n",
193
            mysql_error(req->helper_ctx->mysql));
194
    return -1;
195
  }
196

    
197
  rsp = malloc(sizeof(mysql_get_response_t));
198
  if (!rsp) return -1;
199

    
200
  /* initialize response */
201
  *rsp_data = rsp;
202
  rsp->status = ERROR;
203
  rsp->data = NULL;
204
  rsp->current_byte = NULL;
205
  rsp->data_length = 0;
206
  rsp->read_bytes = 0;
207
  rsp->last_timestamp = -1;
208

    
209
  result = mysql_store_result(req->helper_ctx->mysql);
210
  if (result) {
211
    row = mysql_fetch_row(result);
212
    if (row) {
213
      char ts_str[21];
214
      char *value;
215
      int value_len;
216

    
217
      field_len = mysql_fetch_lengths(result);
218
      value_len = field_len[0];
219
      value = row[0];
220

    
221
      /* reserve space for value and header */
222
      rsp->data_length = value_len + req->data_length;
223
      rsp->data = calloc(rsp->data_length, sizeof(char));
224
      rsp->current_byte = rsp->data;
225

    
226
      if (req->data_length > 0)
227
        memcpy(rsp->data, req->data, req->data_length);
228
      memcpy(rsp->data + req->data_length, value, value_len);
229
      memcpy(ts_str, row[1], field_len[1]);
230
      ts_str[field_len[1]] = '\0';
231
      rsp->last_timestamp = strtol(ts_str, NULL, 10);
232
      rsp->status = SUCCESS;
233
    } else {
234
      /* Since there was no value for the specified key. If the caller specified a
235
         default value, use that */
236
      if (req->default_value) {
237
        /* reserve space for value and header */
238
        rsp->data_length = req->default_value_length + req->data_length;
239
        rsp->data = calloc(rsp->data_length, sizeof(char));
240
        rsp->current_byte = rsp->data;
241

    
242
        if (req->data_length > 0)
243
          memcpy(rsp->data, req->data, req->data_length);
244

    
245
        memcpy(rsp->data + req->data_length,
246
               req->default_value, req->default_value_length);
247

    
248
        rsp->last_timestamp = 0;
249
        rsp->status = SUCCESS;
250
      }
251
    }
252
    mysql_free_result(result);
253
  }
254

    
255
  return 0;
256
}
257

    
258
int process_put_operation(struct mysql_cloud_context *ctx,
259
                          mysql_request_t *req)
260
{
261
  char raw_stmt[] = "INSERT INTO cloud(cloud_key,cloud_value,timestamp, counter)" \
262
    "VALUES('%s', '%s', %ld, 0) ON DUPLICATE KEY UPDATE "                  \
263
    "cloud_value='%s', timestamp=%ld, counter=counter+1";
264
  char *stmt;
265
  char *escaped_value;
266
  int stmt_length;
267
  int escaped_length;
268
  int err;
269
  time_t now;
270

    
271
  escaped_value = calloc(req->data_length*2+1, sizeof(char));
272

    
273
  escaped_length = mysql_real_escape_string(ctx->mysql,
274
                                            escaped_value,
275
                                            req->data,
276
                                            req->data_length);
277

    
278

    
279
  /* reserve space for the statement: len(value/ts)+len(key)+len(SQL_cmd) */
280
  stmt_length = 2*escaped_length + 2*20 + strlen(req->key) + strlen(raw_stmt);
281
  stmt = calloc(stmt_length, sizeof(char));
282

    
283
  now = time(NULL);
284
  stmt_length = snprintf(stmt, stmt_length, raw_stmt, req->key, escaped_value,
285
                         now, escaped_value, now);
286

    
287
  err = mysql_real_query(ctx->mysql, stmt, stmt_length);
288
  if (err) {
289
    fprintf(stderr,
290
            "mysql_delegate_helper: error setting key: %s\n",
291
            mysql_error(ctx->mysql));
292
    return 1;
293
  }
294

    
295
  return 0;
296
}
297

    
298

    
299
/***********************************************************************
300
 * Implementation of interface delegate_iface
301
 ***********************************************************************/
302
void* cloud_helper_init(struct nodeID *local, const char *config)
303
{
304
  struct mysql_cloud_context *ctx;
305
  struct tag *cfg_tags;
306

    
307
  const char *mysql_host;
308
  const char *mysql_user;
309
  const char *mysql_pass;
310
  const char *mysql_db;
311

    
312

    
313
  ctx = malloc(sizeof(struct mysql_cloud_context));
314
  memset(ctx, 0, sizeof(struct mysql_cloud_context));
315
  cfg_tags = grapes_config_parse(config);
316

    
317
  /* Parse fundametal parameters */
318
  if (!(mysql_host=parse_required_param(cfg_tags, "mysql_host"))) {
319
    deallocate_context(ctx);
320
    return NULL;
321
  }
322

    
323
  if (!(mysql_user=parse_required_param(cfg_tags, "mysql_user"))) {
324
    deallocate_context(ctx);
325
    return NULL;
326
  }
327

    
328
  if (!(mysql_pass=parse_required_param(cfg_tags, "mysql_pass"))) {
329
    deallocate_context(ctx);
330
    return NULL;
331
  }
332

    
333
  if (!(mysql_db=parse_required_param(cfg_tags, "mysql_db"))) {
334
    deallocate_context(ctx);
335
    return NULL;
336
  }
337

    
338
  ctx->mysql = init_mysql(&ctx->mysql);
339
  if (!ctx->mysql) {
340
    deallocate_context(ctx);
341
    return NULL;
342
  }
343

    
344
  if (!mysql_real_connect(ctx->mysql,
345
                          mysql_host,
346
                          mysql_user,
347
                          mysql_pass,
348
                          mysql_db,
349
                          0, NULL, 0)) {
350
    fprintf(stderr,"mysql_delegate_helper: error connecting to db: %s\n",
351
            mysql_error(ctx->mysql));
352
    deallocate_context(ctx);
353
    return NULL;
354
  }
355

    
356
  if (init_database(ctx->mysql) != 0) {
357
    deallocate_context(ctx);
358
    return NULL;
359
  }
360

    
361
  ctx->req_handler = req_handler_init();
362
  if (!ctx->req_handler) {
363
    deallocate_context(ctx);
364
    return NULL;
365
  }
366

    
367
  return ctx;
368
}
369

    
370
int get_from_cloud_default(void *context, const char *key,
371
                           uint8_t *header_ptr, int header_size, int free_header,
372
                           uint8_t *defval_ptr, int defval_size, int free_defval)
373
{
374
  struct mysql_cloud_context *ctx;
375
  mysql_request_t *request;
376
  int err;
377

    
378
  ctx = (struct mysql_cloud_context *) context;
379
  request = malloc(sizeof(mysql_request_t));
380

    
381
  if (!request) return 1;
382

    
383
  request->op = GET;
384
  request->key = strdup(key);
385
  request->data = header_ptr;
386
  request->data_length = header_size;
387
  request->free_data = free_header;
388
  request->default_value = defval_ptr;
389
  request->default_value_length = defval_size;
390
  request->free_default_value = free_defval;
391
  request->helper_ctx = ctx;
392

    
393
  err = req_handler_add_request(ctx->req_handler, &process_get_operation,
394
                                request, &free_request);
395
  if (err) free_request(request);
396

    
397
  return err;
398
}
399

    
400
int get_from_cloud(void *context, const char *key, uint8_t *header_ptr,
401
                   int header_size, int free_header)
402
{
403
  return get_from_cloud_default(context, key, header_ptr, header_size,
404
                                free_header, NULL, 0, 0);
405
}
406

    
407

    
408
int put_on_cloud(void *context, const char *key, uint8_t *buffer_ptr,
409
                 int buffer_size, int free_buffer)
410
{
411
  struct mysql_cloud_context *ctx;
412
  mysql_request_t *request;
413
  int res;
414

    
415
  ctx = (struct mysql_cloud_context *) context;
416
  request = malloc(sizeof(mysql_request_t));
417

    
418
  if (!request) return 1;
419

    
420
  request->op = PUT;
421
  request->key = strdup(key);
422
  request->data = buffer_ptr;
423
  request->data_length = buffer_size;
424
  request->free_data = free_buffer;
425
  request->default_value = NULL;
426
  request->default_value_length = 0;
427
  request->free_default_value = 0;
428
  res = process_put_operation(ctx, request);
429
  free_request(request);
430
  return res;
431
}
432

    
433
struct nodeID* get_cloud_node(void *context, uint8_t variant)
434
{
435
  return create_node(CLOUD_NODE_ADDR, variant);
436
}
437

    
438
time_t timestamp_cloud(void *context)
439
{
440
  struct mysql_cloud_context *ctx;
441
  ctx = (struct mysql_cloud_context *) context;
442

    
443
  return ctx->last_rsp_timestamp;
444
}
445

    
446
int is_cloud_node(void *context, struct nodeID* node)
447
{
448
  char ipaddr[96];
449
  node_ip(node, ipaddr, 96);
450
  return strcmp(ipaddr, CLOUD_NODE_ADDR) == 0;
451
}
452

    
453
int wait4cloud(void *context, struct timeval *tout)
454
{
455
  struct mysql_cloud_context *ctx;
456
  mysql_get_response_t *rsp;
457

    
458
  ctx = (struct mysql_cloud_context *) context;
459

    
460
  rsp = req_handler_wait4response(ctx->req_handler, tout);
461

    
462
  if (rsp) {
463
    if (rsp->status == SUCCESS) {
464
      ctx->last_rsp_timestamp = rsp->last_timestamp;
465
      return 1;
466
    } else {
467
      /* there was some error with the request */
468
      req_handler_remove_response(ctx->req_handler);
469
      free_response(rsp);
470
      return -1;
471
    }
472
  } else {
473
    return 0;
474
  }
475
}
476

    
477
int recv_from_cloud(void *context, uint8_t *buffer_ptr, int buffer_size)
478
{
479
  struct mysql_cloud_context *ctx;
480
  mysql_get_response_t *rsp;
481
  int remaining;
482
  int toread;
483

    
484
  ctx = (struct mysql_cloud_context *) context;
485

    
486
  rsp = (mysql_get_response_t *) req_handler_get_response(ctx->req_handler);
487
  if (!rsp) return -1;
488

    
489
  /* If do not have further data just remove the request */
490
  if (rsp->read_bytes == rsp->data_length) {
491
    req_handler_remove_response(ctx->req_handler);
492
    free_response(rsp);
493
    return 0;
494
  }
495

    
496
  remaining = rsp->data_length - rsp->read_bytes;
497
  toread = (remaining <= buffer_size)? remaining : buffer_size;
498

    
499
  memcpy(buffer_ptr, rsp->current_byte, toread);
500
  rsp->current_byte += toread;
501
  rsp->read_bytes += toread;
502

    
503
  /* remove the response only if the read bytes are less the the allocated
504
     buuffer otherwise the client can't know when a single response finished */
505
  if (rsp->read_bytes == rsp->data_length && rsp->read_bytes < buffer_size){
506
    free_response(rsp);
507
    req_handler_remove_response(ctx->req_handler);
508
  }
509

    
510
  return toread;
511
}
512

    
513
struct delegate_iface delegate_impl = {
514
  .cloud_helper_init = &cloud_helper_init,
515
  .get_from_cloud = &get_from_cloud,
516
  .get_from_cloud_default = &get_from_cloud_default,
517
  .put_on_cloud = &put_on_cloud,
518
  .get_cloud_node = &get_cloud_node,
519
  .timestamp_cloud = &timestamp_cloud,
520
  .is_cloud_node = &is_cloud_node,
521
  .wait4cloud = &wait4cloud,
522
  .recv_from_cloud = &recv_from_cloud
523
};