Statistics
| Branch: | Revision:

grapes / src / CloudSupport / libs3_delegate_helper.c @ 4626022b

History | View | Annotate | Download (18.4 KB)

1
/*
2
 *  Copyright (c) 2011 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 *
6
 *
7
 *  Delegate cloud_handler for AmazonS3 based on the libs3 library.
8
 *  Supported parameters (*required):
9
 *
10
 *  - s3_access_key*:  the Amazon access key id
11
 *  - s3_secret_key*:  the Amazon secret access key
12
 *  - s3_bucket_name*: the bucket on which operate
13
 *  - s3_protocol:     http (default) or https
14
 *  - s3_blocking_put: a value of 1 enable blocking operation.
15
 *                     (default: disabled)
16
 */
17
#include <stdlib.h>
18
#include <stdio.h>
19
#include <unistd.h>
20
#include <string.h>
21
#include <sys/time.h>
22

    
23
#include <libs3.h>
24

    
25
#include "net_helper.h"
26
#include "cloud_helper_iface.h"
27
#include "request_handler.h"
28
#include "config.h"
29

    
30
#define CLOUD_NODE_ADDR "0.0.0.0"
31

    
32
/***********************************************************************
33
 * Interface prototype for cloud_helper_delegate
34
 ***********************************************************************/
35
struct delegate_iface {
36
  void* (*cloud_helper_init)(struct nodeID *local, const char *config);
37
  int (*get_from_cloud)(void *context, const char *key, uint8_t *header_ptr,
38
                        int header_size, int free_header);
39
  int (*put_on_cloud)(void *context, const char *key, uint8_t *buffer_ptr,
40
                      int buffer_size, int free_buffer);
41
  struct nodeID* (*get_cloud_node)(void *context, uint8_t variant);
42
  time_t (*timestamp_cloud)(void *context);
43
  int (*is_cloud_node)(void *context, struct nodeID* node);
44
  int (*wait4cloud)(void *context, struct timeval *tout);
45
  int (*recv_from_cloud)(void *context, uint8_t *buffer_ptr, int buffer_size);
46
};
47

    
48
/***********************************************************************
49
 * libs3 data structures
50
 ***********************************************************************/
51
/* response properties handler */
52
static S3Status
53
libs3_response_properties_callback
54
(const S3ResponseProperties *properties,
55
 void *callbackData);
56

    
57
/* request completion callback */
58
static void
59
libs3_response_complete_callback
60
(S3Status status,
61
 const S3ErrorDetails *error,
62
 void *callbackData);
63

    
64
/* put request callback */
65
static int
66
libs3_put_object_data_callback
67
(int bufferSize,
68
 char *buffer,
69
 void *callbackData);
70

    
71
/* get request callback */
72
static S3Status
73
libs3_get_object_data_callback
74
(int bufferSize,
75
 const char *buffer,
76
 void *callbackData);
77

    
78

    
79
static S3PutObjectHandler libs3_put_object_handler =
80
  {
81
    {
82
      &libs3_response_properties_callback,
83
      &libs3_response_complete_callback
84
    },
85
    &libs3_put_object_data_callback
86
  };
87

    
88
static S3GetObjectHandler libs3_get_object_handler =
89
  {
90
    {
91
      &libs3_response_properties_callback,
92
      &libs3_response_complete_callback
93
    },
94
    &libs3_get_object_data_callback
95
  };
96

    
97

    
98
/***********************************************************************
99
 * libs3_delegate_helper contexts
100
 ***********************************************************************/
101

    
102
/* libs3 cloud context definition */
103
struct libs3_cloud_context {
104
  struct req_handler_ctx *req_handler;
105
  S3BucketContext s3_bucket_context;
106
  int blocking_put_request;
107
  time_t last_rsp_timestamp;
108
};
109

    
110
/***********************************************************************
111
 * Requests/Response pool data structures
112
 ***********************************************************************/
113
enum operation_t {PUT=0, GET=1};
114
struct libs3_request {
115
  enum operation_t op;
116
  char *key;
117

    
118
  /* For GET operations this point to the header.
119
     For PUT this is the pointer to the actual data */
120
  uint8_t *data;
121
  int data_length;
122
  int free_data;
123

    
124
  struct libs3_cloud_context *ctx;
125
};
126
typedef struct libs3_request libs3_request_t;
127

    
128
struct libs3_get_response {
129
  S3Status status;
130
  uint8_t *data;
131
  uint8_t *current_byte;
132

    
133
  int data_length;
134
  int read_bytes;
135
  time_t last_timestamp;
136
};
137
typedef struct libs3_get_response libs3_get_response_t;
138

    
139
struct libs3_callback_context {
140
  libs3_request_t *current_req;
141

    
142
  /* point to the current byte to read/write */
143
  uint8_t *start_ptr;
144

    
145
  /* number of bytes read/written until now */
146
  int bytes;
147

    
148
  /* store for get request */
149
  uint8_t *buffer;
150
  size_t buffer_size;
151

    
152
  time_t last_timestamp;
153
  S3Status status;
154
};
155

    
156

    
157
static void free_request(void *req_ptr)
158
{
159
  libs3_request_t *req;
160
  if (!req_ptr) return;
161

    
162
  req = (libs3_request_t *) req_ptr;
163

    
164
  free(req->key);
165
  if (req->free_data > 0) free(req->data);
166

    
167
  free(req);
168
}
169

    
170
static void free_response(libs3_get_response_t *rsp) {
171
  if (rsp->data) free(rsp->data);
172

    
173
  free(rsp);
174
}
175

    
176
/************************************************************************
177
 * libs3 callback implementation
178
 ************************************************************************/
179
static S3Status
180
libs3_response_properties_callback(const S3ResponseProperties *properties,
181
                                   void *context)
182
{
183
  struct libs3_callback_context *req_ctx;
184
  req_ctx = (struct libs3_callback_context *) context;
185

    
186
  if (properties->lastModified > 0) {
187
    req_ctx->last_timestamp = (time_t) properties->lastModified;
188
  } else {
189
    req_ctx->last_timestamp = 0;
190
  }
191

    
192
  if (properties->contentLength && req_ctx->current_req->op == GET) {
193
    uint64_t actual_length;
194
    size_t supported_length;
195

    
196
    actual_length = (properties->contentLength +
197
                     req_ctx->current_req->data_length);
198
    supported_length = (size_t) actual_length;
199

    
200
    /* This is probably useless as if actual_length is so big
201
       there is no way we can keep it all in memory */
202
    if (supported_length < actual_length)
203
      return S3StatusAbortedByCallback;
204

    
205
    req_ctx->buffer = malloc(actual_length);
206
    if (!req_ctx->buffer)
207
      return S3StatusAbortedByCallback;
208
    req_ctx->buffer_size = actual_length;
209

    
210
    if (req_ctx->current_req->data_length > 0) {
211
      memcpy(req_ctx->buffer,
212
             req_ctx->current_req->data,
213
             req_ctx->current_req->data_length);
214
    }
215
    req_ctx->start_ptr = req_ctx->buffer + req_ctx->current_req->data_length;
216
    req_ctx->bytes = 0;
217
  }
218

    
219
  return S3StatusOK;
220
}
221

    
222
/* request completion callback */
223
static void
224
libs3_response_complete_callback(S3Status status, const S3ErrorDetails *error,
225
                                 void *context)
226
{
227
  struct libs3_callback_context *req_ctx;
228
  req_ctx = (struct libs3_callback_context *) context;
229

    
230
  fprintf(stderr, "COMPLETE: %d\n", req_ctx->status);
231
  req_ctx->status = status;
232
  fprintf(stderr, "COMPLETE: %d\n", req_ctx->status);
233
  if (status != S3StatusOK) {
234
    if (error) {
235
      if (error->message) {
236
        fprintf(stderr, "libs3_delegate_helper: Error performing request" \
237
                "-> %s\n", error->message);
238
      } else {
239
        fprintf(stderr, "libs3_delegate_helper: Unknown error performing " \
240
                "request\n");
241
      }
242
    }
243
  }
244
}
245

    
246
/* put request callback */
247
static int
248
libs3_put_object_data_callback(int bufferSize, char *buffer,
249
                               void *context)
250
{
251
  struct libs3_callback_context *req_ctx;
252
  int towrite;
253
  fprintf(stderr, "FUCK I'M HERE AND I'M OK >> buffer=%d, status=%d, buffer=%p\n", bufferSize, req_ctx->status, buffer);
254
  req_ctx = (struct libs3_callback_context *) context;
255

    
256
  towrite = req_ctx->current_req->data_length - req_ctx->bytes;
257

    
258
  req_ctx->status = S3StatusOK;
259

    
260
  if (towrite == 0)
261
    return 0;
262

    
263
  towrite = (towrite > bufferSize)? bufferSize : towrite;
264

    
265
  memcpy(buffer, req_ctx->start_ptr, towrite);
266
  req_ctx->bytes += towrite;
267
  fprintf(stderr, "FUCK I'M HERE AND I'M OK >> towrite=%d, buffer=%d, status=%d\n", towrite, bufferSize, req_ctx->status);
268
  return towrite;
269
}
270

    
271
/* get request callback */
272
static S3Status
273
libs3_get_object_data_callback(int bufferSize, const char *buffer,
274
                               void *context)
275
{
276
  struct libs3_callback_context *req_ctx;
277
  req_ctx = (struct libs3_callback_context *) context;
278

    
279
  /* The buffer should have been prepared by the properties callback.
280
     If not, it means that s3 didn't report the content length */
281
  if (!req_ctx->buffer) {
282
    req_ctx->buffer = malloc(bufferSize + req_ctx->current_req->data_length);
283
    if (!req_ctx->buffer) return S3StatusAbortedByCallback;
284

    
285
    if (req_ctx->current_req->data_length > 0) {
286
      memcpy(req_ctx->buffer,
287
             req_ctx->current_req->data,
288
             req_ctx->current_req->data_length);
289
    }
290
    req_ctx->start_ptr = req_ctx->buffer + req_ctx->current_req->data_length;
291
    req_ctx->bytes = 0;
292
    req_ctx->buffer_size = bufferSize;
293
  }
294

    
295
  /* If s3 didn't report the content length we make room for it on
296
     the fly */
297
  if (req_ctx->bytes + bufferSize > req_ctx->buffer_size) {
298
    int new_size;
299
    uint8_t *old;
300

    
301
    new_size = req_ctx->buffer_size + (bufferSize * 2);
302

    
303
    old = req_ctx->buffer;
304
    req_ctx->buffer = realloc(req_ctx->buffer, new_size);
305
    if (!req_ctx->buffer) {
306
      free(old);
307
      return S3StatusAbortedByCallback;
308
    }
309

    
310
    req_ctx->start_ptr = (req_ctx->buffer +
311
                          req_ctx->bytes +
312
                          req_ctx->current_req->data_length);
313
  }
314

    
315
  memcpy(req_ctx->start_ptr, buffer, bufferSize);
316
  req_ctx->bytes += bufferSize;
317

    
318
  return S3StatusOK;
319
}
320

    
321
/************************************************************************
322
 * request_handler thread implementation
323
 ************************************************************************/
324

    
325
int should_retry(int *counter, int times)
326
{
327
  if (times > 0){
328
    *counter = times;
329
  } else {
330
    sleep(1); /* give some time to the network */
331
    (*counter)--;
332
  }
333
  return *counter > 0;
334
}
335

    
336

    
337
static int process_put_request(void *req_data, void **rsp_data)
338
{
339
  libs3_request_t *req;
340
  struct libs3_callback_context *cbk_ctx;
341
  int retries_left;
342
  int status;
343

    
344
  req = (libs3_request_t *) req_data;
345

    
346
  should_retry(&retries_left, 3);
347

    
348
  /* put operation never have response */
349
  *rsp_data = NULL;
350

    
351
  cbk_ctx = malloc(sizeof(struct libs3_callback_context));
352
  if (!cbk_ctx) return -1;
353

    
354
  cbk_ctx->current_req = req;
355
  cbk_ctx->status = S3StatusInternalError;
356
  cbk_ctx->start_ptr = req->data;
357
  cbk_ctx->bytes = 0;
358
  cbk_ctx->buffer = NULL;
359
  cbk_ctx->buffer_size = 0;
360

    
361
  do {
362
    S3_put_object(&req->ctx->s3_bucket_context, /* bucket info */
363
                  req->key,                /* key to insert */
364
                  req->data_length,        /* length of data  */
365
                  NULL,                     /* use standard properties */
366
                  NULL,                    /* do a blocking call... */
367
                  &libs3_put_object_handler,/* ...using these callback...*/
368
                  cbk_ctx);                /* ...with this data for context */
369

    
370
    fprintf(stderr, "prima del while: %d\n", cbk_ctx->status);
371
    /* if we get an error related to a temporary network state retry */
372
  } while(S3_status_is_retryable(cbk_ctx->status) &&
373
          should_retry(&retries_left, 0));
374

    
375
  status = (cbk_ctx->status == S3StatusOK) ? 0 : -1;
376

    
377
  free(cbk_ctx);
378

    
379
  return status;
380
}
381

    
382
static int process_get_request(void *req_data, void **rsp_data)
383
{
384
  libs3_request_t *req;
385
  struct libs3_callback_context *cbk_ctx;
386
  struct libs3_get_response *rsp;
387
  int retries_left;
388
  int status;
389

    
390
  req = (libs3_request_t *) req_data;
391

    
392
  should_retry(&retries_left, 3);
393

    
394
  /* Initilize s3 callback data */
395
  cbk_ctx = malloc(sizeof(struct libs3_callback_context));
396
  if (!cbk_ctx) return -1;
397
  cbk_ctx->current_req = req;
398
  cbk_ctx->status = S3StatusInternalError;
399
  cbk_ctx->start_ptr = NULL;
400
  cbk_ctx->bytes = 0;
401
  cbk_ctx->buffer = NULL;
402
  cbk_ctx->buffer_size = 0;
403

    
404

    
405
  do {
406
    S3_get_object(&req->ctx->s3_bucket_context, /* bucket info */
407
                  req->key,                /* key to retrieve */
408
                  NULL,                    /* do not use conditions */
409
                  0,                       /* start from byte 0... */
410
                  0,                       /* ...and read all bytes */
411
                  NULL,                    /* do a blocking call... */
412
                  &libs3_get_object_handler,/* ...using these callback...*/
413
                  cbk_ctx);                /* ...with this data for context */
414

    
415
    /* if we get an error related to a temporary network state retry */
416
  } while(S3_status_is_retryable(cbk_ctx->status) &&
417
          should_retry(&retries_left, 0));
418

    
419

    
420
  rsp = malloc(sizeof(struct libs3_get_response));
421
  if (!rsp) {
422
    if (cbk_ctx->buffer) free(cbk_ctx->buffer);
423
    free(cbk_ctx);
424
    return -1;
425
  }
426

    
427
  *rsp_data = rsp;
428
  rsp->status = cbk_ctx->status;
429
  if (cbk_ctx->status == S3StatusOK) {
430
    rsp->data = cbk_ctx->buffer;
431
    rsp->current_byte = rsp->data;
432
    rsp->data_length = cbk_ctx->bytes + req->data_length;
433
    rsp->read_bytes = 0;
434
    rsp->last_timestamp = cbk_ctx->last_timestamp;
435
  }
436

    
437
  status = (cbk_ctx->status == S3StatusOK) ? 0 : -1;
438
  free(cbk_ctx);
439

    
440
  return status;
441
}
442

    
443

    
444
/************************************************************************
445
 * cloud helper implementation
446
 ************************************************************************/
447
static void deallocate_context(struct libs3_cloud_context *ctx)
448
{
449
  if (!ctx) return;
450

    
451
  free(ctx);
452
  return;
453
}
454

    
455

    
456
void* cloud_helper_init(struct nodeID *local, const char *config)
457
{
458
  struct libs3_cloud_context *ctx;
459
  struct tag *cfg_tags;
460
  const char *arg;
461

    
462
  ctx = malloc(sizeof(struct libs3_cloud_context));
463
  memset(ctx, 0, sizeof(struct libs3_cloud_context));
464
  cfg_tags = config_parse(config);
465

    
466
  /* Parse fundametal parameters */
467
  arg = config_value_str(cfg_tags, "s3_access_key");
468
  if (!arg) {
469
    deallocate_context(ctx);
470
    fprintf(stderr,
471
            "libs3_delegate_helper: missing required parameter " \
472
            "'s3_access_key'\n");
473
    return 0;
474
  }
475
  ctx->s3_bucket_context.accessKeyId = strdup(arg);
476

    
477
  arg = config_value_str(cfg_tags, "s3_secret_key");
478
  if (!arg) {
479
    deallocate_context(ctx);
480
    fprintf(stderr,
481
            "libs3_delegate_helper: missing required parameter " \
482
            "'s3_secret_key'\n");
483
    return 0;
484
  }
485
  ctx->s3_bucket_context.secretAccessKey = strdup(arg);
486

    
487
  arg = config_value_str(cfg_tags, "s3_bucket_name");
488
  if (!arg) {
489
    deallocate_context(ctx);
490
    fprintf(stderr,
491
            "libs3_delegate_helper: missing required parameter " \
492
            "'s3_bucket_name'\n");
493
    return 0;
494
  }
495
  ctx->s3_bucket_context.bucketName = strdup(arg);
496

    
497
  ctx->s3_bucket_context.protocol = S3ProtocolHTTPS;
498
  arg = config_value_str(cfg_tags, "s3_protocol");
499
  if (arg) {
500
    if (strcmp(arg, "https") == 0) {
501
      ctx->s3_bucket_context.protocol = S3ProtocolHTTPS;
502
    } else if (strcmp(arg, "http") == 0) {
503
      ctx->s3_bucket_context.protocol = S3ProtocolHTTP;
504
    }
505
  }
506

    
507
  ctx->s3_bucket_context.uriStyle = S3UriStylePath;
508

    
509

    
510
  /* Parse optional parameters */
511
  ctx->blocking_put_request = 1;
512
  arg = config_value_str(cfg_tags, "s3_blocking_put");
513
  if (arg) {
514
    if (strcmp(arg, "1") == 0)
515
      ctx->blocking_put_request = 1;
516
    else if (strcmp(arg, "0") == 0)
517
      ctx->blocking_put_request = 0;
518
  }
519

    
520
  /* Initialize data structures */
521
  if (S3_initialize("libs3_delegate_helper", S3_INIT_ALL) != S3StatusOK) {
522
    fprintf(stderr,
523
            "libs3_delegate_helper: error inizializing libs3\n");
524
    deallocate_context(ctx);
525
    return NULL;
526
  }
527

    
528
  ctx->req_handler = req_handler_init();
529
  if (!ctx->req_handler) {
530
    fprintf(stderr,
531
            "libs3_delegate_helper: error initializing request handler\n");
532
    deallocate_context(ctx);
533
    return NULL;
534
  }
535

    
536
  return ctx;
537
}
538

    
539
int get_from_cloud(void *context, const char *key, uint8_t *header_ptr,
540
                   int header_size, int free_header)
541
{
542
  struct libs3_cloud_context *ctx;
543
  libs3_request_t *request;
544

    
545
  ctx = (struct libs3_cloud_context *) context;
546
  request = malloc(sizeof(libs3_request_t));
547

    
548
  if (!request) return 1;
549

    
550
  request->op = GET;
551
  request->key = strdup(key);
552
  request->data = header_ptr;
553
  request->data_length = header_size;
554
  request->free_data = free_header;
555
  request->ctx = ctx;
556

    
557
  req_handler_add_request(ctx->req_handler,
558
                          &process_get_request,
559
                          request,
560
                          &free_request);
561

    
562
  return 0;
563
}
564

    
565
int put_on_cloud(void *context, const char *key, uint8_t *buffer_ptr,
566
                 int buffer_size, int free_buffer)
567
{
568
  struct libs3_cloud_context *ctx;
569
  libs3_request_t *request;
570

    
571
  ctx = (struct libs3_cloud_context *) context;
572
  request = malloc(sizeof(libs3_request_t));
573

    
574
  if (!request) return 1;
575

    
576
  request->op = PUT;
577
  request->key = strdup(key);
578
  request->data = buffer_ptr;
579
  request->data_length = buffer_size;
580
  request->free_data = free_buffer;
581
  request->ctx = ctx;
582

    
583
  if (ctx->blocking_put_request) {
584
    int res;
585
    void *rsp;
586
    res = process_put_request(request, &rsp);
587
    free_request(request);
588
    return res;
589
  }
590
  else {
591
    return req_handler_add_request(ctx->req_handler, &process_put_request,
592
                                   request, &free_request);
593
  }
594
}
595

    
596
struct nodeID* get_cloud_node(void *context, uint8_t variant)
597
{
598
  return create_node(CLOUD_NODE_ADDR, variant);
599
}
600

    
601
time_t timestamp_cloud(void *context)
602
{
603
  struct libs3_cloud_context *ctx;
604
  ctx = (struct libs3_cloud_context *) context;
605

    
606
  return ctx->last_rsp_timestamp;
607
}
608

    
609
int is_cloud_node(void *context, struct nodeID* node)
610
{
611
  return strcmp(node_ip(node), CLOUD_NODE_ADDR) == 0;
612
}
613

    
614
int wait4cloud(void *context, struct timeval *tout)
615
{
616
  struct libs3_cloud_context *ctx;
617
  libs3_get_response_t *rsp;
618

    
619
  ctx = (struct libs3_cloud_context *) context;
620

    
621
  rsp = (libs3_get_response_t*)req_handler_wait4response(ctx->req_handler, tout);
622

    
623
  if (rsp) {
624
    if (rsp->status == S3StatusOK) {
625
      ctx->last_rsp_timestamp = rsp->last_timestamp;
626
      return 1;
627
    } else {
628
      /* there was some error with the request */
629
      req_handler_remove_response(ctx->req_handler);
630
      free_response(rsp);
631
      return -1;
632
    }
633
  } else {
634
    return 0;
635
  }
636
}
637

    
638
int recv_from_cloud(void *context, uint8_t *buffer_ptr, int buffer_size)
639
{
640
  struct libs3_cloud_context *ctx;
641
  libs3_get_response_t *rsp;
642
  int remaining;
643
  int toread;
644

    
645
  ctx = (struct libs3_cloud_context *) context;
646

    
647
  rsp = (libs3_get_response_t *) req_handler_get_response(ctx->req_handler);
648
  if (!rsp) return -1;
649

    
650
  remaining = rsp->data_length - rsp->read_bytes;
651
  toread = (remaining <= buffer_size)? remaining : buffer_size;
652

    
653
  memcpy(buffer_ptr, rsp->current_byte, toread);
654
  rsp->current_byte += toread;
655
  rsp->read_bytes += toread;
656

    
657
  if (rsp->read_bytes == rsp->data_length){
658
    req_handler_remove_response(ctx->req_handler);
659
    free_response(rsp);
660
  }
661

    
662
  return toread;
663
}
664

    
665
struct delegate_iface delegate_impl = {
666
  .cloud_helper_init = &cloud_helper_init,
667
  .get_from_cloud = &get_from_cloud,
668
  .put_on_cloud = &put_on_cloud,
669
  .get_cloud_node = &get_cloud_node,
670
  .timestamp_cloud = &timestamp_cloud,
671
  .is_cloud_node = &is_cloud_node,
672
  .wait4cloud = &wait4cloud,
673
  .recv_from_cloud = &recv_from_cloud
674
};