Statistics
| Branch: | Revision:

grapes / src / Tests / cloud_helper_delegate_file.c @ 0e9bde83

History | View | Annotate | Download (7.94 KB)

1
#include <unistd.h>
2
#include <stdlib.h>
3
#include <stdio.h>
4
#include <sys/file.h>
5
#include <string.h>
6
#include <stdint.h>
7
#include <semaphore.h>
8
#include <pthread.h>
9
#include <time.h>
10

    
11
#include "net_helper.h"
12

    
13
struct delegate_iface {
14
  void* (*cloud_helper_init)(struct nodeID *local, const char *config);
15
  int (*get_from_cloud)(void *context, char *key, uint8_t *header_ptr, int header_size);
16
  int (*put_on_cloud)(void *context, char *key, uint8_t *buffer_ptr, int buffer_size);
17
  struct nodeID* (*get_cloud_node)(void *context, uint8_t variant);
18
  time_t (*timestamp_cloud)(void *context);
19
  int (*is_cloud_node)(void *context, struct nodeID* node);
20
  int (*wait4cloud)(void *context, struct timeval *tout);
21
  int (*recv_from_cloud)(void *context, uint8_t *buffer_ptr, int buffer_size);
22
};
23

    
24
struct mem_offset {
25
  int start;
26
  int end;
27
};
28

    
29
struct file_cloud_context {
30
  const char *path;
31
  sem_t sem;
32
  time_t last_timestamp;
33
  uint8_t *out_buffer[10];
34
  struct mem_offset out_len[10];
35
  int out_cnt;
36
  int key_error;
37
  struct nodeID* cloud_node_base;
38
};
39

    
40

    
41
struct gpThreadContext{
42
  struct file_cloud_context *cloud;
43
  char *key;
44
  uint8_t *value;
45
  int value_len;
46
  uint8_t *header_ptr;
47
  int header_size;
48
};
49

    
50
struct entry {
51
  char key[100];
52
  uint8_t value[100];
53
  int value_len;
54
  time_t timestamp;
55
};
56

    
57

    
58
void* getValueForKey(void *context)
59
{
60
  struct entry e;
61
  struct gpThreadContext *ctx;
62
  FILE *fd;
63
  uint8_t *buffer_ptr;
64
  struct mem_offset *len;
65
  ctx = (struct gpThreadContext *) context;
66

    
67
  fd = fopen(ctx->cloud->path, "r");
68
  if (!fd) pthread_exit(NULL);
69
  flock(fileno(fd), LOCK_SH);
70

    
71
  sem_wait(&ctx->cloud->sem);
72
  ctx->cloud->key_error = -1;
73
  sem_post(&ctx->cloud->sem);
74

    
75
  ctx->cloud->out_cnt++;
76
  ctx->cloud->out_buffer[ctx->cloud->out_cnt] = calloc(100, sizeof(uint8_t));
77
  len = malloc(sizeof(struct mem_offset));
78
  len->start = 0;
79
  len->end = ctx->header_size;
80

    
81
  buffer_ptr = ctx->cloud->out_buffer[ctx->cloud->out_cnt];
82
  memcpy(buffer_ptr, ctx->header_ptr, ctx->header_size);
83

    
84
  buffer_ptr += ctx->header_size;
85

    
86
  while (fread(&e, sizeof(e), 1, fd) != 0){
87
    if (strcmp(e.key, ctx->key) == 0){
88

    
89
      sem_wait(&ctx->cloud->sem);
90
      memcpy(buffer_ptr, e.value, e.value_len);
91
      len->end += e.value_len;      
92
      ctx->cloud->last_timestamp = e.timestamp;
93
      ctx->cloud->key_error = 0;
94
      sem_post(&ctx->cloud->sem);
95
      break;
96
    }
97
  }
98

    
99
  sem_wait(&ctx->cloud->sem);
100
  ctx->cloud->out_len[ctx->cloud->out_cnt] = *len;
101
  if (ctx->cloud->key_error == -1)
102
    ctx->cloud->key_error = 1;
103
  sem_post(&ctx->cloud->sem);
104

    
105
  flock(fileno(fd), LOCK_UN);
106
  fclose(fd);
107

    
108
  if (ctx->key) free(ctx->key);
109
  if (ctx->value) free(ctx->value);
110
  free(ctx);
111

    
112
  pthread_exit(NULL);
113
}
114

    
115
void* putValueForKey(void *context){
116
  struct entry e;
117
  struct gpThreadContext *ctx;
118
  FILE *fd;
119
  int found = 0;
120
  ctx = (struct gpThreadContext *)context;
121

    
122
  fd = fopen(ctx->cloud->path, "r+");
123
  if (!fd) fd = fopen(ctx->cloud->path, "w+");
124
  flock(fileno(fd), LOCK_EX);
125
  while (fread(&e, sizeof(e), 1, fd) != 0){
126
    if (strcmp(e.key, ctx->key) == 0){
127
      memcpy(e.value, ctx->value, ctx->value_len);
128
      e.value_len = ctx->value_len;
129
      time(&e.timestamp);
130
      fseek(fd, -sizeof(e), SEEK_CUR);
131
      fwrite(&e, sizeof(e), 1, fd);
132
      found = 1;
133
      break;
134
    }
135
  }
136
  if (!found){
137
    strcpy(e.key, ctx->key);
138
    memcpy(e.value, ctx->value, ctx->value_len);
139
    e.value_len = ctx->value_len;    
140
    time(&e.timestamp);
141
    fwrite(&e, sizeof(e), 1, fd);
142
  }
143
  fflush(fd);
144
  flock(fileno(fd), LOCK_UN);
145
  fclose(fd);
146

    
147
  if (ctx->key) free(ctx->key);
148
  if (ctx->value) free(ctx->value);
149
  free(ctx);
150

    
151
  pthread_exit(NULL);
152
}
153

    
154

    
155

    
156
static void* file_cloud_helper_init(struct nodeID *local, const char *config)
157
{
158
  struct file_cloud_context *ctx;
159
  char *conf = strdup(config);
160
  char *opt;
161
  char *key;
162
  const char *path = NULL;
163

    
164

    
165
  while((opt = strsep(&conf, ",")) != NULL) {
166
    key = strsep(&opt, "=");
167
    if (!key) continue;
168
    if (strcmp(key, "file_cloud_path") == 0){
169
      if (opt) path = opt;
170
    }
171
  }
172
  
173
  if (!path) path = "cloud_dump";
174

    
175
  ctx = malloc(sizeof(struct file_cloud_context));
176

    
177
  sem_init(&ctx->sem, 0, 1);
178
  ctx->path = path;
179
  ctx->out_cnt = -1;
180
  ctx->cloud_node_base = create_node("0.0.0.0", 0);
181
  return ctx;
182
}
183

    
184
static int file_cloud_get_from_cloud(void *context, char *key, uint8_t *header_ptr, int header_size)
185
{
186
  struct file_cloud_context *ctx;
187
  int err;
188
  struct gpThreadContext *tc;
189
  pthread_t *thread = malloc(sizeof(pthread_t));
190

    
191

    
192
  ctx = (struct file_cloud_context *)context;
193
  tc = malloc(sizeof(struct gpThreadContext));
194
  tc->cloud = ctx;
195
  tc->key = strdup(key);
196
  tc->header_ptr = header_ptr;
197
  tc->header_size = header_size;
198

    
199
  err = pthread_create(thread, NULL, getValueForKey, (void *)tc);
200
  if (err) return 1;
201
  return 0;
202
}
203

    
204
static int file_cloud_put_on_cloud(void *context, char *key, uint8_t *buffer_ptr, int buffer_size)
205
{ 
206
  struct file_cloud_context *ctx;
207
  int err;
208
  struct gpThreadContext *tc;
209
  pthread_t *thread = malloc(sizeof(pthread_t));
210
  pthread_attr_t attr;
211
  void *status = NULL;
212

    
213
  pthread_attr_init(&attr);
214
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
215

    
216
  ctx = (struct file_cloud_context *)context;
217
  tc = malloc(sizeof(struct gpThreadContext));
218
  tc->cloud = ctx;
219
  tc->key = strdup(key);
220
  tc->value = calloc(buffer_size, sizeof(uint8_t));
221
  memcpy(tc->value, buffer_ptr, buffer_size);
222
  tc->value_len = buffer_size;
223

    
224
  err = pthread_create(thread, &attr, putValueForKey, (void *)tc);
225
  if (err) return 1;
226

    
227
  pthread_attr_destroy(&attr);
228
  pthread_join(*thread, &status);
229

    
230
  return 0;  
231
}
232

    
233
struct nodeID* file_cloud_get_cloud_node(void *context, uint8_t variant)
234
{
235
  struct file_cloud_context *ctx;
236
  ctx = (struct file_cloud_context *)context;
237
  return create_node("0.0.0.0", variant);
238
}
239

    
240
time_t file_cloud_timestamp_cloud(void *context)
241
{
242
  struct file_cloud_context *ctx;
243
  ctx = (struct file_cloud_context *)context;
244

    
245
  return ctx->last_timestamp;
246
}
247

    
248
int file_cloud_is_cloud_node(void *context, struct nodeID* node)
249
{
250
  struct file_cloud_context *ctx;
251
  struct nodeID *candidate_node;
252
  int result = -1;
253
  ctx = (struct file_cloud_context *)context;
254
  candidate_node = create_node(node_ip(node), 0);
255

    
256
  result = nodeid_equal(ctx->cloud_node_base, candidate_node);
257
  nodeid_free(candidate_node);
258
  return result;
259
}
260

    
261
static int file_cloud_wait4cloud(void *context, struct timeval *tout)
262
{
263
  struct file_cloud_context *ctx;
264
  long timeout = tout->tv_sec * 1000 + tout->tv_usec;
265

    
266
  ctx = (struct file_cloud_context *)context;
267
  if (ctx->key_error == 1) return -1;
268
  if (ctx->out_cnt > 0) return 1;
269
  if (ctx->out_cnt == 0 && ctx->key_error >= 0) return 1;
270

    
271
  while (timeout > 0){
272
    usleep(100);
273
    timeout -= 100;
274
    if (ctx->key_error == 1) return -1;
275
    if (ctx->out_cnt > 0) return 1;
276
    if (ctx->out_cnt == 0 && ctx->key_error >= 0) return 1;
277
  }
278
  return 0;
279
}
280

    
281
static int file_cloud_recv_from_cloud(void *context, uint8_t *buffer_ptr, int buffer_size)
282
{
283
  struct file_cloud_context *ctx;
284
  uint8_t *source_ptr;
285
  int start, end, len;
286
  ctx = (struct file_cloud_context *)context;
287

    
288
  if (ctx->out_cnt < 0) return 0;
289

    
290
  sem_wait(&ctx->sem);
291

    
292
  source_ptr = ctx->out_buffer[ctx->out_cnt];
293
  start = ctx->out_len[ctx->out_cnt].start;
294
  end = ctx->out_len[ctx->out_cnt].end;
295
  len = end - start;
296
  source_ptr = source_ptr + start;
297

    
298
  if (buffer_size < len){
299
    memcpy(buffer_ptr,  source_ptr, buffer_size);
300
    start += buffer_size;
301
    ctx->out_len[ctx->out_cnt].start = start;
302
    sem_post(&ctx->sem);
303
    return buffer_size;
304
  } else {
305
    memcpy(buffer_ptr,  source_ptr, len);    
306
    free(ctx->out_buffer[ctx->out_cnt]);
307
    ctx->out_cnt--;
308
    sem_post(&ctx->sem);
309
    return len;
310
  }
311
}
312

    
313
struct delegate_iface delegate_impl = {
314
  .cloud_helper_init = &file_cloud_helper_init,
315
  .get_from_cloud = &file_cloud_get_from_cloud,
316
  .put_on_cloud = &file_cloud_put_on_cloud,
317
  .get_cloud_node = &file_cloud_get_cloud_node,
318
  .timestamp_cloud = &file_cloud_timestamp_cloud,
319
  .is_cloud_node = &file_cloud_is_cloud_node,
320
  .wait4cloud = file_cloud_wait4cloud,
321
  .recv_from_cloud = file_cloud_recv_from_cloud
322
};