Statistics
| Branch: | Revision:

grapes / src / Tests / cloud_helper_delegate_file.c @ ef8002fd

History | View | Annotate | Download (7.49 KB)

1
#include <unistd.h>
2
#include <stdlib.h>
3
#include <stdio.h>
4
#include <sys/file.h>
5
#include <string.h>
6
#include <stdint.h>
7
#include <semaphore.h>
8
#include <pthread.h>
9

    
10
#include "net_helper.h"
11

    
12
struct delegate_iface {
13
  void* (*cloud_helper_init)(struct nodeID *local, const char *config);
14
  int (*get_from_cloud)(void *context, char *key, uint8_t *header_ptr, int header_size);
15
  int (*put_on_cloud)(void *context, char *key, uint8_t *buffer_ptr, int buffer_size);
16
  struct nodeID* (*get_cloud_node)(void *context);
17
  int (*is_cloud_node)(void *context, struct nodeID* node);
18
  int (*wait4cloud)(void *context, struct timeval *tout);
19
  int (*recv_from_cloud)(void *context, uint8_t *buffer_ptr, int buffer_size);
20
};
21

    
22
struct mem_offset {
23
  int start;
24
  int end;
25
};
26

    
27
struct file_cloud_context {
28
  const char *path;
29
  sem_t sem;
30
  uint8_t *out_buffer[10];
31
  struct mem_offset out_len[10];
32
  int out_cnt;
33
  int key_error;
34
  struct nodeID* cloud_node_base;
35
};
36

    
37

    
38
struct gpThreadContext{
39
  struct file_cloud_context *cloud;
40
  char *key;
41
  uint8_t *value;
42
  int value_len;
43
  uint8_t *header_ptr;
44
  int header_size;
45
};
46

    
47
struct entry {
48
  char key[100];
49
  uint8_t value[100];
50
  int value_len;
51
};
52

    
53

    
54
void* getValueForKey(void *context)
55
{
56
  struct entry e;
57
  struct gpThreadContext *ctx;
58
  FILE *fd;
59
  uint8_t *buffer_ptr;
60
  struct mem_offset *len;
61
  ctx = (struct gpThreadContext *) context;
62

    
63
  fd = fopen(ctx->cloud->path, "r");
64
  if (!fd) pthread_exit(NULL);
65
  flock(fileno(fd), LOCK_SH);
66

    
67
  sem_wait(&ctx->cloud->sem);
68
  ctx->cloud->key_error = -1;
69
  sem_post(&ctx->cloud->sem);
70

    
71
  ctx->cloud->out_cnt++;
72
  ctx->cloud->out_buffer[ctx->cloud->out_cnt] = calloc(100, sizeof(uint8_t));
73
  len = malloc(sizeof(struct mem_offset));
74
  len->start = 0;
75
  len->end = ctx->header_size;
76

    
77
  buffer_ptr = ctx->cloud->out_buffer[ctx->cloud->out_cnt];
78
  memcpy(buffer_ptr, ctx->header_ptr, ctx->header_size);
79

    
80
  buffer_ptr += ctx->header_size;
81

    
82
  while (fread(&e, sizeof(e), 1, fd) != 0){
83
    if (strcmp(e.key, ctx->key) == 0){
84

    
85
      sem_wait(&ctx->cloud->sem);
86
      memcpy(buffer_ptr, e.value, e.value_len);
87
      len->end += e.value_len;      
88

    
89
      ctx->cloud->key_error = 0;
90
      sem_post(&ctx->cloud->sem);
91
      break;
92
    }
93
  }
94

    
95
  sem_wait(&ctx->cloud->sem);
96
  ctx->cloud->out_len[ctx->cloud->out_cnt] = *len;
97
  if (ctx->cloud->key_error == -1)
98
    ctx->cloud->key_error = 1;
99
  sem_post(&ctx->cloud->sem);
100

    
101
  flock(fileno(fd), LOCK_UN);
102
  fclose(fd);
103

    
104
  if (ctx->key) free(ctx->key);
105
  if (ctx->value) free(ctx->value);
106
  free(ctx);
107

    
108
  pthread_exit(NULL);
109
}
110

    
111
void* putValueForKey(void *context){
112
  struct entry e;
113
  struct gpThreadContext *ctx;
114
  FILE *fd;
115
  int found = 0;
116
  ctx = (struct gpThreadContext *)context;
117

    
118
  fd = fopen(ctx->cloud->path, "r+");
119
  if (!fd) fd = fopen(ctx->cloud->path, "w+");
120
  flock(fileno(fd), LOCK_EX);
121
  while (fread(&e, sizeof(e), 1, fd) != 0){
122
    if (strcmp(e.key, ctx->key) == 0){
123
      memcpy(e.value, ctx->value, ctx->value_len);
124
      e.value_len = ctx->value_len;
125
      fseek(fd, -sizeof(e), SEEK_CUR);
126
      fwrite(&e, sizeof(e), 1, fd);
127
      found = 1;
128
      break;
129
    }
130
  }
131
  if (!found){
132
    strcpy(e.key, ctx->key);
133
    memcpy(e.value, ctx->value, ctx->value_len);
134
    e.value_len = ctx->value_len;    
135
    fwrite(&e, sizeof(e), 1, fd);
136
  }
137
  fflush(fd);
138
  flock(fileno(fd), LOCK_UN);
139
  fclose(fd);
140

    
141
  if (ctx->key) free(ctx->key);
142
  if (ctx->value) free(ctx->value);
143
  free(ctx);
144

    
145
  pthread_exit(NULL);
146
}
147

    
148

    
149

    
150
static void* file_cloud_helper_init(struct nodeID *local, const char *config)
151
{
152
  struct file_cloud_context *ctx;
153
  char *conf = strdup(config);
154
  char *opt;
155
  char *key;
156
  const char *path = NULL;
157

    
158

    
159
  while((opt = strsep(&conf, ",")) != NULL) {
160
    key = strsep(&opt, "=");
161
    if (!key) continue;
162
    if (strcmp(key, "file_cloud_path") == 0){
163
      if (opt) path = opt;
164
    }
165
  }
166
  
167
  if (!path) path = "cloud_dump";
168

    
169
  ctx = malloc(sizeof(struct file_cloud_context));
170

    
171
  sem_init(&ctx->sem, 0, 1);
172
  ctx->path = path;
173
  ctx->out_cnt = -1;
174
  ctx->cloud_node_base = create_node("0.0.0.0", 0);
175
  return ctx;
176
}
177

    
178
static int file_cloud_get_from_cloud(void *context, char *key, uint8_t *header_ptr, int header_size)
179
{
180
  struct file_cloud_context *ctx;
181
  int err;
182
  struct gpThreadContext *tc;
183
  pthread_t *thread = malloc(sizeof(pthread_t));
184

    
185

    
186
  ctx = (struct file_cloud_context *)context;
187
  tc = malloc(sizeof(struct gpThreadContext));
188
  tc->cloud = ctx;
189
  tc->key = strdup(key);
190
  tc->header_ptr = header_ptr;
191
  tc->header_size = header_size;
192

    
193
  err = pthread_create(thread, NULL, getValueForKey, (void *)tc);
194
  if (err) return 1;
195
  return 0;
196
}
197

    
198
static int file_cloud_put_on_cloud(void *context, char *key, uint8_t *buffer_ptr, int buffer_size)
199
{ 
200
  struct file_cloud_context *ctx;
201
  int err;
202
  struct gpThreadContext *tc;
203
  pthread_t *thread = malloc(sizeof(pthread_t));
204
  pthread_attr_t attr;
205
  void *status = NULL;
206

    
207
  pthread_attr_init(&attr);
208
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
209

    
210
  ctx = (struct file_cloud_context *)context;
211
  tc = malloc(sizeof(struct gpThreadContext));
212
  tc->cloud = ctx;
213
  tc->key = strdup(key);
214
  tc->value = calloc(buffer_size, sizeof(uint8_t));
215
  memcpy(tc->value, buffer_ptr, buffer_size);
216
  tc->value_len = buffer_size;
217

    
218
  err = pthread_create(thread, &attr, putValueForKey, (void *)tc);
219
  if (err) return 1;
220

    
221
  pthread_attr_destroy(&attr);
222
  pthread_join(*thread, &status);
223

    
224
  return 0;  
225
}
226

    
227
struct nodeID* file_cloud_get_cloud_node(void *context)
228
{
229
  struct file_cloud_context *ctx;
230
  ctx = (struct file_cloud_context *)context;
231
  return ctx->cloud_node_base;
232
}
233

    
234
int file_cloud_is_cloud_node(void *context, struct nodeID* node)
235
{
236
  struct file_cloud_context *ctx;
237
  struct nodeID *candidate_node;
238
  int result = -1;
239
  ctx = (struct file_cloud_context *)context;
240
  candidate_node = create_node(node_ip(node), 0);
241

    
242
  result = nodeid_equal(ctx->cloud_node_base, candidate_node);
243
  nodeid_free(candidate_node);
244
  return result;
245
}
246

    
247
static int file_cloud_wait4cloud(void *context, struct timeval *tout)
248
{
249
  struct file_cloud_context *ctx;
250
  long timeout = tout->tv_sec * 1000 + tout->tv_usec;
251

    
252
  ctx = (struct file_cloud_context *)context;
253
  if (ctx->key_error == 1) return -1;
254
  if (ctx->out_cnt > 0) return 1;
255
  if (ctx->out_cnt == 0 && ctx->key_error >= 0) return 1;
256

    
257
  while (timeout > 0){
258
    usleep(100);
259
    timeout -= 100;
260
    if (ctx->key_error == 1) return -1;
261
    if (ctx->out_cnt > 0) return 1;
262
    if (ctx->out_cnt == 0 && ctx->key_error >= 0) return 1;
263
  }
264
  return 0;
265
}
266

    
267
static int file_cloud_recv_from_cloud(void *context, uint8_t *buffer_ptr, int buffer_size)
268
{
269
  struct file_cloud_context *ctx;
270
  uint8_t *source_ptr;
271
  int start, end, len;
272
  ctx = (struct file_cloud_context *)context;
273

    
274
  if (ctx->out_cnt < 0) return 0;
275

    
276
  sem_wait(&ctx->sem);
277

    
278
  source_ptr = ctx->out_buffer[ctx->out_cnt];
279
  start = ctx->out_len[ctx->out_cnt].start;
280
  end = ctx->out_len[ctx->out_cnt].end;
281
  len = end - start;
282
  source_ptr = source_ptr + start;
283

    
284
  if (buffer_size < len){
285
    memcpy(buffer_ptr,  source_ptr, buffer_size);
286
    start += buffer_size;
287
    ctx->out_len[ctx->out_cnt].start = start;
288
    sem_post(&ctx->sem);
289
    return buffer_size;
290
  } else {
291
    memcpy(buffer_ptr,  source_ptr, len);    
292
    free(ctx->out_buffer[ctx->out_cnt]);
293
    ctx->out_cnt--;
294
    sem_post(&ctx->sem);
295
    return len;
296
  }
297
}
298

    
299
struct delegate_iface delegate_impl = {
300
  .cloud_helper_init = &file_cloud_helper_init,
301
  .get_from_cloud = &file_cloud_get_from_cloud,
302
  .put_on_cloud = &file_cloud_put_on_cloud,
303
  .get_cloud_node = &file_cloud_get_cloud_node,
304
  .is_cloud_node = &file_cloud_is_cloud_node,
305
  .wait4cloud = file_cloud_wait4cloud,
306
  .recv_from_cloud = file_cloud_recv_from_cloud
307
};