Revision 42cbd06c

View differences:

src/Utils/Makefile
5 5
endif
6 6
CFGDIR ?= ..
7 7

  
8
OBJS = fifo_queue.o
8
OBJS = fifo_queue.o request_handler.o
9 9

  
10 10
include $(BASE)/src/utils.mak
src/Utils/request_handler.c
1
/*
2
 *  Copyright (c) 2011 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 *
6
 */
7
#include <stdlib.h>
8
#include <stdio.h>
9
#include <unistd.h>
10
#include <pthread.h>
11
#include <semaphore.h>
12
#include <string.h>
13
#include <sys/time.h>
14

  
15
#include "request_handler.h"
16
#include "fifo_queue.h"
17

  
18

  
19
void* request_handler(void *data);
20

  
21

  
22
struct req_handler_ctx {
23
  /* request/response management */
24
  pthread_mutex_t req_queue_lock;
25
  fifo_queue_p req_queue;
26

  
27
  pthread_mutex_t rsp_queue_sync_mutex;
28
  pthread_cond_t rsp_queue_sync_cond;
29
  pthread_mutex_t rsp_queue_lock;
30
  fifo_queue_p rsp_queue;
31

  
32
  /* thread management */
33
  pthread_attr_t req_handler_thread_attr;
34
  pthread_cond_t req_handler_sync_cond;
35
  pthread_mutex_t req_handler_sync_mutex;
36
  pthread_t req_handler_thread;
37
};
38

  
39

  
40
typedef struct request {
41
  process_request_callback_p req_callback;
42
  free_request_callback_p free_callback;
43
  void *req_data;
44
} request_t;
45

  
46
/***********************************************************************
47
 * Initialization/destruction
48
 ***********************************************************************/
49
void req_handler_destroy(struct req_handler_ctx *ctx)
50
{
51
  if (!ctx) return;
52

  
53
  /* TODO: we should use a more specialized function instead of the
54
     standard free. But since the cloud_helper do not take into
55
     account deallocation the queue will always be empty when we
56
     enter this function */
57
  if (ctx->req_queue) fifo_queue_destroy(ctx->req_queue, NULL);
58
  if (ctx->rsp_queue) fifo_queue_destroy(ctx->rsp_queue, NULL);
59

  
60
  /* destroy mutexs, conds, threads, ...
61
     This should be safe as no mutex has already been locked and both
62
     mutex_destroy/cond_destroy check fail with EINVAL if the
63
     specified object is not initialized */
64
  pthread_mutex_destroy(&ctx->req_queue_lock);
65
  pthread_mutex_destroy(&ctx->rsp_queue_lock);
66
  pthread_mutex_destroy(&ctx->req_handler_sync_mutex);
67
  pthread_mutex_destroy(&ctx->rsp_queue_sync_mutex);
68

  
69
  pthread_cond_destroy(&ctx->rsp_queue_sync_cond);
70
  pthread_cond_destroy(&ctx->req_handler_sync_cond);
71

  
72
  pthread_attr_destroy(&ctx->req_handler_thread_attr);
73

  
74
  free(ctx);
75
  return;
76
}
77

  
78
/* Allocate and initialize needed structures */
79
struct req_handler_ctx* req_handler_init()
80
{
81
  struct req_handler_ctx *ctx;
82
  int err;
83

  
84
  ctx = malloc(sizeof(struct req_handler_ctx));
85
  memset(ctx, 0, sizeof(struct req_handler_ctx));
86

  
87
  ctx->req_queue = fifo_queue_create(10);
88
  if (!ctx->req_queue) {
89
    req_handler_destroy(ctx);
90
    return 0;
91
  }
92

  
93
  ctx->rsp_queue = fifo_queue_create(10);
94
  if (!ctx->rsp_queue) {
95
    req_handler_destroy(ctx);
96
    return 0;
97
  }
98

  
99

  
100
  err = pthread_mutex_init(&ctx->req_queue_lock, NULL);
101
  if (err) {
102
    req_handler_destroy(ctx);
103
    return 0;
104
  }
105

  
106
  err = pthread_mutex_init(&ctx->rsp_queue_lock, NULL);
107
  if (err) {
108
    req_handler_destroy(ctx);
109
    return 0;
110
  }
111

  
112
  err = pthread_mutex_init(&ctx->rsp_queue_sync_mutex, NULL);
113
  if (err) {
114
    req_handler_destroy(ctx);
115
    return 0;
116
  }
117

  
118
  err = pthread_cond_init (&ctx->rsp_queue_sync_cond, NULL);
119
  if (err) {
120
    req_handler_destroy(ctx);
121
    return 0;
122
  }
123

  
124
  err = pthread_mutex_init(&ctx->req_handler_sync_mutex, NULL);
125
  if (err) {
126
    req_handler_destroy(ctx);
127
    return 0;
128
  }
129

  
130
  err = pthread_cond_init (&ctx->req_handler_sync_cond, NULL);
131
  if (err) {
132
    req_handler_destroy(ctx);
133
    return 0;
134
  }
135

  
136
  err = pthread_attr_init(&ctx->req_handler_thread_attr);
137
  if (err) {
138
    req_handler_destroy(ctx);
139
    return 0;
140
  }
141

  
142
  err = pthread_attr_setdetachstate(&ctx->req_handler_thread_attr,
143
                                    PTHREAD_CREATE_JOINABLE);
144
  if (err) {
145
    req_handler_destroy(ctx);
146
    return 0;
147
  }
148

  
149
  err = pthread_create(&ctx->req_handler_thread,
150
                       &ctx->req_handler_thread_attr,
151
                       &request_handler,
152
                       (void *)ctx);
153
  if (err) {
154
    req_handler_destroy(ctx);
155
    return 0;
156
  }
157

  
158
  return ctx;
159
}
160

  
161
/***********************************************************************
162
 * Request management
163
 ***********************************************************************/
164
static void free_request(request_t *req)
165
{
166
  if (req->free_callback) req->free_callback(req->req_data);
167
  free(req);
168
}
169

  
170
int req_handler_add_request(struct req_handler_ctx *ctx,
171
                            process_request_callback_p req_callback,
172
                            void *req_data,
173
                            free_request_callback_p free_callback)
174
{
175
  request_t *request;
176
  int err;
177

  
178
  request = malloc(sizeof(request_t));
179
  if (!request) return 1;
180

  
181
  request->req_callback = req_callback;
182
  request->req_data = req_data;
183
  request->free_callback = free_callback;
184

  
185
  /* add the request to the pool */
186
  pthread_mutex_lock(&ctx->req_queue_lock);
187
  err = fifo_queue_add(ctx->req_queue, request);
188
  pthread_mutex_unlock(&ctx->req_queue_lock);
189

  
190
  if (err) {
191
    free (request);
192
    return 1;
193
  }
194

  
195
  /* notify request handler thread */
196
  pthread_mutex_lock(&ctx->req_handler_sync_mutex);
197
  pthread_cond_signal(&ctx->req_handler_sync_cond);
198
  pthread_mutex_unlock(&ctx->req_handler_sync_mutex);
199

  
200
  return 0;
201
}
202

  
203
/***********************************************************************
204
 * Response management
205
 ***********************************************************************/
206
void* req_handler_wait4response(struct req_handler_ctx *ctx,
207
                                struct timeval *tout)
208
{
209
  if (fifo_queue_size(ctx->rsp_queue) == 0) {
210
    /* if there's no data ready to process, let's wait */
211
    struct timespec timeout;
212
    struct timeval abs_tout;
213
    int err;
214

  
215
    gettimeofday(&abs_tout, NULL);
216
    abs_tout.tv_sec += tout->tv_sec;
217
    abs_tout.tv_usec += tout->tv_usec;
218

  
219
    timeout.tv_sec = abs_tout.tv_sec + (abs_tout.tv_usec / 1000000);
220
    timeout.tv_nsec = abs_tout.tv_usec % 1000000;
221

  
222

  
223
    pthread_mutex_lock(&ctx->rsp_queue_sync_mutex);
224
    /* make sure that no data came in the meanwhile */
225
    if (fifo_queue_size(ctx->rsp_queue) == 0) {
226
      err = pthread_cond_timedwait(&ctx->rsp_queue_sync_cond,
227
                                   &ctx->rsp_queue_sync_mutex,
228
                                   &timeout);
229
    }
230
    pthread_mutex_unlock(&ctx->rsp_queue_sync_mutex);
231
  }
232

  
233
  return req_handler_get_response(ctx);
234
}
235

  
236
static int add_response(struct req_handler_ctx *ctx, void *rsp)
237
{
238
  int err;
239
  /* add the response to the pool */
240
  pthread_mutex_lock(&ctx->rsp_queue_lock);
241
  err = fifo_queue_add(ctx->rsp_queue, rsp);
242
  pthread_mutex_unlock(&ctx->rsp_queue_lock);
243

  
244
  if (err) return 1;
245

  
246
  /* notify wait4response there's a response in the queue */
247
  pthread_mutex_lock(&ctx->rsp_queue_sync_mutex);
248
  pthread_cond_signal(&ctx->rsp_queue_sync_cond);
249
  pthread_mutex_unlock(&ctx->rsp_queue_sync_mutex);
250

  
251
  return 0;
252
}
253

  
254
void* req_handler_get_response(struct req_handler_ctx *ctx)
255
{
256
  if (fifo_queue_size(ctx->rsp_queue) > 0) {
257
    void *rsp;
258

  
259
    pthread_mutex_lock(&ctx->rsp_queue_lock);
260
    rsp = fifo_queue_get_head(ctx->rsp_queue);
261
    pthread_mutex_unlock(&ctx->rsp_queue_lock);
262
    return rsp;
263
  }
264

  
265
  return NULL;
266
}
267

  
268
void* req_handler_remove_response(struct req_handler_ctx *ctx)
269
{
270
  void *rsp;
271

  
272
  pthread_mutex_lock(&ctx->rsp_queue_lock);
273
  rsp = fifo_queue_remove_head(ctx->rsp_queue);
274
  pthread_mutex_unlock(&ctx->rsp_queue_lock);
275

  
276
  return rsp;
277
}
278

  
279

  
280
/***********************************************************************
281
 * Request handler implementation
282
 ***********************************************************************/
283
static int request_handler_process_requests(struct req_handler_ctx *ctx)
284
{
285
  request_t *req;
286
  int req_number;
287

  
288
  req_number = 0;
289
  do {
290
    pthread_mutex_lock(&ctx->req_queue_lock);
291
    req = fifo_queue_remove_head(ctx->req_queue);
292
    pthread_mutex_unlock(&ctx->req_queue_lock);
293

  
294
    if (req) {
295
      int status;
296
      void *rsp_data;
297

  
298
      req_number++;
299

  
300
      status = req->req_callback(req->req_data, &rsp_data);
301

  
302
      switch(status){
303
      case 0:
304
        /* request successful */
305
        if (rsp_data) {
306
          /* We have a response */
307
          status = add_response(ctx, rsp_data);
308
          if (status != 0) {
309
            fprintf(stderr, "req_handler: error adding response to queue\n");
310
          }
311
        }
312

  
313
        free_request(req);
314
        break;
315
      case 1:
316
        /* request to requeue */
317
        fifo_queue_add(ctx->req_queue, req);
318
        break;
319

  
320
      case -1:
321
        /* request aborted */
322
        free_request(req);
323
        break;
324

  
325
      default:
326
        fprintf(stderr,"req_handler: invalid return status from callback\n");
327
        free_request(req);
328
      }
329
    }
330
  } while(req != NULL);
331

  
332
  return req_number;
333
}
334

  
335
void* request_handler(void *data)
336
{
337
  struct req_handler_ctx *ctx;
338

  
339
  ctx = (struct req_handler_ctx *) data;
340
  do{
341
    /* wait for main thread to signal there's some work to do */
342
    pthread_mutex_lock(&ctx->req_handler_sync_mutex);
343

  
344
    /* make sure not to block if there's already something to handle */
345
    if (fifo_queue_size(ctx->req_queue) == 0) {
346
      pthread_cond_wait(&ctx->req_handler_sync_cond,
347
                        &ctx->req_handler_sync_mutex);
348
    }
349
    request_handler_process_requests(ctx);
350

  
351
    pthread_mutex_unlock(&ctx->req_handler_sync_mutex);
352
  } while(1); /* grapes TopologyManager don't support termination */
353
}
src/Utils/request_handler.h
1
/*
2
 *  Copyright (c) 2011 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 *
6
 *  This module provide an easy way to perform request and receive response in an
7
 *  asynchronous way using blocking functions. Request and response are kept in a
8
 *  dedicated FIFO queue.
9
 */
10

  
11
#ifndef CLOUD_HELPER_DELEGATE_UTILS_H
12
#define CLOUD_HELPER_DELEGATE_UTILS_H
13

  
14

  
15

  
16
struct req_handler_ctx;
17

  
18
/* Callback called to process the req specified by req_data. Should return 0
19
   on success, 1 to requeue the request (as last element), -1 to abort the
20
   request. On success the request handler should allocate and populate the
21
   rsp_data pointer with the context of the response. If no response is
22
   needed, rsp_data should be assigned NULL. */
23
typedef int (*process_request_callback_p)(void *req_data, void **rsp_data);
24

  
25
typedef void (*free_request_callback_p)(void *req_data);
26

  
27
/* Initialize the data structures and threads and return a pointer to the
28
   cloud_req_handler context */
29
struct req_handler_ctx* req_handler_init();
30

  
31
/* Release the resource acquired  by init */
32
void req_handler_destroy(struct req_handler_ctx *ctx);
33

  
34
/* Add a request to the queue. Such a request will be handled by the function
35
   pointed by req_callback and freed by the function pointed by free_req_data.
36
   Return 0 on success, 1 on failure */
37
int req_handler_add_request(struct req_handler_ctx *ctx,
38
                            process_request_callback_p req_callback,
39
                            void *req_data,
40
                            free_request_callback_p free_req_data);
41

  
42
/* Wait for a response for at most tout */
43
void* req_handler_wait4response(struct req_handler_ctx *ctx,
44
                                struct timeval *tout);
45

  
46
/* Return the first response in the queue or NULL if none is ready */
47
void* req_handler_get_response(struct req_handler_ctx *ctx);
48

  
49
/* Return and remove the first response in the queue or NULL if none is ready*/
50
void* req_handler_remove_response(struct req_handler_ctx *ctx);
51

  
52
#endif /* CLOUD_HELPER_DELEGATE_UTILS_H */

Also available in: Unified diff