Statistics
| Branch: | Revision:

grapes / src / Utils / request_handler.c @ 42cbd06c

History | View | Annotate | Download (9.01 KB)

1
/*
2
 *  Copyright (c) 2011 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 *
6
 */
7
#include <stdlib.h>
8
#include <stdio.h>
9
#include <unistd.h>
10
#include <pthread.h>
11
#include <semaphore.h>
12
#include <string.h>
13
#include <sys/time.h>
14

    
15
#include "request_handler.h"
16
#include "fifo_queue.h"
17

    
18

    
19
void* request_handler(void *data);
20

    
21

    
22
struct req_handler_ctx {
23
  /* request/response management */
24
  pthread_mutex_t req_queue_lock;
25
  fifo_queue_p req_queue;
26

    
27
  pthread_mutex_t rsp_queue_sync_mutex;
28
  pthread_cond_t rsp_queue_sync_cond;
29
  pthread_mutex_t rsp_queue_lock;
30
  fifo_queue_p rsp_queue;
31

    
32
  /* thread management */
33
  pthread_attr_t req_handler_thread_attr;
34
  pthread_cond_t req_handler_sync_cond;
35
  pthread_mutex_t req_handler_sync_mutex;
36
  pthread_t req_handler_thread;
37
};
38

    
39

    
40
typedef struct request {
41
  process_request_callback_p req_callback;
42
  free_request_callback_p free_callback;
43
  void *req_data;
44
} request_t;
45

    
46
/***********************************************************************
47
 * Initialization/destruction
48
 ***********************************************************************/
49
void req_handler_destroy(struct req_handler_ctx *ctx)
50
{
51
  if (!ctx) return;
52

    
53
  /* TODO: we should use a more specialized function instead of the
54
     standard free. But since the cloud_helper do not take into
55
     account deallocation the queue will always be empty when we
56
     enter this function */
57
  if (ctx->req_queue) fifo_queue_destroy(ctx->req_queue, NULL);
58
  if (ctx->rsp_queue) fifo_queue_destroy(ctx->rsp_queue, NULL);
59

    
60
  /* destroy mutexs, conds, threads, ...
61
     This should be safe as no mutex has already been locked and both
62
     mutex_destroy/cond_destroy check fail with EINVAL if the
63
     specified object is not initialized */
64
  pthread_mutex_destroy(&ctx->req_queue_lock);
65
  pthread_mutex_destroy(&ctx->rsp_queue_lock);
66
  pthread_mutex_destroy(&ctx->req_handler_sync_mutex);
67
  pthread_mutex_destroy(&ctx->rsp_queue_sync_mutex);
68

    
69
  pthread_cond_destroy(&ctx->rsp_queue_sync_cond);
70
  pthread_cond_destroy(&ctx->req_handler_sync_cond);
71

    
72
  pthread_attr_destroy(&ctx->req_handler_thread_attr);
73

    
74
  free(ctx);
75
  return;
76
}
77

    
78
/* Allocate and initialize needed structures */
79
struct req_handler_ctx* req_handler_init()
80
{
81
  struct req_handler_ctx *ctx;
82
  int err;
83

    
84
  ctx = malloc(sizeof(struct req_handler_ctx));
85
  memset(ctx, 0, sizeof(struct req_handler_ctx));
86

    
87
  ctx->req_queue = fifo_queue_create(10);
88
  if (!ctx->req_queue) {
89
    req_handler_destroy(ctx);
90
    return 0;
91
  }
92

    
93
  ctx->rsp_queue = fifo_queue_create(10);
94
  if (!ctx->rsp_queue) {
95
    req_handler_destroy(ctx);
96
    return 0;
97
  }
98

    
99

    
100
  err = pthread_mutex_init(&ctx->req_queue_lock, NULL);
101
  if (err) {
102
    req_handler_destroy(ctx);
103
    return 0;
104
  }
105

    
106
  err = pthread_mutex_init(&ctx->rsp_queue_lock, NULL);
107
  if (err) {
108
    req_handler_destroy(ctx);
109
    return 0;
110
  }
111

    
112
  err = pthread_mutex_init(&ctx->rsp_queue_sync_mutex, NULL);
113
  if (err) {
114
    req_handler_destroy(ctx);
115
    return 0;
116
  }
117

    
118
  err = pthread_cond_init (&ctx->rsp_queue_sync_cond, NULL);
119
  if (err) {
120
    req_handler_destroy(ctx);
121
    return 0;
122
  }
123

    
124
  err = pthread_mutex_init(&ctx->req_handler_sync_mutex, NULL);
125
  if (err) {
126
    req_handler_destroy(ctx);
127
    return 0;
128
  }
129

    
130
  err = pthread_cond_init (&ctx->req_handler_sync_cond, NULL);
131
  if (err) {
132
    req_handler_destroy(ctx);
133
    return 0;
134
  }
135

    
136
  err = pthread_attr_init(&ctx->req_handler_thread_attr);
137
  if (err) {
138
    req_handler_destroy(ctx);
139
    return 0;
140
  }
141

    
142
  err = pthread_attr_setdetachstate(&ctx->req_handler_thread_attr,
143
                                    PTHREAD_CREATE_JOINABLE);
144
  if (err) {
145
    req_handler_destroy(ctx);
146
    return 0;
147
  }
148

    
149
  err = pthread_create(&ctx->req_handler_thread,
150
                       &ctx->req_handler_thread_attr,
151
                       &request_handler,
152
                       (void *)ctx);
153
  if (err) {
154
    req_handler_destroy(ctx);
155
    return 0;
156
  }
157

    
158
  return ctx;
159
}
160

    
161
/***********************************************************************
162
 * Request management
163
 ***********************************************************************/
164
static void free_request(request_t *req)
165
{
166
  if (req->free_callback) req->free_callback(req->req_data);
167
  free(req);
168
}
169

    
170
int req_handler_add_request(struct req_handler_ctx *ctx,
171
                            process_request_callback_p req_callback,
172
                            void *req_data,
173
                            free_request_callback_p free_callback)
174
{
175
  request_t *request;
176
  int err;
177

    
178
  request = malloc(sizeof(request_t));
179
  if (!request) return 1;
180

    
181
  request->req_callback = req_callback;
182
  request->req_data = req_data;
183
  request->free_callback = free_callback;
184

    
185
  /* add the request to the pool */
186
  pthread_mutex_lock(&ctx->req_queue_lock);
187
  err = fifo_queue_add(ctx->req_queue, request);
188
  pthread_mutex_unlock(&ctx->req_queue_lock);
189

    
190
  if (err) {
191
    free (request);
192
    return 1;
193
  }
194

    
195
  /* notify request handler thread */
196
  pthread_mutex_lock(&ctx->req_handler_sync_mutex);
197
  pthread_cond_signal(&ctx->req_handler_sync_cond);
198
  pthread_mutex_unlock(&ctx->req_handler_sync_mutex);
199

    
200
  return 0;
201
}
202

    
203
/***********************************************************************
204
 * Response management
205
 ***********************************************************************/
206
void* req_handler_wait4response(struct req_handler_ctx *ctx,
207
                                struct timeval *tout)
208
{
209
  if (fifo_queue_size(ctx->rsp_queue) == 0) {
210
    /* if there's no data ready to process, let's wait */
211
    struct timespec timeout;
212
    struct timeval abs_tout;
213
    int err;
214

    
215
    gettimeofday(&abs_tout, NULL);
216
    abs_tout.tv_sec += tout->tv_sec;
217
    abs_tout.tv_usec += tout->tv_usec;
218

    
219
    timeout.tv_sec = abs_tout.tv_sec + (abs_tout.tv_usec / 1000000);
220
    timeout.tv_nsec = abs_tout.tv_usec % 1000000;
221

    
222

    
223
    pthread_mutex_lock(&ctx->rsp_queue_sync_mutex);
224
    /* make sure that no data came in the meanwhile */
225
    if (fifo_queue_size(ctx->rsp_queue) == 0) {
226
      err = pthread_cond_timedwait(&ctx->rsp_queue_sync_cond,
227
                                   &ctx->rsp_queue_sync_mutex,
228
                                   &timeout);
229
    }
230
    pthread_mutex_unlock(&ctx->rsp_queue_sync_mutex);
231
  }
232

    
233
  return req_handler_get_response(ctx);
234
}
235

    
236
static int add_response(struct req_handler_ctx *ctx, void *rsp)
237
{
238
  int err;
239
  /* add the response to the pool */
240
  pthread_mutex_lock(&ctx->rsp_queue_lock);
241
  err = fifo_queue_add(ctx->rsp_queue, rsp);
242
  pthread_mutex_unlock(&ctx->rsp_queue_lock);
243

    
244
  if (err) return 1;
245

    
246
  /* notify wait4response there's a response in the queue */
247
  pthread_mutex_lock(&ctx->rsp_queue_sync_mutex);
248
  pthread_cond_signal(&ctx->rsp_queue_sync_cond);
249
  pthread_mutex_unlock(&ctx->rsp_queue_sync_mutex);
250

    
251
  return 0;
252
}
253

    
254
void* req_handler_get_response(struct req_handler_ctx *ctx)
255
{
256
  if (fifo_queue_size(ctx->rsp_queue) > 0) {
257
    void *rsp;
258

    
259
    pthread_mutex_lock(&ctx->rsp_queue_lock);
260
    rsp = fifo_queue_get_head(ctx->rsp_queue);
261
    pthread_mutex_unlock(&ctx->rsp_queue_lock);
262
    return rsp;
263
  }
264

    
265
  return NULL;
266
}
267

    
268
void* req_handler_remove_response(struct req_handler_ctx *ctx)
269
{
270
  void *rsp;
271

    
272
  pthread_mutex_lock(&ctx->rsp_queue_lock);
273
  rsp = fifo_queue_remove_head(ctx->rsp_queue);
274
  pthread_mutex_unlock(&ctx->rsp_queue_lock);
275

    
276
  return rsp;
277
}
278

    
279

    
280
/***********************************************************************
281
 * Request handler implementation
282
 ***********************************************************************/
283
static int request_handler_process_requests(struct req_handler_ctx *ctx)
284
{
285
  request_t *req;
286
  int req_number;
287

    
288
  req_number = 0;
289
  do {
290
    pthread_mutex_lock(&ctx->req_queue_lock);
291
    req = fifo_queue_remove_head(ctx->req_queue);
292
    pthread_mutex_unlock(&ctx->req_queue_lock);
293

    
294
    if (req) {
295
      int status;
296
      void *rsp_data;
297

    
298
      req_number++;
299

    
300
      status = req->req_callback(req->req_data, &rsp_data);
301

    
302
      switch(status){
303
      case 0:
304
        /* request successful */
305
        if (rsp_data) {
306
          /* We have a response */
307
          status = add_response(ctx, rsp_data);
308
          if (status != 0) {
309
            fprintf(stderr, "req_handler: error adding response to queue\n");
310
          }
311
        }
312

    
313
        free_request(req);
314
        break;
315
      case 1:
316
        /* request to requeue */
317
        fifo_queue_add(ctx->req_queue, req);
318
        break;
319

    
320
      case -1:
321
        /* request aborted */
322
        free_request(req);
323
        break;
324

    
325
      default:
326
        fprintf(stderr,"req_handler: invalid return status from callback\n");
327
        free_request(req);
328
      }
329
    }
330
  } while(req != NULL);
331

    
332
  return req_number;
333
}
334

    
335
void* request_handler(void *data)
336
{
337
  struct req_handler_ctx *ctx;
338

    
339
  ctx = (struct req_handler_ctx *) data;
340
  do{
341
    /* wait for main thread to signal there's some work to do */
342
    pthread_mutex_lock(&ctx->req_handler_sync_mutex);
343

    
344
    /* make sure not to block if there's already something to handle */
345
    if (fifo_queue_size(ctx->req_queue) == 0) {
346
      pthread_cond_wait(&ctx->req_handler_sync_cond,
347
                        &ctx->req_handler_sync_mutex);
348
    }
349
    request_handler_process_requests(ctx);
350

    
351
    pthread_mutex_unlock(&ctx->req_handler_sync_mutex);
352
  } while(1); /* grapes TopologyManager don't support termination */
353
}