Statistics
| Branch: | Revision:

grapes / src / CloudSupport / libs3_delegate_helper.c @ b9e7dd7b

History | View | Annotate | Download (23.8 KB)

1
/*
2
 *  Copyright (c) 2011 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 *
6
 *
7
 *  Delegate cloud_handler for AmazonS3 based on the libs3 library.
8
 *  Supported parameters (*required):
9
 *
10
 *  - s3_access_key*:  the Amazon access key id
11
 *  - s3_secret_key*:  the Amazon secret access key
12
 *  - s3_bucket_name*: the bucket on which operate
13
 *  - s3_protocol:     http (default) or https
14
 *  - s3_blocking_put: a value of 1 enable blocking operation.
15
 *                     (default: disabled)
16
 */
17
#include <stdlib.h>
18
#include <stdio.h>
19
#include <unistd.h>
20
#include <pthread.h>
21
#include <semaphore.h>
22
#include <string.h>
23
#include <sys/time.h>
24

    
25
#include <libs3.h>
26

    
27
#include "net_helper.h"
28
#include "cloud_helper_iface.h"
29
#include "fifo_queue.h"
30
#include "config.h"
31

    
32
#define CLOUD_NODE_ADDR "0.0.0.0"
33

    
34
/***********************************************************************
35
 * Interface prototype for cloud_helper_delegate
36
 ***********************************************************************/
37
struct delegate_iface {
38
  void* (*cloud_helper_init)(struct nodeID *local, const char *config);
39
  int (*get_from_cloud)(void *context, const char *key, uint8_t *header_ptr,
40
                        int header_size, int free_header);
41
  int (*put_on_cloud)(void *context, const char *key, uint8_t *buffer_ptr,
42
                      int buffer_size, int free_buffer);
43
  struct nodeID* (*get_cloud_node)(void *context, uint8_t variant);
44
  time_t (*timestamp_cloud)(void *context);
45
  int (*is_cloud_node)(void *context, struct nodeID* node);
46
  int (*wait4cloud)(void *context, struct timeval *tout);
47
  int (*recv_from_cloud)(void *context, uint8_t *buffer_ptr, int buffer_size);
48
};
49

    
50
/***********************************************************************
51
 * Requests/Response pool data structures
52
 ***********************************************************************/
53
enum operation_t {PUT=0, GET=1};
54

    
55
typedef struct libs3_request {
56
  enum operation_t op;
57
  char *key;
58

    
59
  /* For GET operations this point to the header.
60
     For PUT this is the pointer to the actual data */
61
  uint8_t *data;
62
  int data_length;
63
  int free_data;
64
} libs3_request_t;
65

    
66
typedef struct libs3_get_response {
67
  S3Status status;
68
  uint8_t *data;
69
  uint8_t *current_byte;
70

    
71
  int data_length;
72
  int read_bytes;
73
  time_t last_timestamp;
74
} libs3_get_response_t;
75

    
76

    
77
/***********************************************************************
78
 * libs3 data structures
79
 ***********************************************************************/
80
/* response properties handler */
81
static S3Status
82
libs3_response_properties_callback
83
(const S3ResponseProperties *properties,
84
 void *callbackData);
85

    
86
/* request completion callback */
87
static void
88
libs3_response_complete_callback
89
(S3Status status,
90
 const S3ErrorDetails *error,
91
 void *callbackData);
92

    
93
/* put request callback */
94
static int
95
libs3_put_object_data_callback
96
(int bufferSize,
97
 char *buffer,
98
 void *callbackData);
99

    
100
/* get request callback */
101
static S3Status
102
libs3_get_object_data_callback
103
(int bufferSize,
104
 const char *buffer,
105
 void *callbackData);
106

    
107

    
108
static S3PutObjectHandler libs3_put_object_handler =
109
  {
110
    {
111
      &libs3_response_properties_callback,
112
      &libs3_response_complete_callback
113
    },
114
    &libs3_put_object_data_callback
115
  };
116

    
117
static S3GetObjectHandler libs3_get_object_handler =
118
  {
119
    {
120
      &libs3_response_properties_callback,
121
      &libs3_response_complete_callback
122
    },
123
    &libs3_get_object_data_callback
124
  };
125

    
126

    
127
/***********************************************************************
128
 * libs3_delegate_helper contexts
129
 ***********************************************************************/
130

    
131
/* libs3 cloud context definition */
132
struct libs3_cloud_context {
133
  /* libs3 information */
134
  S3BucketContext s3_bucket_context;
135
  int blocking_put_request;
136

    
137
  /* request/response management */
138
  pthread_mutex_t req_queue_lock;
139
  fifo_queue_p req_queue;
140

    
141
  pthread_mutex_t rsp_queue_sync_mutex;
142
  pthread_cond_t rsp_queue_sync_cond;
143
  pthread_mutex_t rsp_queue_lock;
144
  fifo_queue_p rsp_queue;
145

    
146
  time_t last_rsp_timestamp;
147

    
148
  /* thread management */
149
  pthread_attr_t req_handler_thread_attr;
150
  pthread_cond_t req_handler_sync_cond;
151
  pthread_mutex_t req_handler_sync_mutex;
152
  pthread_t req_handler_thread;
153
};
154

    
155
struct libs3_request_context {
156
  struct libs3_cloud_context *cloud_ctx;
157
  libs3_request_t *current_req;
158

    
159
  /* point to the current byte to read/write */
160
  uint8_t *start_ptr;
161

    
162
  /* number of bytes read/written until now */
163
  int bytes;
164

    
165
  /* store for get request */
166
  uint8_t *buffer;
167
  size_t buffer_size;
168

    
169
  time_t last_timestamp;
170
  S3Status status;
171
};
172

    
173
static void free_request(libs3_request_t *req)
174
{
175
  if (!req) return;
176

    
177
  free(req->key);
178
  if (req->free_data > 0) free(req->data);
179

    
180
  free(req);
181
}
182

    
183
static void add_request(struct libs3_cloud_context *ctx, libs3_request_t *req)
184
{
185
  /* add the request to the pool */
186
  pthread_mutex_lock(&ctx->req_queue_lock);
187
  fifo_queue_add(ctx->req_queue, req);
188
  pthread_mutex_unlock(&ctx->req_queue_lock);
189

    
190
  /* notify request handler thread */
191
  pthread_mutex_lock(&ctx->req_handler_sync_mutex);
192
  pthread_cond_signal(&ctx->req_handler_sync_cond);
193
  pthread_mutex_unlock(&ctx->req_handler_sync_mutex);
194
}
195

    
196

    
197
static libs3_get_response_t* get_response(struct libs3_cloud_context *ctx)
198
{
199
  if (fifo_queue_size(ctx->rsp_queue) > 0) {
200
    libs3_get_response_t *rsp;
201

    
202
    pthread_mutex_lock(&ctx->rsp_queue_lock);
203
    rsp = fifo_queue_get_head(ctx->rsp_queue);
204
    pthread_mutex_unlock(&ctx->rsp_queue_lock);
205
    return rsp;
206
  }
207

    
208
  return NULL;
209
}
210

    
211
static libs3_get_response_t* wait4response(struct libs3_cloud_context *ctx,
212
                                           struct timeval *tout)
213
{
214
  if (fifo_queue_size(ctx->rsp_queue) == 0) {
215
    /* if there's no data ready to process, let's wait */
216
    struct timespec timeout;
217
    struct timeval abs_tout;
218
    int err;
219

    
220
    gettimeofday(&abs_tout, NULL);
221
    abs_tout.tv_sec += tout->tv_sec;
222
    abs_tout.tv_usec += tout->tv_usec;
223

    
224
    timeout.tv_sec = abs_tout.tv_sec + (abs_tout.tv_usec / 1000000);
225
    timeout.tv_nsec = abs_tout.tv_usec % 1000000;
226

    
227

    
228
    pthread_mutex_lock(&ctx->rsp_queue_sync_mutex);
229
    /* make sure that no data came in the meanwhile */
230
    if (fifo_queue_size(ctx->rsp_queue) == 0) {
231
      err = pthread_cond_timedwait(&ctx->rsp_queue_sync_cond,
232
                                   &ctx->rsp_queue_sync_mutex,
233
                                   &timeout);
234
    }
235
    pthread_mutex_unlock(&ctx->rsp_queue_sync_mutex);
236
  }
237

    
238
  return get_response(ctx);
239
}
240

    
241
 static void add_response(struct libs3_cloud_context *ctx,
242
                          libs3_get_response_t *rsp)
243
{
244
  /* add the response to the pool */
245
  pthread_mutex_lock(&ctx->rsp_queue_lock);
246
  fifo_queue_add(ctx->rsp_queue, rsp);
247
  pthread_mutex_unlock(&ctx->rsp_queue_lock);
248

    
249
  /* notify wait4response there's a response in the queue */
250
  pthread_mutex_lock(&ctx->rsp_queue_sync_mutex);
251
  pthread_cond_signal(&ctx->rsp_queue_sync_cond);
252

    
253
  fflush(stderr);
254
  pthread_mutex_unlock(&ctx->rsp_queue_sync_mutex);
255
}
256

    
257
static void pop_response(struct libs3_cloud_context *ctx)
258
{
259
  libs3_get_response_t *rsp;
260

    
261
  pthread_mutex_lock(&ctx->rsp_queue_lock);
262
  rsp = fifo_queue_remove_head(ctx->rsp_queue);
263
  pthread_mutex_unlock(&ctx->rsp_queue_lock);
264

    
265
  if (rsp) {
266
    if (rsp->data) free(rsp->data);
267
    free(rsp);
268
  }
269
}
270

    
271
/************************************************************************
272
 * libs3 callback implementation
273
 ************************************************************************/
274
static S3Status
275
libs3_response_properties_callback(const S3ResponseProperties *properties,
276
                                   void *context)
277
{
278
  struct libs3_request_context *req_ctx;
279
  req_ctx = (struct libs3_request_context *) context;
280

    
281
  if (properties->lastModified > 0) {
282
    req_ctx->last_timestamp = (time_t) properties->lastModified;
283
  } else {
284
    req_ctx->last_timestamp = 0;
285
  }
286

    
287
  if (properties->contentLength && req_ctx->current_req->op == GET) {
288
    uint64_t actual_length;
289
    size_t supported_length;
290

    
291
    actual_length = (properties->contentLength +
292
                     req_ctx->current_req->data_length);
293
    supported_length = (size_t) actual_length;
294

    
295
    /* This is probably useless as if actual_length is so big
296
       there is no way we can keep it all in memory */
297
    if (supported_length < actual_length)
298
      return S3StatusAbortedByCallback;
299

    
300
    req_ctx->buffer = malloc(actual_length);
301
    if (!req_ctx->buffer)
302
      return S3StatusAbortedByCallback;
303
    req_ctx->buffer_size = actual_length;
304

    
305
    if (req_ctx->current_req->data_length > 0) {
306
      memcpy(req_ctx->buffer,
307
             req_ctx->current_req->data,
308
             req_ctx->current_req->data_length);
309
    }
310
    req_ctx->start_ptr = req_ctx->buffer + req_ctx->current_req->data_length;
311
    req_ctx->bytes = 0;
312
  }
313

    
314
  return S3StatusOK;
315
}
316

    
317
/* request completion callback */
318
static void
319
libs3_response_complete_callback(S3Status status, const S3ErrorDetails *error,
320
                                 void *context)
321
{
322
  struct libs3_request_context *req_ctx;
323
  req_ctx = (struct libs3_request_context *) context;
324

    
325
  req_ctx->status = status;
326
  if (status != S3StatusOK) {
327
    if (error) {
328
      if (error->message) {
329
        fprintf(stderr, "libs3_delegate_helper: Error performing request" \
330
                "-> %s\n", error->message);
331
      } else {
332
        fprintf(stderr, "libs3_delegate_helper: Unknown error performing " \
333
                "request\n");
334
      }
335
    }
336
  }
337
}
338

    
339
/* put request callback */
340
static int
341
libs3_put_object_data_callback(int bufferSize, char *buffer,
342
                               void *context)
343
{
344
  struct libs3_request_context *req_ctx;
345
  int towrite;
346
  req_ctx = (struct libs3_request_context *) context;
347

    
348
  towrite = req_ctx->current_req->data_length - req_ctx->bytes;
349

    
350
  if (towrite == 0) return 0;
351

    
352
  towrite = (towrite > bufferSize)? bufferSize : towrite;
353

    
354
  memcpy(buffer, req_ctx->start_ptr, towrite);
355
  req_ctx->bytes += towrite;
356

    
357
  return towrite;
358
}
359

    
360
/* get request callback */
361
static S3Status
362
libs3_get_object_data_callback(int bufferSize, const char *buffer,
363
                               void *context)
364
{
365
  struct libs3_request_context *req_ctx;
366
  req_ctx = (struct libs3_request_context *) context;
367

    
368
  /* The buffer should have been prepared by the properties callback.
369
     If not, it means that s3 didn't report the content length */
370
  if (!req_ctx->buffer) {
371
    req_ctx->buffer = malloc(bufferSize + req_ctx->current_req->data_length);
372
    if (!req_ctx->buffer) return S3StatusAbortedByCallback;
373

    
374
    if (req_ctx->current_req->data_length > 0) {
375
      memcpy(req_ctx->buffer,
376
             req_ctx->current_req->data,
377
             req_ctx->current_req->data_length);
378
    }
379
    req_ctx->start_ptr = req_ctx->buffer + req_ctx->current_req->data_length;
380
    req_ctx->bytes = 0;
381
    req_ctx->buffer_size = bufferSize;
382
  }
383

    
384
  /* If s3 didn't report the content length we make room for it on
385
     the fly */
386
  if (req_ctx->bytes + bufferSize > req_ctx->buffer_size) {
387
    int new_size;
388
    uint8_t *old;
389

    
390
    new_size = req_ctx->buffer_size + (bufferSize * 2);
391

    
392
    old = req_ctx->buffer;
393
    req_ctx->buffer = realloc(req_ctx->buffer, new_size);
394
    if (!req_ctx->buffer) {
395
      free(old);
396
      return S3StatusAbortedByCallback;
397
    }
398

    
399
    req_ctx->start_ptr = (req_ctx->buffer +
400
                          req_ctx->bytes +
401
                          req_ctx->current_req->data_length);
402
  }
403

    
404
  memcpy(req_ctx->start_ptr, buffer, bufferSize);
405
  req_ctx->bytes += bufferSize;
406

    
407
  return S3StatusOK;
408
}
409

    
410
/************************************************************************
411
 * request_handler thread implementation
412
 ************************************************************************/
413

    
414
int should_retry(int *counter, int times)
415
{
416
  if (times > 0){
417
    *counter = times;
418
  } else {
419
    sleep(1); /* give some time to the network */
420
    (*counter)--;
421
  }
422
  return *counter > 0;
423
}
424

    
425

    
426
static int
427
request_handler_process_put_request(struct libs3_cloud_context *ctx,
428
                                    libs3_request_t *req)
429
{
430
  struct libs3_request_context *req_ctx;
431
  int retries_left;
432
  should_retry(&retries_left, 3);
433

    
434
  req_ctx = malloc(sizeof(struct libs3_request_context));
435
  if (!req_ctx) return 1;
436

    
437
  req_ctx->current_req = req;
438
  req_ctx->cloud_ctx = ctx;
439
  req_ctx->status = S3StatusInternalError;
440
  req_ctx->start_ptr = req->data;
441
  req_ctx->bytes = 0;
442
  req_ctx->buffer = NULL;
443
  req_ctx->buffer_size = 0;
444

    
445
  do {
446
    S3_put_object(&ctx->s3_bucket_context, /* bucket info */
447
                  req->key,                /* key to insert */
448
                  req->data_length,        /* length of data  */
449
                  NULL,                     /* use standard properties */
450
                  NULL,                    /* do a blocking call... */
451
                  &libs3_put_object_handler,/* ...using these callback...*/
452
                  req_ctx);                /* ...with this data for context */
453

    
454
    /* if we get an error related to a temporary network state retry */
455
  } while(S3_status_is_retryable(req_ctx->status) &&
456
          should_retry(&retries_left, 0));
457

    
458
  return (req_ctx->status == S3StatusOK) ? 0 : 1;
459
}
460

    
461
static int
462
request_handler_process_get_request(struct libs3_cloud_context *ctx,
463
                                    libs3_request_t *req)
464
{
465
  struct libs3_request_context *req_ctx;
466
  struct libs3_get_response *rsp;
467
  int retries_left;
468
  should_retry(&retries_left, 3);
469

    
470
  req_ctx = malloc(sizeof(struct libs3_request_context));
471
  if (!req_ctx) return 1;
472

    
473
  req_ctx->current_req = req;
474
  req_ctx->cloud_ctx = ctx;
475
  req_ctx->status = S3StatusInternalError;
476
  req_ctx->start_ptr = NULL;
477
  req_ctx->bytes = 0;
478
  req_ctx->buffer = NULL;
479
  req_ctx->buffer_size = 0;
480

    
481
  do {
482
    S3_get_object(&ctx->s3_bucket_context, /* bucket info */
483
                  req->key,                /* key to retrieve */
484
                  NULL,                    /* do not use conditions */
485
                  0,                       /* start from byte 0... */
486
                  0,                       /* ...and read all bytes */
487
                  NULL,                    /* do a blocking call... */
488
                  &libs3_get_object_handler,/* ...using these callback...*/
489
                  req_ctx);                /* ...with this data for context */
490

    
491
    /* if we get an error related to a temporary network state retry */
492
  } while(S3_status_is_retryable(req_ctx->status) &&
493
          should_retry(&retries_left, 0));
494

    
495

    
496
  rsp = malloc(sizeof(struct libs3_get_response));
497
  if (!rsp) {
498
    if (req_ctx->buffer) free(req_ctx->buffer);
499
    free(req_ctx);
500
    return 1;
501
  }
502

    
503
  rsp->status = req_ctx->status;
504
  if (req_ctx->status == S3StatusOK) {
505
    rsp->data = req_ctx->buffer;
506
    rsp->current_byte = rsp->data;
507
    rsp->data_length = req_ctx->bytes + req->data_length;
508
    rsp->read_bytes = 0;
509
    rsp->last_timestamp = req_ctx->last_timestamp;
510
  }
511
  add_response(ctx, rsp);
512

    
513
  return (req_ctx->status == S3StatusOK) ? 0 : 1;
514
}
515

    
516
static int request_handler_process_requests(struct libs3_cloud_context *ctx)
517
{
518
  libs3_request_t *req;
519
  int req_number;
520

    
521
  req_number = 0;
522
  do {
523
    pthread_mutex_lock(&ctx->req_queue_lock);
524
    req = fifo_queue_remove_head(ctx->req_queue);
525
    pthread_mutex_unlock(&ctx->req_queue_lock);
526

    
527
    if (req) {
528
      int status;
529

    
530
      req_number++;
531

    
532
      switch (req->op) {
533
      case PUT:
534
        status = request_handler_process_put_request(ctx, req);
535
        break;
536
      case GET:
537
        status = request_handler_process_get_request(ctx, req);
538
        break;
539
      default:
540
        /* WTF: should not be here! */
541
        fprintf(stderr,
542
                "libs3_delegate_helper: operation type not supported!\n");
543

    
544
        status = 0;
545
      }
546

    
547
      if (status == 1) {
548
        fprintf(stderr,
549
                "libs3_delegate_helper: failed to perform operation\n");
550
      }
551

    
552
      free_request(req);
553
    }
554
  } while(req != NULL);
555

    
556
  return req_number;
557
}
558

    
559
void* request_handler(void *data)
560
{
561
  struct libs3_cloud_context *ctx;
562

    
563
  ctx = (struct libs3_cloud_context *) data;
564
  do{
565
    /* wait for main thread to signal there's some work to do */
566
    pthread_mutex_lock(&ctx->req_handler_sync_mutex);
567

    
568
    /* make sure not to block if there's already something to handle */
569
    if (fifo_queue_size(ctx->req_queue) == 0) {
570
      pthread_cond_wait(&ctx->req_handler_sync_cond,
571
                        &ctx->req_handler_sync_mutex);
572
    }
573
    request_handler_process_requests(ctx);
574

    
575
    pthread_mutex_unlock(&ctx->req_handler_sync_mutex);
576
  } while(1); /* grapes TopologyManager don't support termination */
577
}
578

    
579

    
580
/************************************************************************
581
 * cloud helper implementation
582
 ************************************************************************/
583
static void deallocate_context(struct libs3_cloud_context *ctx)
584
{
585
  if (!ctx) return;
586
  if (ctx->s3_bucket_context.accessKeyId)
587
    free(ctx->s3_bucket_context.accessKeyId);
588
  if (ctx->s3_bucket_context.secretAccessKey)
589
    free(ctx->s3_bucket_context.secretAccessKey);
590
  if (ctx->s3_bucket_context.bucketName)
591
    free(ctx->s3_bucket_context.bucketName);
592

    
593
  /* TODO: we should use a more specialized function instead of the
594
     standard free. But since the cloud_helper do not take into
595
     account deallocation the queue will always be empty when we
596
     enter this function */
597
  if (ctx->req_queue) fifo_queue_destroy(ctx->req_queue, NULL);
598
  if (ctx->rsp_queue) fifo_queue_destroy(ctx->rsp_queue, NULL);
599

    
600
  /* destroy mutexs, conds, threads, ...
601
     This should be safe as no mutex has already been locked and both
602
     mutex_destroy/cond_destroy check fail with EINVAL if the
603
     specified object is not initialized */
604
  pthread_mutex_destroy(&ctx->req_queue_lock);
605
  pthread_mutex_destroy(&ctx->rsp_queue_lock);
606
  pthread_mutex_destroy(&ctx->req_handler_sync_mutex);
607
  pthread_mutex_destroy(&ctx->rsp_queue_sync_mutex);
608

    
609
  pthread_cond_destroy(&ctx->rsp_queue_sync_cond);
610
  pthread_cond_destroy(&ctx->req_handler_sync_cond);
611

    
612
  pthread_attr_destroy(&ctx->req_handler_thread_attr);
613

    
614
  free(ctx);
615
  return;
616
}
617

    
618

    
619
void* cloud_helper_init(struct nodeID *local, const char *config)
620
{
621
  struct libs3_cloud_context *ctx;
622
  struct tag *cfg_tags;
623
  const char *arg;
624
  int err;
625

    
626
  ctx = malloc(sizeof(struct libs3_cloud_context));
627
  memset(ctx, 0, sizeof(struct libs3_cloud_context));
628
  cfg_tags = config_parse(config);
629

    
630
  /* Parse fundametal parameters */
631
  arg = config_value_str(cfg_tags, "s3_access_key");
632
  if (!arg) {
633
    deallocate_context(ctx);
634
    fprintf(stderr,
635
            "libs3_delegate_helper: missing required parameter " \
636
            "'s3_access_key'\n");
637
    return 0;
638
  }
639
  ctx->s3_bucket_context.accessKeyId = strdup(arg);
640

    
641
  arg = config_value_str(cfg_tags, "s3_secret_key");
642
  if (!arg) {
643
    deallocate_context(ctx);
644
    fprintf(stderr,
645
            "libs3_delegate_helper: missing required parameter " \
646
            "'s3_secret_key'\n");
647
    return 0;
648
  }
649
  ctx->s3_bucket_context.secretAccessKey = strdup(arg);
650

    
651
  arg = config_value_str(cfg_tags, "s3_bucket_name");
652
  if (!arg) {
653
    deallocate_context(ctx);
654
    fprintf(stderr,
655
            "libs3_delegate_helper: missing required parameter " \
656
            "'s3_bucket_name'\n");
657
    return 0;
658
  }
659
  ctx->s3_bucket_context.bucketName = strdup(arg);
660

    
661
  ctx->s3_bucket_context.protocol = S3ProtocolHTTP;
662
  arg = config_value_str(cfg_tags, "s3_protocol");
663
  if (arg) {
664
    if (strcmp(arg, "https") == 0) {
665
      ctx->s3_bucket_context.protocol = S3ProtocolHTTPS;
666
    }
667
  }
668

    
669
  ctx->s3_bucket_context.uriStyle = S3UriStylePath;
670

    
671

    
672
  /* Parse optional parameters */
673
  ctx->blocking_put_request = 0;
674
  arg = config_value_str(cfg_tags, "s3_blocking_put");
675
  if (arg) {
676
    if (strcmp(arg, "1") == 0)
677
      ctx->blocking_put_request = 1;
678
  }
679

    
680
  /* Initialize data structures */
681
  if (S3_initialize("libs3_delegate_helper", S3_INIT_ALL) != S3StatusOK) {
682
    deallocate_context(ctx);
683
    return 0;
684
  }
685

    
686
  ctx->req_queue = fifo_queue_create(10);
687
  if (!ctx->req_queue) {
688
    deallocate_context(ctx);
689
    return 0;
690
  }
691

    
692
  ctx->rsp_queue = fifo_queue_create(10);
693
  if (!ctx->rsp_queue) {
694
    deallocate_context(ctx);
695
    return 0;
696
  }
697

    
698

    
699
  err = pthread_mutex_init(&ctx->req_queue_lock, NULL);
700
  if (err) {
701
    deallocate_context(ctx);
702
    return 0;
703
  }
704

    
705
  err = pthread_mutex_init(&ctx->rsp_queue_lock, NULL);
706
  if (err) {
707
    deallocate_context(ctx);
708
    return 0;
709
  }
710

    
711
  err = pthread_mutex_init(&ctx->rsp_queue_sync_mutex, NULL);
712
  if (err) {
713
    deallocate_context(ctx);
714
    return 0;
715
  }
716

    
717
  err = pthread_cond_init (&ctx->rsp_queue_sync_cond, NULL);
718
  if (err) {
719
    deallocate_context(ctx);
720
    return 0;
721
  }
722

    
723
  err = pthread_mutex_init(&ctx->req_handler_sync_mutex, NULL);
724
  if (err) {
725
    deallocate_context(ctx);
726
    return 0;
727
  }
728

    
729
  err = pthread_cond_init (&ctx->req_handler_sync_cond, NULL);
730
  if (err) {
731
    deallocate_context(ctx);
732
    return 0;
733
  }
734

    
735
  err = pthread_attr_init(&ctx->req_handler_thread_attr);
736
  if (err) {
737
    deallocate_context(ctx);
738
    return 0;
739
  }
740

    
741
  err = pthread_attr_setdetachstate(&ctx->req_handler_thread_attr,
742
                                    PTHREAD_CREATE_JOINABLE);
743
  if (err) {
744
    deallocate_context(ctx);
745
    return 0;
746
  }
747

    
748
  err = pthread_create(&ctx->req_handler_thread,
749
                       &ctx->req_handler_thread_attr,
750
                       &request_handler,
751
                       (void *)ctx);
752
  if (err) {
753
    deallocate_context(ctx);
754
    return 0;
755
  }
756

    
757
  return ctx;
758
}
759

    
760
int get_from_cloud(void *context, const char *key, uint8_t *header_ptr,
761
                   int header_size, int free_header)
762
{
763
  struct libs3_cloud_context *ctx;
764
  libs3_request_t *request;
765

    
766
  ctx = (struct libs3_cloud_context *) context;
767
  request = malloc(sizeof(libs3_request_t));
768

    
769
  if (!request) return 1;
770

    
771
  request->op = GET;
772
  request->key = strdup(key);
773
  request->data = header_ptr;
774
  request->data_length = header_size;
775
  request->free_data = free_header;
776

    
777
  add_request(ctx, request);
778

    
779
  return 0;
780
}
781

    
782
  int put_on_cloud(void *context, const char *key, uint8_t *buffer_ptr,
783
                   int buffer_size, int free_buffer)
784
  {
785
  struct libs3_cloud_context *ctx;
786
  libs3_request_t *request;
787

    
788
  ctx = (struct libs3_cloud_context *) context;
789
  request = malloc(sizeof(libs3_request_t));
790

    
791
  if (!request) return 1;
792

    
793
  request->op = PUT;
794
  request->key = strdup(key);
795
  request->data = buffer_ptr;
796
  request->data_length = buffer_size;
797
  request->free_data = free_buffer;
798

    
799
  if (ctx->blocking_put_request) {
800
    int res;
801
    res = request_handler_process_put_request(ctx, request);
802
    free_request(request);
803
    return res;
804
  }
805
  else
806
    add_request(ctx, request);
807

    
808
  return 0;
809
  }
810

    
811
struct nodeID* get_cloud_node(void *context, uint8_t variant)
812
{
813
  return create_node(CLOUD_NODE_ADDR, variant);
814
}
815

    
816
time_t timestamp_cloud(void *context)
817
{
818
  struct libs3_cloud_context *ctx;
819
  ctx = (struct libs3_cloud_context *) context;
820

    
821
  return ctx->last_rsp_timestamp;
822
}
823

    
824
int is_cloud_node(void *context, struct nodeID* node)
825
{
826
  return strcmp(node_ip(node), CLOUD_NODE_ADDR) == 0;
827
}
828

    
829
int wait4cloud(void *context, struct timeval *tout)
830
{
831
  struct libs3_cloud_context *ctx;
832
  libs3_get_response_t *rsp;
833

    
834
  ctx = (struct libs3_cloud_context *) context;
835

    
836
  rsp = wait4response(ctx, tout);
837

    
838
  if (rsp) {
839
    if (rsp->status == S3StatusOK) {
840
      ctx->last_rsp_timestamp = rsp->last_timestamp;
841
      return 1;
842
    } else {
843
      /* there was some error with the request */
844
      pop_response(ctx);
845
      return -1;
846
    }
847
  } else {
848
    return 0;
849
  }
850
}
851

    
852
int recv_from_cloud(void *context, uint8_t *buffer_ptr, int buffer_size)
853
{
854
  struct libs3_cloud_context *ctx;
855
  libs3_get_response_t *rsp;
856
  int toread;
857

    
858
  ctx = (struct libs3_cloud_context *) context;
859

    
860
  rsp = get_response(ctx);
861
  if (!rsp) return -1;
862

    
863
  if (rsp->read_bytes == rsp->data_length){
864
    pop_response(ctx);
865
    return 0;
866
  }
867

    
868
  toread = (rsp->data_length <= buffer_size)? rsp->data_length : buffer_size;
869

    
870
  memcpy(buffer_ptr, rsp->current_byte, toread);
871
  rsp->current_byte += toread;
872
  rsp->read_bytes += toread;
873

    
874
  return toread;
875
}
876

    
877
struct delegate_iface delegate_impl = {
878
  .cloud_helper_init = &cloud_helper_init,
879
  .get_from_cloud = &get_from_cloud,
880
  .put_on_cloud = &put_on_cloud,
881
  .get_cloud_node = &get_cloud_node,
882
  .timestamp_cloud = &timestamp_cloud,
883
  .is_cloud_node = &is_cloud_node,
884
  .wait4cloud = &wait4cloud,
885
  .recv_from_cloud = &recv_from_cloud
886
};