Revision 4692000a

View differences:

src/CloudSupport/Makefile
1
ifndef BASE
2
BASE = ../..
3
else
4
vpath %.c $(BASE)/src/$(notdir $(CURDIR))
5
endif
6
CFGDIR ?= ..
7

  
8
OBJS = cloud_helper.o cloud_helper_utils.o cloud_helper_delegate.o
9

  
10
include $(BASE)/src/utils.mak
src/CloudSupport/cloud_helper.c
1
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

  
7
#include <stdlib.h>
8
#include <string.h>
9

  
10
#include "cloud_helper.h"
11
#include "cloud_helper_iface.h"
12
#include "config.h"
13

  
14
extern struct cloud_helper_iface delegate;
15

  
16
struct cloud_helper_context {
17
  struct cloud_helper_iface *ch;
18
  struct cloud_helper_impl_context *ch_context;
19
};
20

  
21
static int ctx_counter = 0;
22
static struct nodeID* node_ids[CLOUD_HELPER_MAX_INSTANCES];
23
static struct cloud_helper_context* cloud_ctxs[CLOUD_HELPER_MAX_INSTANCES];
24

  
25
static int add_context(struct nodeID *local, struct cloud_helper_context *ctx)
26
{
27
  int i;
28
  if (ctx_counter >= CLOUD_HELPER_MAX_INSTANCES) return 1;
29

  
30
  for (i=0; i<ctx_counter; i++)
31
    if (nodeid_equal(node_ids[i], local)) return 0;
32

  
33
  node_ids[ctx_counter] = local;
34
  cloud_ctxs[ctx_counter] = ctx;
35
  ctx_counter++;
36

  
37
  return 1;
38
}
39

  
40
struct cloud_helper_context* cloud_helper_init(struct nodeID *local, const char *config)
41
{
42
  struct cloud_helper_context *ctx;
43
  struct tag *cfg_tags;
44
  const char *provider;
45

  
46
  cfg_tags = config_parse(config);
47
  provider = config_value_str(cfg_tags, "provider");
48

  
49
  if (!provider) return NULL;
50

  
51
  ctx = malloc(sizeof(struct cloud_helper_context));
52
  if (!ctx) return NULL;
53
  if (strcmp(provider, "delegate") == 0){
54
    ctx->ch = &delegate;
55
  }
56

  
57
 ctx->ch_context = ctx->ch->cloud_helper_init(local, config);
58
 if(!ctx->ch_context){
59
   free(ctx);
60
   return NULL;
61
 }
62

  
63
 if (!add_context(local, ctx)){
64
   //TODO: a better deallocation process is needed
65
   free(ctx->ch_context);
66
   free(ctx);
67
   return NULL;
68
 }
69

  
70
 return ctx;
71
}
72

  
73
struct cloud_helper_context* get_cloud_helper_for(struct nodeID *local){
74
  int i;
75
  for (i=0; i<ctx_counter; i++)
76
    if (node_ids[i] == local) return cloud_ctxs[i];
77

  
78
  return NULL;
79
}
80

  
81
int get_from_cloud(struct cloud_helper_context *context, char *key, uint8_t *header_ptr, int header_size)
82
{
83
  return context->ch->get_from_cloud(context->ch_context, key, header_ptr, header_size);
84
}
85

  
86
int put_on_cloud(struct cloud_helper_context *context, char *key, uint8_t *buffer_ptr, int buffer_size)
87
{
88
  return context->ch->put_on_cloud(context->ch_context, key, buffer_ptr, buffer_size);
89
}
90

  
91
struct nodeID* get_cloud_node(struct cloud_helper_context *context, uint8_t variant)
92
{
93
  return context->ch->get_cloud_node(context->ch_context, variant);
94
}
95

  
96
time_t timestamp_cloud(struct cloud_helper_context *context)
97
{
98
  return context->ch->timestamp_cloud(context->ch_context);
99
}
100

  
101
int is_cloud_node(struct cloud_helper_context *context, struct nodeID* node)
102
{
103
  return context->ch->is_cloud_node(context->ch_context, node);
104
}
105

  
106
int wait4cloud(struct cloud_helper_context *context, struct timeval *tout)
107
{
108
  return context->ch->wait4cloud(context->ch_context, tout);
109
}
110

  
111
int recv_from_cloud(struct cloud_helper_context *context, uint8_t *buffer_ptr, int buffer_size)
112
{
113
  return context->ch->recv_from_cloud(context->ch_context, buffer_ptr, buffer_size);
114
}
src/CloudSupport/cloud_helper_delegate.c
1
/*
2
 *  Copyright (c) 2011 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

  
7
#include <stdlib.h>
8
#include <stdint.h>
9
#include <dlfcn.h>
10
#include <stdio.h>
11

  
12
#include "cloud_helper_iface.h"
13
#include "config.h"
14

  
15
struct delegate_iface {
16
  void* (*cloud_helper_init)(struct nodeID *local, const char *config);
17
  int (*get_from_cloud)(void *context, char *key, uint8_t *header_ptr, int header_size);
18
  int (*put_on_cloud)(void *context, char *key, uint8_t *buffer_ptr, int buffer_size);
19
  struct nodeID* (*get_cloud_node)(void *context, uint8_t variant);
20
  time_t (*timestamp_cloud)(void *context);
21
  int (*is_cloud_node)(void *context, struct nodeID* node);
22
  int (*wait4cloud)(void *context, struct timeval *tout);
23
  int (*recv_from_cloud)(void *context, uint8_t *buffer_ptr, int buffer_size);
24
};
25

  
26
struct cloud_helper_impl_context {
27
  struct delegate_iface *delegate;
28
  void *delegate_context;
29
};
30

  
31
static struct cloud_helper_impl_context* delegate_cloud_init(struct nodeID *local, const char *config)
32
{
33
  struct cloud_helper_impl_context *ctx;
34
  struct tag *cfg_tags;
35
  const char *dlib_name;
36
  struct delegate_iface *delegate_impl;
37
  void *dlib;
38

  
39
  cfg_tags = config_parse(config);
40
  dlib_name = config_value_str(cfg_tags, "delegate_lib");
41
  dlib = dlopen(dlib_name, RTLD_NOW);
42
  if (dlib == NULL) {
43
    printf("error: %s", dlerror());
44
    return NULL;
45
  }
46

  
47

  
48
  delegate_impl = (struct delegate_iface *) dlsym(dlib, "delegate_impl");
49
  if (!delegate_impl) return NULL;
50

  
51
  ctx = malloc(sizeof(struct cloud_helper_impl_context));
52
  ctx->delegate = delegate_impl;
53

  
54
  ctx->delegate_context = ctx->delegate->cloud_helper_init(local, config);
55
  if(!ctx->delegate_context) {
56
    free(ctx);
57
    return NULL;
58
  }
59

  
60
  return ctx;
61
}
62

  
63
static int delegate_cloud_get_from_cloud(struct cloud_helper_impl_context *context, char *key, uint8_t *header_ptr, int header_size)
64
{
65
  return context->delegate->get_from_cloud(context->delegate_context, key, header_ptr, header_size);
66
}
67

  
68
static int delegate_cloud_put_on_cloud(struct cloud_helper_impl_context *context, char *key, uint8_t *buffer_ptr, int buffer_size)
69
{
70
  return context->delegate->put_on_cloud(context->delegate_context, key, buffer_ptr, buffer_size);
71
}
72

  
73
static struct nodeID* delegate_cloud_get_cloud_node(struct cloud_helper_impl_context *context, uint8_t variant)
74
{
75
  return context->delegate->get_cloud_node(context->delegate_context, variant);
76
}
77

  
78
static time_t delegate_timestamp_cloud(struct cloud_helper_impl_context *context)
79
{
80
  return context->delegate->timestamp_cloud(context->delegate_context);
81
}
82

  
83
int delegate_is_cloud_node(struct cloud_helper_impl_context *context, struct nodeID* node)
84
{
85
  return context->delegate->is_cloud_node(context->delegate_context, node);
86
}
87

  
88
static int delegate_cloud_wait4cloud(struct cloud_helper_impl_context *context, struct timeval *tout)
89
{
90
  return context->delegate->wait4cloud(context->delegate_context, tout);
91
}
92

  
93
static int delegate_cloud_recv_from_cloud(struct cloud_helper_impl_context *context, uint8_t *buffer_ptr, int buffer_size)
94
{
95
  return context->delegate->recv_from_cloud(context->delegate_context, buffer_ptr, buffer_size);
96
}
97

  
98
struct cloud_helper_iface delegate = {
99
  .cloud_helper_init = delegate_cloud_init,
100
  .get_from_cloud = delegate_cloud_get_from_cloud,
101
  .put_on_cloud = delegate_cloud_put_on_cloud,
102
  .get_cloud_node = delegate_cloud_get_cloud_node,
103
  .timestamp_cloud = delegate_timestamp_cloud,
104
  .is_cloud_node = delegate_is_cloud_node,
105
  .wait4cloud = delegate_cloud_wait4cloud,
106
  .recv_from_cloud = delegate_cloud_recv_from_cloud,
107
};
src/CloudSupport/cloud_helper_iface.h
1
#ifndef CLOUD_HELPER_IFACE
2
#define CLOUD_HELPER_IFACE
3

  
4
#include <time.h>
5
#include "net_helper.h"
6

  
7
struct cloud_helper_impl_context;
8

  
9
struct cloud_helper_iface {
10
  struct cloud_helper_impl_context* (*cloud_helper_init)(struct nodeID *local, const char *config);
11
  int (*get_from_cloud)(struct cloud_helper_impl_context *context, char *key, uint8_t *header_ptr, int header_size);
12
  int (*put_on_cloud)(struct cloud_helper_impl_context *context, char *key, uint8_t *buffer_ptr, int buffer_size);
13
  struct nodeID* (*get_cloud_node)(struct cloud_helper_impl_context *context, uint8_t variant);
14
  time_t (*timestamp_cloud)(struct cloud_helper_impl_context *context);
15
  int (*is_cloud_node)(struct cloud_helper_impl_context *context, struct nodeID* node);
16
  int (*wait4cloud)(struct cloud_helper_impl_context *context, struct timeval *tout);
17
  int (*recv_from_cloud)(struct cloud_helper_impl_context *context, uint8_t *buffer_ptr, int buffer_size);
18
};
19

  
20
#endif
src/CloudSupport/cloud_helper_utils.c
1
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

  
7
#include <stdlib.h>
8
#include <pthread.h>
9
#include <errno.h>
10

  
11
#include "cloud_helper.h"
12
#include "cloud_helper_utils.h"
13

  
14
struct wait4context {
15
  struct nodeID* node;
16
  struct cloud_helper_context *cloud;
17
  struct timeval *tout;
18
  int *user_fds;
19

  
20
  pthread_mutex_t wait_mutex;
21
  pthread_cond_t wait_cond;
22

  
23
  int status;
24
  int source;
25
};
26

  
27

  
28
static void* wait4data_wrapper(void *ctx)
29
{
30
  struct wait4context *wait4ctx;
31
  struct timeval tout;
32
  int status;
33

  
34
  wait4ctx = (struct wait4context *) ctx;
35
  tout = *wait4ctx->tout;
36
  status = wait4data(wait4ctx->node, &tout, wait4ctx->user_fds);
37
  if (status == 1){
38
    pthread_mutex_lock(&wait4ctx->wait_mutex);
39
    wait4ctx->source = DATA_SOURCE_NET;
40
    wait4ctx->status = status;
41
    pthread_cond_signal(&wait4ctx->wait_cond);
42
    pthread_mutex_unlock(&wait4ctx->wait_mutex);
43
  }
44
  pthread_exit(NULL);
45
}
46

  
47
static void* wait4cloud_wrapper(void *ctx)
48
{
49
  struct wait4context *wait4ctx;
50
  struct timeval tout;
51
  int status;
52

  
53
  wait4ctx = (struct wait4context *) ctx;
54
  tout = *wait4ctx->tout;
55
  status = wait4cloud(wait4ctx->cloud, &tout);
56
  if (status == 1 || status == -1) {
57
    pthread_mutex_lock(&wait4ctx->wait_mutex);
58
    wait4ctx->source = DATA_SOURCE_CLOUD;
59
    wait4ctx->status = status;
60
    pthread_cond_signal(&wait4ctx->wait_cond);
61
    pthread_mutex_unlock(&wait4ctx->wait_mutex);
62
  }
63
  pthread_exit(NULL);
64
}
65

  
66

  
67
int wait4any_threaded(struct nodeID *n, struct cloud_helper_context *cloud, struct timeval *tout, int *user_fds, int *data_source)
68
{
69
  pthread_attr_t attr;
70
  pthread_t wait4data_thread;
71
  pthread_t wait4cloud_thread;
72
  struct wait4context *wait4ctx;
73
  struct timespec timeout;
74
  struct timeval now;
75

  
76
  int err;
77
  int result;
78

  
79
  wait4ctx = malloc(sizeof(struct wait4context));
80
  if (wait4ctx == NULL) return -1;
81
  wait4ctx->node = n;
82
  wait4ctx->cloud = cloud;
83
  wait4ctx->tout = tout;
84
  wait4ctx->user_fds = user_fds;
85

  
86
  pthread_mutex_init(&wait4ctx->wait_mutex, NULL);
87
  pthread_cond_init (&wait4ctx->wait_cond, NULL);
88

  
89
  pthread_attr_init(&attr);
90
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
91

  
92
  pthread_mutex_lock(&wait4ctx->wait_mutex);
93
  pthread_create(&wait4data_thread, &attr, wait4data_wrapper, (void *)wait4ctx);
94
  pthread_create(&wait4cloud_thread, &attr, wait4cloud_wrapper, (void *)wait4ctx);
95

  
96
  gettimeofday(&now, NULL);
97
  timeout.tv_sec = now.tv_sec + tout->tv_sec;
98
  timeout.tv_nsec = now.tv_usec * 1000 + tout->tv_usec * 1000;
99

  
100
  // Wait for one of the thread to signal available data
101
  err = pthread_cond_timedwait(&wait4ctx->wait_cond, &wait4ctx->wait_mutex, &timeout);
102
  if (err ==  0) {
103
    *data_source = wait4ctx->source;
104
    result = 1;
105
  } else if(err == ETIMEDOUT){
106
    *data_source = DATA_SOURCE_NONE;
107
    result = 0;
108
  }
109

  
110
  // Clean up and return
111
  pthread_cancel(wait4data_thread);
112
  pthread_cancel(wait4cloud_thread);
113
  pthread_cond_destroy(&wait4ctx->wait_cond);
114
  pthread_mutex_unlock(&wait4ctx->wait_mutex);
115
  pthread_mutex_destroy(&wait4ctx->wait_mutex);
116

  
117
  return result;
118
}
119

  
120

  
121
/* Subtract the `struct timeval' values X and Y,
122
   storing the result in RESULT.
123
   Return 1 if the difference is negative, otherwise 0.  */
124

  
125
int timeval_subtract(struct timeval *result, struct timeval *x, struct timeval *y)
126
{
127
  /* Perform the carry for the later subtraction by updating y. */
128
  if (x->tv_usec < y->tv_usec) {
129
    int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;
130
    y->tv_usec -= 1000000 * nsec;
131
    y->tv_sec += nsec;
132
  }
133
  if (x->tv_usec - y->tv_usec > 1000000) {
134
    int nsec = (x->tv_usec - y->tv_usec) / 1000000;
135
    y->tv_usec += 1000000 * nsec;
136
    y->tv_sec -= nsec;
137
  }
138

  
139
  /* Compute the time remaining to wait.
140
     tv_usec is certainly positive. */
141
  result->tv_sec = x->tv_sec - y->tv_sec;
142
  result->tv_usec = x->tv_usec - y->tv_usec;
143

  
144
  /* Return 1 if result is negative. */
145
  return x->tv_sec < y->tv_sec;
146
}
147

  
148

  
149
int wait4any_polling(struct nodeID *n, struct cloud_helper_context *cloud, struct timeval *tout, struct timeval *step_tout, int *user_fds, int *data_source)
150
{
151
  struct timeval timeout;
152
  struct timeval *step_time;
153
  uint8_t turn;
154
  int status;
155

  
156
  timeout = *tout;
157
  if (step_tout == NULL) {
158
    step_time = malloc(sizeof(struct timeval));
159
    step_time->tv_sec = 0;
160
    step_time->tv_usec = 100000;
161
  } else {
162
    *step_time = *step_tout;
163
  }
164

  
165
  turn = DATA_SOURCE_NET;
166
  while (turn != DATA_SOURCE_NONE) {
167
    // Try waiting for 100 milliseconds on one resource
168
    if (turn == DATA_SOURCE_NET)
169
      status =  wait4data(n, step_time, user_fds);
170
    else
171
      status = wait4cloud(cloud, step_time);
172

  
173
    if (status == 1){
174
      // If we got a positive response we're done
175
      *data_source = turn;
176
      free(step_time);
177
      return 1;
178
    } else {
179
      // Go on with another step
180
      turn = (turn == DATA_SOURCE_NET)? DATA_SOURCE_CLOUD:DATA_SOURCE_NET;
181
      if (step_tout == NULL) {
182
        step_time->tv_sec = 0;
183
        step_time->tv_usec = 100000;
184
      } else {
185
        *step_time = *step_tout;
186
      }
187

  
188
      // If we exeded the timeout it's time to stop
189
      if (timeval_subtract(&timeout, &timeout, step_time) < 0)
190
        turn = DATA_SOURCE_NONE;
191
    }
192
  }
193

  
194
  free(step_time);
195
  *data_source = DATA_SOURCE_NONE;
196
  return 0;
197
}
src/Makefile
3 3
endif
4 4
CFGDIR ?= .
5 5

  
6
SUBDIRS = ChunkIDSet ChunkTrading TopologyManager ChunkBuffer PeerSet Scheduler
6
SUBDIRS = ChunkIDSet ChunkTrading TopologyManager ChunkBuffer PeerSet Scheduler CloudSupport
7

  
7 8
COMMON_OBJS = config.o
8 9

  
9 10
.PHONY: subdirs $(SUBDIRS)
src/TopologyManager/Makefile
5 5
endif
6 6
CFGDIR ?= ..
7 7

  
8
OBJS = peersampler.o topman.o ncast.o ncast_proto.o dummy.o cyclon.o cyclon_proto.o topo_proto.o topocache.o blist_cache.o blist_proto.o tman.o dumbTopman.o cloud_helper.o cloud_helper_utils.o cloud_helper_delegate.o cloudcast.o cloudcast_proto.o
8
OBJS = peersampler.o topman.o ncast.o ncast_proto.o dummy.o cyclon.o cyclon_proto.o topo_proto.o topocache.o blist_cache.o blist_proto.o tman.o dumbTopman.o cloudcast.o cloudcast_proto.o
9 9

  
10 10
all: libtopman.a
11 11

  
src/TopologyManager/cloud_helper.c
1
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

  
7
#include <stdlib.h>
8
#include <string.h>
9

  
10
#include "cloud_helper.h"
11
#include "cloud_helper_iface.h"
12
#include "config.h"
13

  
14
extern struct cloud_helper_iface delegate;
15

  
16
struct cloud_helper_context {
17
  struct cloud_helper_iface *ch;
18
  struct cloud_helper_impl_context *ch_context;
19
};
20

  
21
static int ctx_counter = 0;
22
static struct nodeID* node_ids[CLOUD_HELPER_MAX_INSTANCES];
23
static struct cloud_helper_context* cloud_ctxs[CLOUD_HELPER_MAX_INSTANCES];
24

  
25
static int add_context(struct nodeID *local, struct cloud_helper_context *ctx)
26
{
27
  int i;
28
  if (ctx_counter >= CLOUD_HELPER_MAX_INSTANCES) return 1;
29

  
30
  for (i=0; i<ctx_counter; i++)
31
    if (nodeid_equal(node_ids[i], local)) return 0;
32

  
33
  node_ids[ctx_counter] = local;
34
  cloud_ctxs[ctx_counter] = ctx;
35
  ctx_counter++;
36

  
37
  return 1;
38
}
39

  
40
struct cloud_helper_context* cloud_helper_init(struct nodeID *local, const char *config)
41
{
42
  struct cloud_helper_context *ctx;
43
  struct tag *cfg_tags;
44
  const char *provider;
45

  
46
  cfg_tags = config_parse(config);
47
  provider = config_value_str(cfg_tags, "provider");
48

  
49
  if (!provider) return NULL;
50

  
51
  ctx = malloc(sizeof(struct cloud_helper_context));
52
  if (!ctx) return NULL;
53
  if (strcmp(provider, "delegate") == 0){
54
    ctx->ch = &delegate;
55
  }
56

  
57
 ctx->ch_context = ctx->ch->cloud_helper_init(local, config);
58
 if(!ctx->ch_context){
59
   free(ctx);
60
   return NULL;
61
 }
62

  
63
 if (!add_context(local, ctx)){
64
   //TODO: a better deallocation process is needed
65
   free(ctx->ch_context);
66
   free(ctx);
67
   return NULL;
68
 }
69

  
70
 return ctx;
71
}
72

  
73
struct cloud_helper_context* get_cloud_helper_for(struct nodeID *local){
74
  int i;
75
  for (i=0; i<ctx_counter; i++)
76
    if (node_ids[i] == local) return cloud_ctxs[i];
77

  
78
  return NULL;
79
}
80

  
81
int get_from_cloud(struct cloud_helper_context *context, char *key, uint8_t *header_ptr, int header_size)
82
{
83
  return context->ch->get_from_cloud(context->ch_context, key, header_ptr, header_size);
84
}
85

  
86
int put_on_cloud(struct cloud_helper_context *context, char *key, uint8_t *buffer_ptr, int buffer_size)
87
{
88
  return context->ch->put_on_cloud(context->ch_context, key, buffer_ptr, buffer_size);
89
}
90

  
91
struct nodeID* get_cloud_node(struct cloud_helper_context *context, uint8_t variant)
92
{
93
  return context->ch->get_cloud_node(context->ch_context, variant);
94
}
95

  
96
time_t timestamp_cloud(struct cloud_helper_context *context)
97
{
98
  return context->ch->timestamp_cloud(context->ch_context);
99
}
100

  
101
int is_cloud_node(struct cloud_helper_context *context, struct nodeID* node)
102
{
103
  return context->ch->is_cloud_node(context->ch_context, node);
104
}
105

  
106
int wait4cloud(struct cloud_helper_context *context, struct timeval *tout)
107
{
108
  return context->ch->wait4cloud(context->ch_context, tout);
109
}
110

  
111
int recv_from_cloud(struct cloud_helper_context *context, uint8_t *buffer_ptr, int buffer_size)
112
{
113
  return context->ch->recv_from_cloud(context->ch_context, buffer_ptr, buffer_size);
114
}
src/TopologyManager/cloud_helper_delegate.c
1
/*
2
 *  Copyright (c) 2011 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

  
7
#include <stdlib.h>
8
#include <stdint.h>
9
#include <dlfcn.h>
10
#include <stdio.h>
11

  
12
#include "cloud_helper_iface.h"
13
#include "config.h"
14

  
15
struct delegate_iface {
16
  void* (*cloud_helper_init)(struct nodeID *local, const char *config);
17
  int (*get_from_cloud)(void *context, char *key, uint8_t *header_ptr, int header_size);
18
  int (*put_on_cloud)(void *context, char *key, uint8_t *buffer_ptr, int buffer_size);
19
  struct nodeID* (*get_cloud_node)(void *context, uint8_t variant);
20
  time_t (*timestamp_cloud)(void *context);
21
  int (*is_cloud_node)(void *context, struct nodeID* node);
22
  int (*wait4cloud)(void *context, struct timeval *tout);
23
  int (*recv_from_cloud)(void *context, uint8_t *buffer_ptr, int buffer_size);
24
};
25

  
26
struct cloud_helper_impl_context {
27
  struct delegate_iface *delegate;
28
  void *delegate_context;
29
};
30

  
31
static struct cloud_helper_impl_context* delegate_cloud_init(struct nodeID *local, const char *config)
32
{
33
  struct cloud_helper_impl_context *ctx;
34
  struct tag *cfg_tags;
35
  const char *dlib_name;
36
  struct delegate_iface *delegate_impl;
37
  void *dlib;
38

  
39
  cfg_tags = config_parse(config);
40
  dlib_name = config_value_str(cfg_tags, "delegate_lib");
41
  dlib = dlopen(dlib_name, RTLD_NOW);
42
  if (dlib == NULL) {
43
    printf("error: %s", dlerror());
44
    return NULL;
45
  }
46

  
47

  
48
  delegate_impl = (struct delegate_iface *) dlsym(dlib, "delegate_impl");
49
  if (!delegate_impl) return NULL;
50

  
51
  ctx = malloc(sizeof(struct cloud_helper_impl_context));
52
  ctx->delegate = delegate_impl;
53

  
54
  ctx->delegate_context = ctx->delegate->cloud_helper_init(local, config);
55
  if(!ctx->delegate_context) {
56
    free(ctx);
57
    return NULL;
58
  }
59

  
60
  return ctx;
61
}
62

  
63
static int delegate_cloud_get_from_cloud(struct cloud_helper_impl_context *context, char *key, uint8_t *header_ptr, int header_size)
64
{
65
  return context->delegate->get_from_cloud(context->delegate_context, key, header_ptr, header_size);
66
}
67

  
68
static int delegate_cloud_put_on_cloud(struct cloud_helper_impl_context *context, char *key, uint8_t *buffer_ptr, int buffer_size)
69
{
70
  return context->delegate->put_on_cloud(context->delegate_context, key, buffer_ptr, buffer_size);
71
}
72

  
73
static struct nodeID* delegate_cloud_get_cloud_node(struct cloud_helper_impl_context *context, uint8_t variant)
74
{
75
  return context->delegate->get_cloud_node(context->delegate_context, variant);
76
}
77

  
78
static time_t delegate_timestamp_cloud(struct cloud_helper_impl_context *context)
79
{
80
  return context->delegate->timestamp_cloud(context->delegate_context);
81
}
82

  
83
int delegate_is_cloud_node(struct cloud_helper_impl_context *context, struct nodeID* node)
84
{
85
  return context->delegate->is_cloud_node(context->delegate_context, node);
86
}
87

  
88
static int delegate_cloud_wait4cloud(struct cloud_helper_impl_context *context, struct timeval *tout)
89
{
90
  return context->delegate->wait4cloud(context->delegate_context, tout);
91
}
92

  
93
static int delegate_cloud_recv_from_cloud(struct cloud_helper_impl_context *context, uint8_t *buffer_ptr, int buffer_size)
94
{
95
  return context->delegate->recv_from_cloud(context->delegate_context, buffer_ptr, buffer_size);
96
}
97

  
98
struct cloud_helper_iface delegate = {
99
  .cloud_helper_init = delegate_cloud_init,
100
  .get_from_cloud = delegate_cloud_get_from_cloud,
101
  .put_on_cloud = delegate_cloud_put_on_cloud,
102
  .get_cloud_node = delegate_cloud_get_cloud_node,
103
  .timestamp_cloud = delegate_timestamp_cloud,
104
  .is_cloud_node = delegate_is_cloud_node,
105
  .wait4cloud = delegate_cloud_wait4cloud,
106
  .recv_from_cloud = delegate_cloud_recv_from_cloud,
107
};
src/TopologyManager/cloud_helper_iface.h
1
#ifndef CLOUD_HELPER_IFACE
2
#define CLOUD_HELPER_IFACE
3

  
4
#include <time.h>
5
#include "net_helper.h"
6

  
7
struct cloud_helper_impl_context;
8

  
9
struct cloud_helper_iface {
10
  struct cloud_helper_impl_context* (*cloud_helper_init)(struct nodeID *local, const char *config);
11
  int (*get_from_cloud)(struct cloud_helper_impl_context *context, char *key, uint8_t *header_ptr, int header_size);
12
  int (*put_on_cloud)(struct cloud_helper_impl_context *context, char *key, uint8_t *buffer_ptr, int buffer_size);
13
  struct nodeID* (*get_cloud_node)(struct cloud_helper_impl_context *context, uint8_t variant);
14
  time_t (*timestamp_cloud)(struct cloud_helper_impl_context *context);
15
  int (*is_cloud_node)(struct cloud_helper_impl_context *context, struct nodeID* node);
16
  int (*wait4cloud)(struct cloud_helper_impl_context *context, struct timeval *tout);
17
  int (*recv_from_cloud)(struct cloud_helper_impl_context *context, uint8_t *buffer_ptr, int buffer_size);
18
};
19

  
20
#endif
src/TopologyManager/cloud_helper_utils.c
1
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

  
7
#include <stdlib.h>
8
#include <pthread.h>
9
#include <errno.h>
10

  
11
#include "cloud_helper.h"
12
#include "cloud_helper_utils.h"
13

  
14
struct wait4context {
15
  struct nodeID* node;
16
  struct cloud_helper_context *cloud;
17
  struct timeval *tout;
18
  int *user_fds;
19

  
20
  pthread_mutex_t wait_mutex;
21
  pthread_cond_t wait_cond;
22

  
23
  int status;
24
  int source;
25
};
26

  
27

  
28
static void* wait4data_wrapper(void *ctx)
29
{
30
  struct wait4context *wait4ctx;
31
  struct timeval tout;
32
  int status;
33

  
34
  wait4ctx = (struct wait4context *) ctx;
35
  tout = *wait4ctx->tout;
36
  status = wait4data(wait4ctx->node, &tout, wait4ctx->user_fds);
37
  if (status == 1){
38
    pthread_mutex_lock(&wait4ctx->wait_mutex);
39
    wait4ctx->source = DATA_SOURCE_NET;
40
    wait4ctx->status = status;
41
    pthread_cond_signal(&wait4ctx->wait_cond);
42
    pthread_mutex_unlock(&wait4ctx->wait_mutex);
43
  }
44
  pthread_exit(NULL);
45
}
46

  
47
static void* wait4cloud_wrapper(void *ctx)
48
{
49
  struct wait4context *wait4ctx;
50
  struct timeval tout;
51
  int status;
52

  
53
  wait4ctx = (struct wait4context *) ctx;
54
  tout = *wait4ctx->tout;
55
  status = wait4cloud(wait4ctx->cloud, &tout);
56
  if (status == 1 || status == -1) {
57
    pthread_mutex_lock(&wait4ctx->wait_mutex);
58
    wait4ctx->source = DATA_SOURCE_CLOUD;
59
    wait4ctx->status = status;
60
    pthread_cond_signal(&wait4ctx->wait_cond);
61
    pthread_mutex_unlock(&wait4ctx->wait_mutex);
62
  }
63
  pthread_exit(NULL);
64
}
65

  
66

  
67
int wait4any_threaded(struct nodeID *n, struct cloud_helper_context *cloud, struct timeval *tout, int *user_fds, int *data_source)
68
{
69
  pthread_attr_t attr;
70
  pthread_t wait4data_thread;
71
  pthread_t wait4cloud_thread;
72
  struct wait4context *wait4ctx;
73
  struct timespec timeout;
74
  struct timeval now;
75

  
76
  int err;
77
  int result;
78

  
79
  wait4ctx = malloc(sizeof(struct wait4context));
80
  if (wait4ctx == NULL) return -1;
81
  wait4ctx->node = n;
82
  wait4ctx->cloud = cloud;
83
  wait4ctx->tout = tout;
84
  wait4ctx->user_fds = user_fds;
85

  
86
  pthread_mutex_init(&wait4ctx->wait_mutex, NULL);
87
  pthread_cond_init (&wait4ctx->wait_cond, NULL);
88

  
89
  pthread_attr_init(&attr);
90
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
91

  
92
  pthread_mutex_lock(&wait4ctx->wait_mutex);
93
  pthread_create(&wait4data_thread, &attr, wait4data_wrapper, (void *)wait4ctx);
94
  pthread_create(&wait4cloud_thread, &attr, wait4cloud_wrapper, (void *)wait4ctx);
95

  
96
  gettimeofday(&now, NULL);
97
  timeout.tv_sec = now.tv_sec + tout->tv_sec;
98
  timeout.tv_nsec = now.tv_usec * 1000 + tout->tv_usec * 1000;
99

  
100
  // Wait for one of the thread to signal available data
101
  err = pthread_cond_timedwait(&wait4ctx->wait_cond, &wait4ctx->wait_mutex, &timeout);
102
  if (err ==  0) {
103
    *data_source = wait4ctx->source;
104
    result = 1;
105
  } else if(err == ETIMEDOUT){
106
    *data_source = DATA_SOURCE_NONE;
107
    result = 0;
108
  }
109

  
110
  // Clean up and return
111
  pthread_cancel(wait4data_thread);
112
  pthread_cancel(wait4cloud_thread);
113
  pthread_cond_destroy(&wait4ctx->wait_cond);
114
  pthread_mutex_unlock(&wait4ctx->wait_mutex);
115
  pthread_mutex_destroy(&wait4ctx->wait_mutex);
116

  
117
  return result;
118
}
119

  
120

  
121
/* Subtract the `struct timeval' values X and Y,
122
   storing the result in RESULT.
123
   Return 1 if the difference is negative, otherwise 0.  */
124

  
125
int timeval_subtract(struct timeval *result, struct timeval *x, struct timeval *y)
126
{
127
  /* Perform the carry for the later subtraction by updating y. */
128
  if (x->tv_usec < y->tv_usec) {
129
    int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;
130
    y->tv_usec -= 1000000 * nsec;
131
    y->tv_sec += nsec;
132
  }
133
  if (x->tv_usec - y->tv_usec > 1000000) {
134
    int nsec = (x->tv_usec - y->tv_usec) / 1000000;
135
    y->tv_usec += 1000000 * nsec;
136
    y->tv_sec -= nsec;
137
  }
138

  
139
  /* Compute the time remaining to wait.
140
     tv_usec is certainly positive. */
141
  result->tv_sec = x->tv_sec - y->tv_sec;
142
  result->tv_usec = x->tv_usec - y->tv_usec;
143

  
144
  /* Return 1 if result is negative. */
145
  return x->tv_sec < y->tv_sec;
146
}
147

  
148

  
149
int wait4any_polling(struct nodeID *n, struct cloud_helper_context *cloud, struct timeval *tout, struct timeval *step_tout, int *user_fds, int *data_source)
150
{
151
  struct timeval timeout;
152
  struct timeval *step_time;
153
  uint8_t turn;
154
  int status;
155

  
156
  timeout = *tout;
157
  if (step_tout == NULL) {
158
    step_time = malloc(sizeof(struct timeval));
159
    step_time->tv_sec = 0;
160
    step_time->tv_usec = 100000;
161
  } else {
162
    *step_time = *step_tout;
163
  }
164

  
165
  turn = DATA_SOURCE_NET;
166
  while (turn != DATA_SOURCE_NONE) {
167
    // Try waiting for 100 milliseconds on one resource
168
    if (turn == DATA_SOURCE_NET)
169
      status =  wait4data(n, step_time, user_fds);
170
    else
171
      status = wait4cloud(cloud, step_time);
172

  
173
    if (status == 1){
174
      // If we got a positive response we're done
175
      *data_source = turn;
176
      free(step_time);
177
      return 1;
178
    } else {
179
      // Go on with another step
180
      turn = (turn == DATA_SOURCE_NET)? DATA_SOURCE_CLOUD:DATA_SOURCE_NET;
181
      if (step_tout == NULL) {
182
        step_time->tv_sec = 0;
183
        step_time->tv_usec = 100000;
184
      } else {
185
        *step_time = *step_tout;
186
      }
187

  
188
      // If we exeded the timeout it's time to stop
189
      if (timeval_subtract(&timeout, &timeout, step_time) < 0)
190
        turn = DATA_SOURCE_NONE;
191
    }
192
  }
193

  
194
  free(step_time);
195
  *data_source = DATA_SOURCE_NONE;
196
  return 0;
197
}

Also available in: Unified diff