Revision b9d3ca04 transports/janus_rabbitmq.c

View differences:

transports/janus_rabbitmq.c
107 107
static gboolean rmq_admin_api_enabled = FALSE;
108 108
static gboolean notify_events = TRUE;
109 109

  
110
/* FIXME: Should it be configable? */
111
#define JANUS_RABBITMQ_EXCHANGE_TYPE "fanout"
112

  
110 113
/* JSON serialization options */
111 114
static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
112 115

  
......
116 119
	amqp_connection_state_t rmq_conn;		/* AMQP connection state */
117 120
	amqp_channel_t rmq_channel;				/* AMQP channel */
118 121
	gboolean janus_api_enabled;				/* Whether the Janus API via RabbitMQ is enabled */
122
	amqp_bytes_t janus_exchange;			/* AMQP exchange for outgoing messages */
119 123
	amqp_bytes_t to_janus_queue;			/* AMQP outgoing messages queue (Janus API) */
120 124
	amqp_bytes_t from_janus_queue;			/* AMQP incoming messages queue (Janus API) */
121 125
	gboolean admin_api_enabled;				/* Whether the Janus API via RabbitMQ is enabled */
......
255 259
	/* Now check if the Janus API must be supported */
256 260
	const char *to_janus = NULL, *from_janus = NULL;
257 261
	const char *to_janus_admin = NULL, *from_janus_admin = NULL;
262
	const char *janus_exchange = NULL;
258 263
	item = janus_config_get_item_drilldown(config, "general", "enable");
259 264
	if(!item || !item->value || !janus_is_true(item->value)) {
260 265
		JANUS_LOG(LOG_WARN, "RabbitMQ support disabled (Janus API)\n");
......
272 277
			goto error;
273 278
		}
274 279
		from_janus = g_strdup(item->value);
275
		JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s)\n", rmqhost, rmqport, to_janus, from_janus);
280
		item = janus_config_get_item_drilldown(config, "general", "janus_exchange");
281
		if(!item || !item->value) {
282
			JANUS_LOG(LOG_INFO, "Missing name of outgoing exchange for RabbitMQ integration, using default\n");
283
		} else {
284
			janus_exchange = g_strdup(item->value);
285
		}
286
		if (janus_exchange == NULL) {
287
			JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s)\n", rmqhost, rmqport, to_janus, from_janus);
288
		} else {
289
			JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s) exch: (%s)\n", rmqhost, rmqport, to_janus, from_janus, janus_exchange);
290
		}
276 291
		rmq_janus_api_enabled = TRUE;
277 292
	}
278 293
	/* Do the same for the admin API */
......
368 383
			JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
369 384
			goto error;
370 385
		}
386
		rmq_client->janus_exchange = amqp_empty_bytes;
387
		if(janus_exchange != NULL) {
388
			JANUS_LOG(LOG_VERB, "Declaring exchange...\n");
389
			rmq_client->janus_exchange = amqp_cstring_bytes(janus_exchange);
390
			amqp_exchange_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->janus_exchange, amqp_cstring_bytes(JANUS_RABBITMQ_EXCHANGE_TYPE), 0, 0, 0, 0, amqp_empty_table);
391
			result = amqp_get_rpc_reply(rmq_client->rmq_conn);
392
			if(result.reply_type != AMQP_RESPONSE_NORMAL) {
393
				JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error diclaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
394
				goto error;
395
			}
396
		}
371 397
		rmq_client->janus_api_enabled = FALSE;
372 398
		if(rmq_janus_api_enabled) {
373 399
			rmq_client->janus_api_enabled = TRUE;
......
470 496
		g_free((char *)username);
471 497
	if(password)
472 498
		g_free((char *)password);
499
	if(janus_exchange)
500
		g_free((char *)janus_exchange);
473 501
	if(to_janus)
474 502
		g_free((char *)to_janus);
475 503
	if(from_janus)
......
514 542
			g_free((char *)rmq_client->to_janus_admin_queue.bytes);
515 543
		if(rmq_client->from_janus_admin_queue.bytes)
516 544
			g_free((char *)rmq_client->from_janus_admin_queue.bytes);
545
		if(rmq_client->janus_exchange.bytes)
546
			g_free((char *)rmq_client->janus_exchange.bytes);
517 547
	}
518 548
	g_free(rmq_client);
519 549

  
......
713 743
			props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
714 744
			props.content_type = amqp_cstring_bytes("application/json");
715 745
			amqp_bytes_t message = amqp_cstring_bytes(payload_text);
716
			int status = amqp_basic_publish(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_empty_bytes,
746
			int status = amqp_basic_publish(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->janus_exchange,
717 747
				response->admin ? rmq_client->from_janus_admin_queue : rmq_client->from_janus_queue,
718 748
				0, 0, &props, message);
719 749
			if(status != AMQP_STATUS_OK) {

Also available in: Unified diff