Statistics
| Branch: | Revision:

grapes / src / Tests / cloud_helper_delegate_file.c @ 47c95dd4

History | View | Annotate | Download (7.88 KB)

1
#include <unistd.h>
2
#include <stdlib.h>
3
#include <stdio.h>
4
#include <sys/file.h>
5
#include <string.h>
6
#include <stdint.h>
7
#include <semaphore.h>
8
#include <pthread.h>
9
#include <time.h>
10

    
11
#include "net_helper.h"
12

    
13
#define CLOUD_KEY_MAX_SIZE 255
14
#define CLOUD_VALUE_MAX_SIZE 2000
15

    
16
struct delegate_iface {
17
  void* (*cloud_helper_init)(struct nodeID *local, const char *config);
18
  int (*get_from_cloud)(void *context, const char *key, uint8_t *header_ptr,
19
                        int header_size, int free_header);
20
  int (*put_on_cloud)(void *context, const char *key, uint8_t *buffer_ptr,
21
                      int buffer_size, int free_buffer);
22
  struct nodeID* (*get_cloud_node)(void *context, uint8_t variant);
23
  time_t (*timestamp_cloud)(void *context);
24
  int (*is_cloud_node)(void *context, struct nodeID* node);
25
  int (*wait4cloud)(void *context, struct timeval *tout);
26
  int (*recv_from_cloud)(void *context, uint8_t *buffer_ptr, int buffer_size);
27
};
28

    
29
struct mem_offset {
30
  int start;
31
  int end;
32
};
33

    
34
struct file_cloud_context {
35
  const char *path;
36
  sem_t sem;
37
  time_t last_timestamp;
38
  uint8_t *out_buffer[10];
39
  struct mem_offset out_len[10];
40
  int out_cnt;
41
  int key_error;
42
  struct nodeID* cloud_node_base;
43
};
44

    
45

    
46
struct gpThreadContext{
47
  struct file_cloud_context *cloud;
48
  char *key;
49
  uint8_t *value;
50
  int value_len;
51
  uint8_t *header_ptr;
52
  int header_size;
53
};
54

    
55
struct entry {
56
  char key[CLOUD_KEY_MAX_SIZE];
57
  uint8_t value[CLOUD_VALUE_MAX_SIZE];
58
  int value_len;
59
  time_t timestamp;
60
};
61

    
62

    
63
void* getValueForKey(void *context)
64
{
65
  struct entry e;
66
  struct gpThreadContext *ctx;
67
  FILE *fd;
68
  uint8_t *buffer_ptr;
69
  struct mem_offset *len;
70
  ctx = (struct gpThreadContext *) context;
71

    
72
  fd = fopen(ctx->cloud->path, "r");
73
  if (!fd) pthread_exit(NULL);
74
  flock(fileno(fd), LOCK_SH);
75

    
76
  sem_wait(&ctx->cloud->sem);
77
  ctx->cloud->key_error = -1;
78
  sem_post(&ctx->cloud->sem);
79

    
80
  ctx->cloud->out_cnt++;
81
  ctx->cloud->out_buffer[ctx->cloud->out_cnt] = calloc(CLOUD_VALUE_MAX_SIZE, sizeof(uint8_t));
82
  len = malloc(sizeof(struct mem_offset));
83
  len->start = 0;
84
  len->end = ctx->header_size;
85

    
86
  buffer_ptr = ctx->cloud->out_buffer[ctx->cloud->out_cnt];
87
  memcpy(buffer_ptr, ctx->header_ptr, ctx->header_size);
88

    
89
  buffer_ptr += ctx->header_size;
90

    
91
  sem_wait(&ctx->cloud->sem);
92

    
93
  while (fread(&e, sizeof(e), 1, fd) != 0){
94
    if (strcmp(e.key, ctx->key) == 0){
95
      memcpy(buffer_ptr, e.value, e.value_len);
96
      len->end += e.value_len;
97
      ctx->cloud->last_timestamp = e.timestamp;
98
      ctx->cloud->out_len[ctx->cloud->out_cnt] = *len;
99
      ctx->cloud->key_error = 0;
100
      break;
101
    }
102
  }
103

    
104
  if (ctx->cloud->key_error == -1){
105
    ctx->cloud->key_error = 1;
106
  }
107
  sem_post(&ctx->cloud->sem);
108

    
109
  flock(fileno(fd), LOCK_UN);
110
  fclose(fd);
111

    
112
  if (ctx->key) free(ctx->key);
113
  if (ctx->value) free(ctx->value);
114
  free(ctx);
115

    
116
  pthread_exit(NULL);
117
}
118

    
119
int putValueForKey(struct gpThreadContext *ctx)
120
{
121
  struct entry e;
122
  FILE *fd;
123
  int found = 0;
124

    
125
  fd = fopen(ctx->cloud->path, "r+");
126
  if (!fd) fd = fopen(ctx->cloud->path, "w+");
127
  flock(fileno(fd), LOCK_EX);
128
  while (fread(&e, sizeof(e), 1, fd) != 0){
129
    if (strcmp(e.key, ctx->key) == 0){
130
      memcpy(e.value, ctx->value, ctx->value_len);
131
      e.value_len = ctx->value_len;
132
      time(&e.timestamp);
133
      fseek(fd, -sizeof(e), SEEK_CUR);
134
      fwrite(&e, sizeof(e), 1, fd);
135
      found = 1;
136
      break;
137
    }
138
  }
139
  if (!found){
140
    strcpy(e.key, ctx->key);
141
    memcpy(e.value, ctx->value, ctx->value_len);
142
    e.value_len = ctx->value_len;
143
    time(&e.timestamp);
144
    fwrite(&e, sizeof(e), 1, fd);
145
  }
146
  fflush(fd);
147
  flock(fileno(fd), LOCK_UN);
148
  fclose(fd);
149

    
150
  if (ctx->key) free(ctx->key);
151
  if (ctx->value) free(ctx->value);
152
  free(ctx);
153

    
154
  return 0;
155
}
156

    
157

    
158

    
159
static void* file_cloud_helper_init(struct nodeID *local, const char *config)
160
{
161
  struct file_cloud_context *ctx;
162
  char *conf = strdup(config);
163
  char *opt;
164
  char *key;
165
  const char *path = NULL;
166

    
167

    
168
  while((opt = strsep(&conf, ",")) != NULL) {
169
    key = strsep(&opt, "=");
170
    if (!key) continue;
171
    if (strcmp(key, "file_cloud_path") == 0){
172
      if (opt) path = opt;
173
    }
174
  }
175

    
176
  if (!path) path = "cloud_dump";
177

    
178
  ctx = malloc(sizeof(struct file_cloud_context));
179

    
180
  sem_init(&ctx->sem, 0, 1);
181
  ctx->path = path;
182
  ctx->out_cnt = -1;
183
  ctx->cloud_node_base = create_node("0.0.0.0", 0);
184
  return ctx;
185
}
186

    
187
static int file_cloud_get_from_cloud(void *context, const char *key,
188
                                     uint8_t *header_ptr, int header_size,
189
                                     int free_header)
190
{
191
  struct file_cloud_context *ctx;
192
  int err;
193
  struct gpThreadContext *tc;
194
  pthread_t *thread = malloc(sizeof(pthread_t));
195

    
196

    
197
  ctx = (struct file_cloud_context *)context;
198
  tc = malloc(sizeof(struct gpThreadContext));
199
  tc->cloud = ctx;
200
  tc->key = strdup(key);
201
  tc->value = NULL;
202
  tc->header_ptr = header_ptr;
203
  tc->header_size = header_size;
204

    
205
  err = pthread_create(thread, NULL, getValueForKey, (void *)tc);
206
  if (err) return 1;
207
  return 0;
208
}
209

    
210
static int file_cloud_put_on_cloud(void *context, const char *key,
211
                                   uint8_t *buffer_ptr, int buffer_size,
212
                                   int free_buffer)
213
{
214
  struct file_cloud_context *ctx;
215
  struct gpThreadContext *tc;
216

    
217
  ctx = (struct file_cloud_context *)context;
218
  tc = malloc(sizeof(struct gpThreadContext));
219
  tc->cloud = ctx;
220
  tc->key = strdup(key);
221
  tc->value = calloc(buffer_size, sizeof(uint8_t));
222
  memcpy(tc->value, buffer_ptr, buffer_size);
223
  tc->value_len = buffer_size;
224

    
225
  return putValueForKey(tc);
226
}
227

    
228
struct nodeID* file_cloud_get_cloud_node(void *context, uint8_t variant)
229
{
230
  struct file_cloud_context *ctx;
231
  ctx = (struct file_cloud_context *)context;
232
  return create_node("0.0.0.0", variant);
233
}
234

    
235
time_t file_cloud_timestamp_cloud(void *context)
236
{
237
  struct file_cloud_context *ctx;
238
  ctx = (struct file_cloud_context *)context;
239

    
240
  return ctx->last_timestamp;
241
}
242

    
243
int file_cloud_is_cloud_node(void *context, struct nodeID* node)
244
{
245
  struct file_cloud_context *ctx;
246
  struct nodeID *candidate_node;
247
  int result = -1;
248
  ctx = (struct file_cloud_context *)context;
249
  candidate_node = create_node(node_ip(node), 0);
250

    
251
  result = nodeid_equal(ctx->cloud_node_base, candidate_node);
252
  nodeid_free(candidate_node);
253
  return result;
254
}
255

    
256
static int file_cloud_wait4cloud(void *context, struct timeval *tout)
257
{
258
  struct file_cloud_context *ctx;
259
  long timeout = tout->tv_sec * 1000 + tout->tv_usec;
260

    
261
  ctx = (struct file_cloud_context *)context;
262
  if (ctx->key_error == 1) return -1;
263
  if (ctx->out_cnt > 0) return 1;
264
  if (ctx->out_cnt == 0 && ctx->key_error >= 0) return 1;
265

    
266
  while (timeout > 0){
267
    usleep(100);
268
    timeout -= 100;
269
    if (ctx->key_error == 1) return -1;
270
    if (ctx->out_cnt > 0) return 1;
271
    if (ctx->out_cnt == 0 && ctx->key_error >= 0) return 1;
272
  }
273
  return 0;
274
}
275

    
276
static int file_cloud_recv_from_cloud(void *context, uint8_t *buffer_ptr, int buffer_size)
277
{
278
  struct file_cloud_context *ctx;
279
  uint8_t *source_ptr;
280
  int start, end, len;
281
  ctx = (struct file_cloud_context *)context;
282

    
283
  if (ctx->out_cnt < 0) return 0;
284

    
285
  sem_wait(&ctx->sem);
286

    
287
  source_ptr = ctx->out_buffer[ctx->out_cnt];
288
  start = ctx->out_len[ctx->out_cnt].start;
289
  end = ctx->out_len[ctx->out_cnt].end;
290
  len = end - start;
291
  source_ptr = source_ptr + start;
292

    
293
  if (buffer_size < len){
294
    memcpy(buffer_ptr,  source_ptr, buffer_size);
295
    start += buffer_size;
296
    ctx->out_len[ctx->out_cnt].start = start;
297
    sem_post(&ctx->sem);
298
    return buffer_size;
299
  } else {
300
    memcpy(buffer_ptr,  source_ptr, len);
301
    free(ctx->out_buffer[ctx->out_cnt]);
302
    ctx->out_cnt--;
303
    sem_post(&ctx->sem);
304
    return len;
305
  }
306
}
307

    
308
struct delegate_iface delegate_impl = {
309
  .cloud_helper_init = &file_cloud_helper_init,
310
  .get_from_cloud = &file_cloud_get_from_cloud,
311
  .put_on_cloud = &file_cloud_put_on_cloud,
312
  .get_cloud_node = &file_cloud_get_cloud_node,
313
  .timestamp_cloud = &file_cloud_timestamp_cloud,
314
  .is_cloud_node = &file_cloud_is_cloud_node,
315
  .wait4cloud = file_cloud_wait4cloud,
316
  .recv_from_cloud = file_cloud_recv_from_cloud
317
};