Revision b71e9607

View differences:

include/cloud_helper_utils.h
1
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

  
7
#ifndef CLOUD_HELPER_UTILS_H
8
#define CLOUD_HELPER_UTILS_H
9

  
10
#include "cloud_helper.h"
11
#include "net_helper.h"
12

  
13
#define DATA_SOURCE_NONE 0
14
#define DATA_SOURCE_NET 1
15
#define DATA_SOURCE_CLOUD 2
16

  
17
/**
18
 * @file cloud_helper_utils.h
19
 *
20
 * @brief Utilities function for managing cloud communications
21
 *
22
 * This files provides some utilities functions that simplifies
23
 * managing the addition of the cloud layer to the architecture
24
 *
25
 */
26

  
27
/**
28
 * @brief Wait for data from peers and cloud via thread
29
 *
30
 * This function handles the waiting for data by peers and cloud
31
 * in a threaded way, hiding the need to manually managing threads.
32
 *
33
 * @param[in] n A pointer to the nodeID for which waiting data
34
 * @param[in] cloud A pointer to the cloud_helper_context for which
35
 * waiting data
36
 * @param[in] tout The timeout for the waiting
37
 * @param[in] user_fds A pointer to an optional array of file
38
 * descriptior to monitor
39
 * @param[out] data_source A pointer which will be used to store the
40
 * source for the data
41
 * @return 1 if data was available, 0 otherwise
42
 */
43
int wait4any_threaded(struct nodeID *n, struct cloud_helper_context *cloud, struct timeval *tout, int *user_fds, int *data_source);
44

  
45

  
46
/**
47
 * @brief Wait for data from peers and cloud in a polling scheme
48
 *
49
 * This function handles the waiting for data by peers and cloud
50
 * by repeatedly waiting for a small time on each reasource.
51
 *
52
 * @param[in] n A pointer to the nodeID for which waiting data
53
 * @param[in] cloud A pointer to the cloud_helper_context for which
54
 * waiting data
55
 * @param[in] tout The timeout for the waiting
56
 * @param[in] step_tout The time to spend waiting in each step. If
57
 * NULL default is 100 milliseconds
58
 * @param[in] user_fds A pointer to an optional array of file
59
 * descriptior to monitor
60
 * @param[out] data_source A pointer which will be used to store the
61
 * source for the data
62
 * @return 1 if data was available, 0 otherwise
63
 */
64
int wait4any_polling(struct nodeID *n, struct cloud_helper_context *cloud, struct timeval *tout, struct timeval *step_tout, int *user_fds, int *data_source);
65

  
66
#endif
src/TopologyManager/Makefile
5 5
endif
6 6
CFGDIR ?= ..
7 7

  
8
OBJS = peersampler.o topman.o ncast.o ncast_proto.o dummy.o cyclon.o cyclon_proto.o topo_proto.o topocache.o blist_cache.o blist_proto.o tman.o dumbTopman.o cloud_helper.o cloud_helper_delegate.o
8
OBJS = peersampler.o topman.o ncast.o ncast_proto.o dummy.o cyclon.o cyclon_proto.o topo_proto.o topocache.o blist_cache.o blist_proto.o tman.o dumbTopman.o cloud_helper.o cloud_helper_utils.o cloud_helper_delegate.o
9 9

  
10 10
all: libtopman.a
11 11

  
src/TopologyManager/cloud_helper_utils.c
1
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

  
7
#include <stdlib.h>
8
#include <pthread.h>
9
#include <errno.h>
10

  
11
#include "cloud_helper.h"
12
#include "cloud_helper_utils.h"
13

  
14
struct wait4context {
15
  struct nodeID* node;
16
  struct cloud_helper_context *cloud;
17
  struct timeval *tout;
18
  int *user_fds;
19

  
20
  pthread_mutex_t wait_mutex;
21
  pthread_cond_t wait_cond;
22

  
23
  int status;
24
  int source;
25
};
26

  
27

  
28
static void* wait4data_wrapper(void *ctx)
29
{
30
  struct wait4context *wait4ctx;
31
  struct timeval tout;
32
  int status;
33

  
34
  wait4ctx = (struct wait4context *) ctx;
35
  tout = *wait4ctx->tout;
36
  status = wait4data(wait4ctx->node, &tout, wait4ctx->user_fds);
37
  if (status == 1){
38
    pthread_mutex_lock(&wait4ctx->wait_mutex);
39
    wait4ctx->source = DATA_SOURCE_NET;
40
    wait4ctx->status = status;
41
    pthread_cond_signal(&wait4ctx->wait_cond);
42
    pthread_mutex_unlock(&wait4ctx->wait_mutex);
43
  }
44
  pthread_exit(NULL);
45
}
46

  
47
static void* wait4cloud_wrapper(void *ctx)
48
{
49
  struct wait4context *wait4ctx;
50
  struct timeval tout;
51
  int status;
52

  
53
  wait4ctx = (struct wait4context *) ctx;
54
  tout = *wait4ctx->tout;
55
  status = wait4cloud(wait4ctx->cloud, &tout);
56
  if (status == 1 || status == -1) {
57
    pthread_mutex_lock(&wait4ctx->wait_mutex);
58
    wait4ctx->source = DATA_SOURCE_CLOUD;
59
    wait4ctx->status = status;
60
    pthread_cond_signal(&wait4ctx->wait_cond);
61
    pthread_mutex_unlock(&wait4ctx->wait_mutex);
62
  }
63
  pthread_exit(NULL);
64
}
65

  
66

  
67
int wait4any_threaded(struct nodeID *n, struct cloud_helper_context *cloud, struct timeval *tout, int *user_fds, int *data_source)
68
{
69
  pthread_attr_t attr;
70
  pthread_t wait4data_thread;
71
  pthread_t wait4cloud_thread;
72
  struct wait4context *wait4ctx;
73
  struct timespec timeout;
74
  struct timeval now;
75

  
76
  int err;
77
  int result;
78

  
79
  wait4ctx = malloc(sizeof(struct wait4context));
80
  if (wait4ctx == NULL) return -1;
81
  wait4ctx->node = n;
82
  wait4ctx->cloud = cloud;
83
  wait4ctx->tout = tout;
84
  wait4ctx->user_fds = user_fds;
85

  
86
  pthread_mutex_init(&wait4ctx->wait_mutex, NULL);
87
  pthread_cond_init (&wait4ctx->wait_cond, NULL);
88

  
89
  pthread_attr_init(&attr);
90
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
91

  
92
  pthread_mutex_lock(&wait4ctx->wait_mutex);
93
  pthread_create(&wait4data_thread, &attr, wait4data_wrapper, (void *)wait4ctx);
94
  pthread_create(&wait4cloud_thread, &attr, wait4cloud_wrapper, (void *)wait4ctx);
95

  
96
  gettimeofday(&now, NULL);
97
  timeout.tv_sec = now.tv_sec + tout->tv_sec;
98
  timeout.tv_nsec = now.tv_usec * 1000 + tout->tv_usec * 1000;
99

  
100
  // Wait for one of the thread to signal available data
101
  err = pthread_cond_timedwait(&wait4ctx->wait_cond, &wait4ctx->wait_mutex, &timeout);
102
  if (err ==  0) {
103
    *data_source = wait4ctx->source;
104
    result = 1;
105
  } else if(err == ETIMEDOUT){
106
    *data_source = DATA_SOURCE_NONE;
107
    result = 0;
108
  }
109

  
110
  // Clean up and return
111
  pthread_cancel(wait4data_thread);
112
  pthread_cancel(wait4cloud_thread);
113
  pthread_cond_destroy(&wait4ctx->wait_cond);
114
  pthread_mutex_unlock(&wait4ctx->wait_mutex);
115
  pthread_mutex_destroy(&wait4ctx->wait_mutex);
116

  
117
  return result;
118
}
119

  
120

  
121
/* Subtract the `struct timeval' values X and Y,
122
   storing the result in RESULT.
123
   Return 1 if the difference is negative, otherwise 0.  */
124

  
125
int timeval_subtract(struct timeval *result, struct timeval *x, struct timeval *y)
126
{
127
  /* Perform the carry for the later subtraction by updating y. */
128
  if (x->tv_usec < y->tv_usec) {
129
    int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;
130
    y->tv_usec -= 1000000 * nsec;
131
    y->tv_sec += nsec;
132
  }
133
  if (x->tv_usec - y->tv_usec > 1000000) {
134
    int nsec = (x->tv_usec - y->tv_usec) / 1000000;
135
    y->tv_usec += 1000000 * nsec;
136
    y->tv_sec -= nsec;
137
  }
138

  
139
  /* Compute the time remaining to wait.
140
     tv_usec is certainly positive. */
141
  result->tv_sec = x->tv_sec - y->tv_sec;
142
  result->tv_usec = x->tv_usec - y->tv_usec;
143

  
144
  /* Return 1 if result is negative. */
145
  return x->tv_sec < y->tv_sec;
146
}
147

  
148

  
149
int wait4any_polling(struct nodeID *n, struct cloud_helper_context *cloud, struct timeval *tout, struct timeval *step_tout, int *user_fds, int *data_source)
150
{
151
  struct timeval timeout;
152
  struct timeval *step_time;
153
  uint8_t turn;
154
  int status;
155

  
156
  timeout = *tout;
157
  if (step_tout == NULL) {
158
    step_time = malloc(sizeof(struct timeval));
159
    step_time->tv_sec = 0;
160
    step_time->tv_usec = 100000;
161
  } else {
162
    *step_time = *step_tout;
163
  }
164

  
165
  turn = DATA_SOURCE_NET;
166
  while (turn != DATA_SOURCE_NONE) {
167
    // Try waiting for 100 milliseconds on one resource
168
    if (turn == DATA_SOURCE_NET)
169
      status =  wait4data(n, step_time, user_fds);
170
    else
171
      status = wait4cloud(cloud, step_time);
172

  
173
    if (status == 1){
174
      // If we got a positive response we're done
175
      *data_source = turn;
176
      free(step_time);
177
      return 1;
178
    } else {
179
      // Go on with another step
180
      turn = (turn == DATA_SOURCE_NET)? DATA_SOURCE_CLOUD:DATA_SOURCE_NET;
181
      if (step_tout == NULL) {
182
        step_time->tv_sec = 0;
183
        step_time->tv_usec = 100000;
184
      } else {
185
        *step_time = *step_tout;
186
      }
187

  
188
      // If we exeded the timeout it's time to stop
189
      if (timeval_subtract(&timeout, &timeout, step_time) < 0)
190
        turn = DATA_SOURCE_NONE;
191
    }
192
  }
193

  
194
  free(step_time);
195
  *data_source = DATA_SOURCE_NONE;
196
  return 0;
197
}

Also available in: Unified diff