Statistics
| Branch: | Revision:

grapes / src / TopologyManager / cloud_helper_utils.c @ b71e9607

History | View | Annotate | Download (5.15 KB)

1
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <stdlib.h>
8
#include <pthread.h>
9
#include <errno.h>
10

    
11
#include "cloud_helper.h"
12
#include "cloud_helper_utils.h"
13

    
14
struct wait4context {
15
  struct nodeID* node;
16
  struct cloud_helper_context *cloud;
17
  struct timeval *tout;
18
  int *user_fds;
19

    
20
  pthread_mutex_t wait_mutex;
21
  pthread_cond_t wait_cond;
22

    
23
  int status;
24
  int source;
25
};
26

    
27

    
28
static void* wait4data_wrapper(void *ctx)
29
{
30
  struct wait4context *wait4ctx;
31
  struct timeval tout;
32
  int status;
33

    
34
  wait4ctx = (struct wait4context *) ctx;
35
  tout = *wait4ctx->tout;
36
  status = wait4data(wait4ctx->node, &tout, wait4ctx->user_fds);
37
  if (status == 1){
38
    pthread_mutex_lock(&wait4ctx->wait_mutex);
39
    wait4ctx->source = DATA_SOURCE_NET;
40
    wait4ctx->status = status;
41
    pthread_cond_signal(&wait4ctx->wait_cond);
42
    pthread_mutex_unlock(&wait4ctx->wait_mutex);
43
  }
44
  pthread_exit(NULL);
45
}
46

    
47
static void* wait4cloud_wrapper(void *ctx)
48
{
49
  struct wait4context *wait4ctx;
50
  struct timeval tout;
51
  int status;
52

    
53
  wait4ctx = (struct wait4context *) ctx;
54
  tout = *wait4ctx->tout;
55
  status = wait4cloud(wait4ctx->cloud, &tout);
56
  if (status == 1 || status == -1) {
57
    pthread_mutex_lock(&wait4ctx->wait_mutex);
58
    wait4ctx->source = DATA_SOURCE_CLOUD;
59
    wait4ctx->status = status;
60
    pthread_cond_signal(&wait4ctx->wait_cond);
61
    pthread_mutex_unlock(&wait4ctx->wait_mutex);
62
  }
63
  pthread_exit(NULL);
64
}
65

    
66

    
67
int wait4any_threaded(struct nodeID *n, struct cloud_helper_context *cloud, struct timeval *tout, int *user_fds, int *data_source)
68
{
69
  pthread_attr_t attr;
70
  pthread_t wait4data_thread;
71
  pthread_t wait4cloud_thread;
72
  struct wait4context *wait4ctx;
73
  struct timespec timeout;
74
  struct timeval now;
75

    
76
  int err;
77
  int result;
78

    
79
  wait4ctx = malloc(sizeof(struct wait4context));
80
  if (wait4ctx == NULL) return -1;
81
  wait4ctx->node = n;
82
  wait4ctx->cloud = cloud;
83
  wait4ctx->tout = tout;
84
  wait4ctx->user_fds = user_fds;
85

    
86
  pthread_mutex_init(&wait4ctx->wait_mutex, NULL);
87
  pthread_cond_init (&wait4ctx->wait_cond, NULL);
88

    
89
  pthread_attr_init(&attr);
90
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
91

    
92
  pthread_mutex_lock(&wait4ctx->wait_mutex);
93
  pthread_create(&wait4data_thread, &attr, wait4data_wrapper, (void *)wait4ctx);
94
  pthread_create(&wait4cloud_thread, &attr, wait4cloud_wrapper, (void *)wait4ctx);
95

    
96
  gettimeofday(&now, NULL);
97
  timeout.tv_sec = now.tv_sec + tout->tv_sec;
98
  timeout.tv_nsec = now.tv_usec * 1000 + tout->tv_usec * 1000;
99

    
100
  // Wait for one of the thread to signal available data
101
  err = pthread_cond_timedwait(&wait4ctx->wait_cond, &wait4ctx->wait_mutex, &timeout);
102
  if (err ==  0) {
103
    *data_source = wait4ctx->source;
104
    result = 1;
105
  } else if(err == ETIMEDOUT){
106
    *data_source = DATA_SOURCE_NONE;
107
    result = 0;
108
  }
109

    
110
  // Clean up and return
111
  pthread_cancel(wait4data_thread);
112
  pthread_cancel(wait4cloud_thread);
113
  pthread_cond_destroy(&wait4ctx->wait_cond);
114
  pthread_mutex_unlock(&wait4ctx->wait_mutex);
115
  pthread_mutex_destroy(&wait4ctx->wait_mutex);
116

    
117
  return result;
118
}
119

    
120

    
121
/* Subtract the `struct timeval' values X and Y,
122
   storing the result in RESULT.
123
   Return 1 if the difference is negative, otherwise 0.  */
124

    
125
int timeval_subtract(struct timeval *result, struct timeval *x, struct timeval *y)
126
{
127
  /* Perform the carry for the later subtraction by updating y. */
128
  if (x->tv_usec < y->tv_usec) {
129
    int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;
130
    y->tv_usec -= 1000000 * nsec;
131
    y->tv_sec += nsec;
132
  }
133
  if (x->tv_usec - y->tv_usec > 1000000) {
134
    int nsec = (x->tv_usec - y->tv_usec) / 1000000;
135
    y->tv_usec += 1000000 * nsec;
136
    y->tv_sec -= nsec;
137
  }
138

    
139
  /* Compute the time remaining to wait.
140
     tv_usec is certainly positive. */
141
  result->tv_sec = x->tv_sec - y->tv_sec;
142
  result->tv_usec = x->tv_usec - y->tv_usec;
143

    
144
  /* Return 1 if result is negative. */
145
  return x->tv_sec < y->tv_sec;
146
}
147

    
148

    
149
int wait4any_polling(struct nodeID *n, struct cloud_helper_context *cloud, struct timeval *tout, struct timeval *step_tout, int *user_fds, int *data_source)
150
{
151
  struct timeval timeout;
152
  struct timeval *step_time;
153
  uint8_t turn;
154
  int status;
155

    
156
  timeout = *tout;
157
  if (step_tout == NULL) {
158
    step_time = malloc(sizeof(struct timeval));
159
    step_time->tv_sec = 0;
160
    step_time->tv_usec = 100000;
161
  } else {
162
    *step_time = *step_tout;
163
  }
164

    
165
  turn = DATA_SOURCE_NET;
166
  while (turn != DATA_SOURCE_NONE) {
167
    // Try waiting for 100 milliseconds on one resource
168
    if (turn == DATA_SOURCE_NET)
169
      status =  wait4data(n, step_time, user_fds);
170
    else
171
      status = wait4cloud(cloud, step_time);
172

    
173
    if (status == 1){
174
      // If we got a positive response we're done
175
      *data_source = turn;
176
      free(step_time);
177
      return 1;
178
    } else {
179
      // Go on with another step
180
      turn = (turn == DATA_SOURCE_NET)? DATA_SOURCE_CLOUD:DATA_SOURCE_NET;
181
      if (step_tout == NULL) {
182
        step_time->tv_sec = 0;
183
        step_time->tv_usec = 100000;
184
      } else {
185
        *step_time = *step_tout;
186
      }
187

    
188
      // If we exeded the timeout it's time to stop
189
      if (timeval_subtract(&timeout, &timeout, step_time) < 0)
190
        turn = DATA_SOURCE_NONE;
191
    }
192
  }
193

    
194
  free(step_time);
195
  *data_source = DATA_SOURCE_NONE;
196
  return 0;
197
}