Statistics
| Branch: | Revision:

grapes / src / CloudSupport / libs3_delegate_helper.c @ 176b8de8

History | View | Annotate | Download (20 KB)

1
/*
2
 *  Copyright (c) 2011 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 *
6
 *
7
 *  Delegate cloud_handler for AmazonS3 based on the libs3 library.
8
 *  Supported parameters (*required):
9
 *
10
 *  - s3_access_key*:  the Amazon access key id
11
 *  - s3_secret_key*:  the Amazon secret access key
12
 *  - s3_bucket_name*: the bucket on which operate
13
 *  - s3_protocol:     http (default) or https
14
 *  - s3_blocking_put: a value of 1 enable blocking operation.
15
 *                     (default: disabled)
16
 */
17
#include <stdlib.h>
18
#include <stdio.h>
19
#include <unistd.h>
20
#include <string.h>
21
#include <stdint.h>
22
#include <sys/time.h>
23

    
24
#include <libs3.h>
25

    
26
#include "net_helper.h"
27
#include "cloud_helper_iface.h"
28
#include "request_handler.h"
29
#include "grapes_config.h"
30

    
31
#define CLOUD_NODE_ADDR "0.0.0.0"
32

    
33
/***********************************************************************
34
 * Interface prototype for cloud_helper_delegate
35
 ***********************************************************************/
36
struct delegate_iface {
37
  void* (*cloud_helper_init)(struct nodeID *local, const char *config);
38
  int (*get_from_cloud)(void *context, const char *key, uint8_t *header_ptr,
39
                        int header_size, int free_header);
40
  int (*get_from_cloud_default)(void *context, const char *key,
41
                                uint8_t *header_ptr, int header_size, int free_header,
42
                                uint8_t *defval_ptr, int defval_size, int free_defval);
43
  int (*put_on_cloud)(void *context, const char *key, uint8_t *buffer_ptr,
44
                      int buffer_size, int free_buffer);
45
  struct nodeID* (*get_cloud_node)(void *context, uint8_t variant);
46
  time_t (*timestamp_cloud)(void *context);
47
  int (*is_cloud_node)(void *context, struct nodeID* node);
48
  int (*wait4cloud)(void *context, struct timeval *tout);
49
  int (*recv_from_cloud)(void *context, uint8_t *buffer_ptr, int buffer_size);
50
};
51

    
52
/***********************************************************************
53
 * libs3 data structures
54
 ***********************************************************************/
55
/* response properties handler */
56
static S3Status
57
libs3_response_properties_callback
58
(const S3ResponseProperties *properties,
59
 void *callbackData);
60

    
61
/* request completion callback */
62
static void
63
libs3_response_complete_callback
64
(S3Status status,
65
 const S3ErrorDetails *error,
66
 void *callbackData);
67

    
68
/* put request callback */
69
static int
70
libs3_put_object_data_callback
71
(int bufferSize,
72
 char *buffer,
73
 void *callbackData);
74

    
75
/* get request callback */
76
static S3Status
77
libs3_get_object_data_callback
78
(int bufferSize,
79
 const char *buffer,
80
 void *callbackData);
81

    
82

    
83
static S3PutObjectHandler libs3_put_object_handler =
84
  {
85
    {
86
      &libs3_response_properties_callback,
87
      &libs3_response_complete_callback
88
    },
89
    &libs3_put_object_data_callback
90
  };
91

    
92
static S3GetObjectHandler libs3_get_object_handler =
93
  {
94
    {
95
      &libs3_response_properties_callback,
96
      &libs3_response_complete_callback
97
    },
98
    &libs3_get_object_data_callback
99
  };
100

    
101

    
102
/***********************************************************************
103
 * libs3_delegate_helper contexts
104
 ***********************************************************************/
105

    
106
/* libs3 cloud context definition */
107
struct libs3_cloud_context {
108
  struct req_handler_ctx *req_handler;
109
  S3BucketContext s3_bucket_context;
110
  int blocking_put_request;
111
  time_t last_rsp_timestamp;
112
};
113

    
114
/***********************************************************************
115
 * Requests/Response pool data structures
116
 ***********************************************************************/
117
enum operation_t {PUT=0, GET=1};
118
struct libs3_request {
119
  enum operation_t op;
120
  char *key;
121

    
122
  /* For GET operations this point to the header.
123
     For PUT this is the pointer to the actual data */
124
  uint8_t *data;
125
  int data_length;
126
  int free_data;
127

    
128
  uint8_t *default_value;
129
  int default_value_length;
130
  int free_default_value;
131

    
132
  struct libs3_cloud_context *ctx;
133
};
134
typedef struct libs3_request libs3_request_t;
135

    
136
struct libs3_get_response {
137
  S3Status status;
138
  uint8_t *data;
139
  uint8_t *current_byte;
140

    
141
  int data_length;
142
  int read_bytes;
143
  time_t last_timestamp;
144
};
145
typedef struct libs3_get_response libs3_get_response_t;
146

    
147
struct libs3_callback_context {
148
  libs3_request_t *current_req;
149

    
150
  /* point to the current byte to read/write */
151
  uint8_t *start_ptr;
152

    
153
  /* number of bytes read/written until now */
154
  int bytes;
155

    
156
  /* store for get request */
157
  uint8_t *buffer;
158
  size_t buffer_size;
159

    
160
  time_t last_timestamp;
161
  S3Status status;
162
};
163

    
164

    
165
static void free_request(void *req_ptr)
166
{
167
  libs3_request_t *req;
168
  if (!req_ptr) return;
169

    
170
  req = (libs3_request_t *) req_ptr;
171

    
172
  free(req->key);
173
  if (req->free_data > 0) free(req->data);
174
  if (req->free_default_value > 0)
175
    free(req->default_value);
176

    
177
  free(req);
178
}
179

    
180
static void free_response(libs3_get_response_t *rsp) {
181
  if (rsp->data) free(rsp->data);
182

    
183
  free(rsp);
184
}
185

    
186
/************************************************************************
187
 * libs3 callback implementation
188
 ************************************************************************/
189
static S3Status
190
libs3_response_properties_callback(const S3ResponseProperties *properties,
191
                                   void *context)
192
{
193
  struct libs3_callback_context *req_ctx;
194
  req_ctx = (struct libs3_callback_context *) context;
195

    
196
  if (properties->lastModified > 0) {
197
    req_ctx->last_timestamp = (time_t) properties->lastModified;
198
  } else {
199
    req_ctx->last_timestamp = 0;
200
  }
201

    
202
  if (properties->contentLength && req_ctx->current_req->op == GET) {
203
    uint64_t actual_length;
204
    size_t supported_length;
205

    
206
    actual_length = (properties->contentLength +
207
                     req_ctx->current_req->data_length);
208
    supported_length = (size_t) actual_length;
209

    
210
    /* This is probably useless as if actual_length is so big
211
       there is no way we can keep it all in memory */
212
    if (supported_length < actual_length)
213
      return S3StatusAbortedByCallback;
214

    
215
    req_ctx->buffer = malloc(actual_length);
216
    if (!req_ctx->buffer)
217
      return S3StatusAbortedByCallback;
218
    req_ctx->buffer_size = actual_length;
219

    
220
    if (req_ctx->current_req->data_length > 0) {
221
      memcpy(req_ctx->buffer,
222
             req_ctx->current_req->data,
223
             req_ctx->current_req->data_length);
224
    }
225
    req_ctx->start_ptr = req_ctx->buffer + req_ctx->current_req->data_length;
226
    req_ctx->bytes = 0;
227
  }
228

    
229
  return S3StatusOK;
230
}
231

    
232
/* request completion callback */
233
static void
234
libs3_response_complete_callback(S3Status status, const S3ErrorDetails *error,
235
                                 void *context)
236
{
237
  struct libs3_callback_context *req_ctx;
238
  req_ctx = (struct libs3_callback_context *) context;
239

    
240
  req_ctx->status = status;
241
  if (status != S3StatusOK) {
242
    if (error) {
243
      if (error->message) {
244
        fprintf(stderr, "libs3_delegate_helper: Error performing request" \
245
                "-> %s\n", error->message);
246
      } else {
247
        fprintf(stderr, "libs3_delegate_helper: Unknown error performing " \
248
                "request\n");
249
      }
250
    }
251
  }
252
}
253

    
254
/* put request callback */
255
static int
256
libs3_put_object_data_callback(int bufferSize, char *buffer,
257
                               void *context)
258
{
259
  struct libs3_callback_context *req_ctx;
260
  int towrite;
261
  req_ctx = (struct libs3_callback_context *) context;
262

    
263
  towrite = req_ctx->current_req->data_length - req_ctx->bytes;
264

    
265
  req_ctx->status = S3StatusOK;
266

    
267
  if (towrite == 0)
268
    return 0;
269

    
270
  towrite = (towrite > bufferSize)? bufferSize : towrite;
271

    
272
  memcpy(buffer, req_ctx->start_ptr, towrite);
273
  req_ctx->bytes += towrite;
274
  return towrite;
275
}
276

    
277
/* get request callback */
278
static S3Status
279
libs3_get_object_data_callback(int bufferSize, const char *buffer,
280
                               void *context)
281
{
282
  struct libs3_callback_context *req_ctx;
283
  req_ctx = (struct libs3_callback_context *) context;
284

    
285
  /* The buffer should have been prepared by the properties callback.
286
     If not, it means that s3 didn't report the content length */
287
  if (!req_ctx->buffer) {
288
    req_ctx->buffer = malloc(bufferSize + req_ctx->current_req->data_length);
289
    if (!req_ctx->buffer) return S3StatusAbortedByCallback;
290

    
291
    if (req_ctx->current_req->data_length > 0) {
292
      memcpy(req_ctx->buffer,
293
             req_ctx->current_req->data,
294
             req_ctx->current_req->data_length);
295
    }
296
    req_ctx->start_ptr = req_ctx->buffer + req_ctx->current_req->data_length;
297
    req_ctx->bytes = 0;
298
    req_ctx->buffer_size = bufferSize;
299
  }
300

    
301
  /* If s3 didn't report the content length we make room for it on
302
     the fly */
303
  if (req_ctx->bytes + bufferSize > req_ctx->buffer_size) {
304
    int new_size;
305
    uint8_t *old;
306

    
307
    new_size = req_ctx->buffer_size + (bufferSize * 2);
308

    
309
    old = req_ctx->buffer;
310
    req_ctx->buffer = realloc(req_ctx->buffer, new_size);
311
    if (!req_ctx->buffer) {
312
      free(old);
313
      return S3StatusAbortedByCallback;
314
    }
315

    
316
    req_ctx->start_ptr = (req_ctx->buffer +
317
                          req_ctx->bytes +
318
                          req_ctx->current_req->data_length);
319
  }
320

    
321
  memcpy(req_ctx->start_ptr, buffer, bufferSize);
322
  req_ctx->bytes += bufferSize;
323

    
324
  return S3StatusOK;
325
}
326

    
327
/************************************************************************
328
 * request_handler thread implementation
329
 ************************************************************************/
330

    
331
int should_retry(int *counter, int times)
332
{
333
  if (times > 0){
334
    *counter = times;
335
  } else {
336
    sleep(1); /* give some time to the network */
337
    (*counter)--;
338
  }
339
  return *counter > 0;
340
}
341

    
342

    
343
static int process_put_request(void *req_data, void **rsp_data)
344
{
345
  libs3_request_t *req;
346
  struct libs3_callback_context *cbk_ctx;
347
  int retries_left;
348
  int status;
349

    
350
  req = (libs3_request_t *) req_data;
351

    
352
  should_retry(&retries_left, 3);
353

    
354
  /* put operation never have response */
355
  *rsp_data = NULL;
356

    
357
  cbk_ctx = malloc(sizeof(struct libs3_callback_context));
358
  if (!cbk_ctx) return -1;
359

    
360
  cbk_ctx->current_req = req;
361
  cbk_ctx->status = S3StatusInternalError;
362
  cbk_ctx->start_ptr = req->data;
363
  cbk_ctx->bytes = 0;
364
  cbk_ctx->buffer = NULL;
365
  cbk_ctx->buffer_size = 0;
366

    
367
  do {
368
    S3_put_object(&req->ctx->s3_bucket_context, /* bucket info */
369
                  req->key,                /* key to insert */
370
                  req->data_length,        /* length of data  */
371
                  NULL,                     /* use standard properties */
372
                  NULL,                    /* do a blocking call... */
373
                  &libs3_put_object_handler,/* ...using these callback...*/
374
                  cbk_ctx);                /* ...with this data for context */
375

    
376
    /* if we get an error related to a temporary network state retry */
377
  } while(S3_status_is_retryable(cbk_ctx->status) &&
378
          should_retry(&retries_left, 0));
379

    
380
  status = (cbk_ctx->status == S3StatusOK) ? 0 : -1;
381

    
382
  free(cbk_ctx);
383

    
384
  return status;
385
}
386

    
387
static int process_get_request(void *req_data, void **rsp_data)
388
{
389
  libs3_request_t *req;
390
  struct libs3_callback_context *cbk_ctx;
391
  struct libs3_get_response *rsp;
392
  int retries_left;
393
  int status;
394

    
395
  req = (libs3_request_t *) req_data;
396

    
397
  should_retry(&retries_left, 3);
398

    
399
  /* Initilize s3 callback data */
400
  cbk_ctx = malloc(sizeof(struct libs3_callback_context));
401
  if (!cbk_ctx) return -1;
402
  cbk_ctx->current_req = req;
403
  cbk_ctx->status = S3StatusInternalError;
404
  cbk_ctx->start_ptr = NULL;
405
  cbk_ctx->bytes = 0;
406
  cbk_ctx->buffer = NULL;
407
  cbk_ctx->buffer_size = 0;
408

    
409

    
410
  do {
411
    S3_get_object(&req->ctx->s3_bucket_context, /* bucket info */
412
                  req->key,                /* key to retrieve */
413
                  NULL,                    /* do not use conditions */
414
                  0,                       /* start from byte 0... */
415
                  0,                       /* ...and read all bytes */
416
                  NULL,                    /* do a blocking call... */
417
                  &libs3_get_object_handler,/* ...using these callback...*/
418
                  cbk_ctx);                /* ...with this data for context */
419

    
420
    /* if we get an error related to a temporary network state retry */
421
  } while(S3_status_is_retryable(cbk_ctx->status) &&
422
          should_retry(&retries_left, 0));
423

    
424

    
425
  rsp = malloc(sizeof(struct libs3_get_response));
426
  if (!rsp) {
427
    if (cbk_ctx->buffer) free(cbk_ctx->buffer);
428
    free(cbk_ctx);
429
    return -1;
430
  }
431

    
432
  *rsp_data = rsp;
433
  rsp->status = cbk_ctx->status;
434
  if (cbk_ctx->status == S3StatusOK) {
435
    rsp->data = cbk_ctx->buffer;
436
    rsp->current_byte = rsp->data;
437
    rsp->data_length = cbk_ctx->bytes + req->data_length;
438
    rsp->read_bytes = 0;
439
    rsp->last_timestamp = cbk_ctx->last_timestamp;
440
  } else {
441
    /* Since there was no value for the specified key. If the caller specified a
442
       default value, use that */
443
    if (req->default_value) {
444
      rsp->data = malloc(req->data_length + req->default_value_length);
445
      if (!rsp->data) return 1;
446
      rsp->current_byte = rsp->data;
447
      if (req->data_length > 0)
448
        memcpy(rsp->data, req->data, req->data_length);
449

    
450
      memcpy(rsp->data, req->default_value, req->default_value_length);
451

    
452
      rsp->status = S3StatusOK;
453
    }
454
  }
455

    
456
  free(cbk_ctx);
457
  status = (rsp->status == S3StatusOK) ? 0 : -1;
458

    
459

    
460
  return status;
461
}
462

    
463

    
464
/************************************************************************
465
 * cloud helper implementation
466
 ************************************************************************/
467
static void deallocate_context(struct libs3_cloud_context *ctx)
468
{
469
  if (!ctx) return;
470

    
471
  free(ctx);
472
  return;
473
}
474

    
475

    
476
void* cloud_helper_init(struct nodeID *local, const char *config)
477
{
478
  struct libs3_cloud_context *ctx;
479
  struct tag *cfg_tags;
480
  const char *arg;
481

    
482
  ctx = malloc(sizeof(struct libs3_cloud_context));
483
  memset(ctx, 0, sizeof(struct libs3_cloud_context));
484
  cfg_tags = grapes_config_parse(config);
485

    
486
  /* Parse fundametal parameters */
487
  arg = grapes_config_value_str(cfg_tags, "s3_access_key");
488
  if (!arg) {
489
    deallocate_context(ctx);
490
    fprintf(stderr,
491
            "libs3_delegate_helper: missing required parameter " \
492
            "'s3_access_key'\n");
493
    return 0;
494
  }
495
  ctx->s3_bucket_context.accessKeyId = strdup(arg);
496

    
497
  arg = grapes_config_value_str(cfg_tags, "s3_secret_key");
498
  if (!arg) {
499
    deallocate_context(ctx);
500
    fprintf(stderr,
501
            "libs3_delegate_helper: missing required parameter " \
502
            "'s3_secret_key'\n");
503
    return 0;
504
  }
505
  ctx->s3_bucket_context.secretAccessKey = strdup(arg);
506

    
507
  arg = grapes_config_value_str(cfg_tags, "s3_bucket_name");
508
  if (!arg) {
509
    deallocate_context(ctx);
510
    fprintf(stderr,
511
            "libs3_delegate_helper: missing required parameter " \
512
            "'s3_bucket_name'\n");
513
    return 0;
514
  }
515
  ctx->s3_bucket_context.bucketName = strdup(arg);
516

    
517
  ctx->s3_bucket_context.protocol = S3ProtocolHTTPS;
518
  arg = grapes_config_value_str(cfg_tags, "s3_protocol");
519
  if (arg) {
520
    if (strcmp(arg, "https") == 0) {
521
      ctx->s3_bucket_context.protocol = S3ProtocolHTTPS;
522
    } else if (strcmp(arg, "http") == 0) {
523
      ctx->s3_bucket_context.protocol = S3ProtocolHTTP;
524
    }
525
  }
526

    
527
  ctx->s3_bucket_context.uriStyle = S3UriStylePath;
528

    
529

    
530
  /* Parse optional parameters */
531
  ctx->blocking_put_request = 1;
532
  arg = grapes_config_value_str(cfg_tags, "s3_blocking_put");
533
  if (arg) {
534
    if (strcmp(arg, "1") == 0)
535
      ctx->blocking_put_request = 1;
536
    else if (strcmp(arg, "0") == 0)
537
      ctx->blocking_put_request = 0;
538
  }
539

    
540
  /* Initialize data structures */
541
  if (S3_initialize("libs3_delegate_helper", S3_INIT_ALL) != S3StatusOK) {
542
    fprintf(stderr,
543
            "libs3_delegate_helper: error inizializing libs3\n");
544
    deallocate_context(ctx);
545
    return NULL;
546
  }
547

    
548
  ctx->req_handler = req_handler_init();
549
  if (!ctx->req_handler) {
550
    fprintf(stderr,
551
            "libs3_delegate_helper: error initializing request handler\n");
552
    deallocate_context(ctx);
553
    return NULL;
554
  }
555

    
556
  return ctx;
557
}
558

    
559
int get_from_cloud_default(void *context, const char *key, uint8_t *header_ptr,
560
                           int header_size, int free_header, uint8_t *defval_ptr,
561
                           int defval_size, int free_defval)
562
{
563
  struct libs3_cloud_context *ctx;
564
  libs3_request_t *request;
565

    
566
  ctx = (struct libs3_cloud_context *) context;
567
  request = malloc(sizeof(libs3_request_t));
568

    
569
  if (!request) return 1;
570

    
571
  request->op = GET;
572
  request->key = strdup(key);
573
  request->data = header_ptr;
574
  request->data_length = header_size;
575
  request->free_data = free_header;
576
  request->default_value = defval_ptr;
577
  request->default_value_length = defval_size;
578
  request->free_default_value = free_defval;
579
  request->ctx = ctx;
580

    
581
  req_handler_add_request(ctx->req_handler,
582
                          &process_get_request,
583
                          request,
584
                          &free_request);
585

    
586
  return 0;
587
}
588

    
589
int get_from_cloud(void *context, const char *key, uint8_t *header_ptr,
590
                   int header_size, int free_header)
591
{
592
  return get_from_cloud_default(context, key, header_ptr, header_size,
593
                                free_header, NULL, 0, 0);
594
}
595

    
596
int put_on_cloud(void *context, const char *key, uint8_t *buffer_ptr,
597
                 int buffer_size, int free_buffer)
598
{
599
  struct libs3_cloud_context *ctx;
600
  libs3_request_t *request;
601

    
602
  ctx = (struct libs3_cloud_context *) context;
603
  request = malloc(sizeof(libs3_request_t));
604

    
605
  if (!request) return 1;
606

    
607
  request->op = PUT;
608
  request->key = strdup(key);
609
  request->data = buffer_ptr;
610
  request->data_length = buffer_size;
611
  request->free_data = free_buffer;
612
  request->default_value = NULL;
613
  request->default_value_length = 0;
614
  request->free_default_value = 0;
615
  request->ctx = ctx;
616

    
617
  if (ctx->blocking_put_request) {
618
    int res;
619
    void *rsp;
620
    res = process_put_request(request, &rsp);
621
    free_request(request);
622
    return res;
623
  }
624
  else {
625
    return req_handler_add_request(ctx->req_handler, &process_put_request,
626
                                   request, &free_request);
627
  }
628
}
629

    
630
struct nodeID* get_cloud_node(void *context, uint8_t variant)
631
{
632
  return create_node(CLOUD_NODE_ADDR, variant);
633
}
634

    
635
time_t timestamp_cloud(void *context)
636
{
637
  struct libs3_cloud_context *ctx;
638
  ctx = (struct libs3_cloud_context *) context;
639

    
640
  return ctx->last_rsp_timestamp;
641
}
642

    
643
int is_cloud_node(void *context, struct nodeID* node)
644
{
645
  char buff[96];
646
  node_ip(node, buff, 96);
647
  return strcmp(buff, CLOUD_NODE_ADDR) == 0;
648
}
649

    
650
int wait4cloud(void *context, struct timeval *tout)
651
{
652
  struct libs3_cloud_context *ctx;
653
  libs3_get_response_t *rsp;
654

    
655
  ctx = (struct libs3_cloud_context *) context;
656

    
657
  rsp = (libs3_get_response_t*)req_handler_wait4response(ctx->req_handler, tout);
658

    
659
  if (rsp) {
660
    if (rsp->status == S3StatusOK) {
661
      ctx->last_rsp_timestamp = rsp->last_timestamp;
662
      return 1;
663
    } else {
664
      /* there was some error with the request */
665
      req_handler_remove_response(ctx->req_handler);
666
      free_response(rsp);
667
      return -1;
668
    }
669
  } else {
670
    return 0;
671
  }
672
}
673

    
674
int recv_from_cloud(void *context, uint8_t *buffer_ptr, int buffer_size)
675
{
676
  struct libs3_cloud_context *ctx;
677
  libs3_get_response_t *rsp;
678
  int remaining;
679
  int toread;
680

    
681
  ctx = (struct libs3_cloud_context *) context;
682

    
683
  rsp = (libs3_get_response_t *) req_handler_get_response(ctx->req_handler);
684
  if (!rsp) return -1;
685

    
686
  /* If do not have further data just remove the request */
687
  if (rsp->read_bytes == rsp->data_length) {
688
    req_handler_remove_response(ctx->req_handler);
689
    free_response(rsp);
690
    return 0;
691
  }
692

    
693
  remaining = rsp->data_length - rsp->read_bytes;
694
  toread = (remaining <= buffer_size)? remaining : buffer_size;
695

    
696
  memcpy(buffer_ptr, rsp->current_byte, toread);
697
  rsp->current_byte += toread;
698
  rsp->read_bytes += toread;
699

    
700
  /* remove the response only if the read bytes are less the the allocated
701
     buuffer otherwise the client can't know when a single response finished */
702
  if (rsp->read_bytes == rsp->data_length && rsp->read_bytes < buffer_size){
703
    req_handler_remove_response(ctx->req_handler);
704
    free_response(rsp);
705
  }
706

    
707
  return toread;
708
}
709

    
710
struct delegate_iface delegate_impl = {
711
  .cloud_helper_init = &cloud_helper_init,
712
  .get_from_cloud = &get_from_cloud,
713
  .get_from_cloud_default = &get_from_cloud_default,
714
  .put_on_cloud = &put_on_cloud,
715
  .get_cloud_node = &get_cloud_node,
716
  .timestamp_cloud = &timestamp_cloud,
717
  .is_cloud_node = &is_cloud_node,
718
  .wait4cloud = &wait4cloud,
719
  .recv_from_cloud = &recv_from_cloud
720
};