Revision a1ea0562 plugins/janus_echotest.c

View differences:

plugins/janus_echotest.c
162 162
/* Useful stuff */
163 163
static volatile gint initialized = 0, stopping = 0;
164 164
static janus_callbacks *gateway = NULL;
165
static GThread *handler_thread;
166 165
static GThread *watchdog;
166

  
167
static janus_mutex handler_mutex;
168
static janus_condition handler_cond;
169
static GThread *handler_thread;
167 170
static void *janus_echotest_handler(void *data);
168 171

  
172

  
169 173
typedef struct janus_echotest_message {
170 174
	janus_plugin_session *handle;
171 175
	char *transaction;
......
173 177
	char *sdp_type;
174 178
	char *sdp;
175 179
} janus_echotest_message;
176
static GAsyncQueue *messages = NULL;
180
static GQueue *messages = NULL;
177 181

  
178 182
typedef struct janus_echotest_session {
179 183
	janus_plugin_session *handle;
......
282 286
	
283 287
	sessions = g_hash_table_new(NULL, NULL);
284 288
	janus_mutex_init(&sessions_mutex);
285
	messages = g_async_queue_new_full((GDestroyNotify) janus_echotest_message_free);
289
	messages = g_queue_new();
286 290
	/* This is the callback we'll need to invoke to contact the gateway */
287 291
	gateway = callback;
288 292
	g_atomic_int_set(&initialized, 1);
......
296 300
		return -1;
297 301
	}
298 302
	/* Launch the thread that will handle incoming messages */
303
	janus_mutex_init(&handler_mutex);
304
	janus_condition_init(&handler_cond);
299 305
	handler_thread = g_thread_try_new("janus echotest handler", janus_echotest_handler, NULL, &error);
300 306
	if(error != NULL) {
301 307
		g_atomic_int_set(&initialized, 0);
......
311 317
		return;
312 318
	g_atomic_int_set(&stopping, 1);
313 319

  
320
	janus_mutex_lock(&handler_mutex);
321
	janus_condition_signal(&handler_cond);
322
	janus_mutex_unlock(&handler_mutex);
314 323
	if(handler_thread != NULL) {
315 324
		g_thread_join(handler_thread);
316 325
		handler_thread = NULL;
......
324 333
	janus_mutex_lock(&sessions_mutex);
325 334
	g_hash_table_destroy(sessions);
326 335
	janus_mutex_unlock(&sessions_mutex);
327
	g_async_queue_unref(messages);
336
	g_queue_free_full(messages, (GDestroyNotify) janus_echotest_message_free);
328 337
	messages = NULL;
329 338
	sessions = NULL;
330 339

  
......
454 463
	msg->message = message;
455 464
	msg->sdp_type = sdp_type;
456 465
	msg->sdp = sdp;
457
	g_async_queue_push(messages, msg);
466
	janus_mutex_lock(&handler_mutex);
467
	g_queue_push_tail(messages, msg);
468
	janus_condition_signal(&handler_cond);
469
	janus_mutex_unlock(&handler_mutex);
458 470

  
459 471
	/* All the requests to this plugin are handled asynchronously */
460 472
	return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, "I'm taking my time!");
......
664 676
	}
665 677
	json_t *root = NULL;
666 678
	while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
667
		if(!messages || (msg = g_async_queue_try_pop(messages)) == NULL) {
668
			usleep(50000);
669
			continue;
679
		janus_mutex_lock(&handler_mutex);
680
		if(!messages || (msg = g_queue_pop_head(messages)) == NULL) {
681
			/* Wait for a new message to be queued */
682
			janus_condition_wait(&handler_cond, &handler_mutex);
683
			msg = g_queue_pop_head(messages);
684
			if(msg == NULL) {
685
				janus_mutex_unlock(&handler_mutex);
686
				continue;
687
			}
670 688
		}
689
		janus_mutex_unlock(&handler_mutex);
671 690
		janus_echotest_session *session = NULL;
672 691
		janus_mutex_lock(&sessions_mutex);
673 692
		if(g_hash_table_lookup(sessions, msg->handle) != NULL ) {

Also available in: Unified diff