another technical blog...technically

Friday, September 11, 2020

Some t(r)ip with RabbitMQ

Recently, I ran a lot of experiments with RabbitMQ together with my friends and colleagues Alessandro Argiolas and Stefano Lazzati. We were interested in moving logic out of ServiceNow and into custom code but, since we can't rely on powerful servers and we don't have permission to go to the cloud, we were looking for a solution that wouldn't suffer from overload.

We tried RabbitMQ, a lightweight message broker, as a workaround. The main idea is: if I have to ingest a lot of data on low-performance hardware, is there anything I can do? In our opinion, this could be an option. If you don't want to install RabbitMQ, you can use it as a service with CloudAMQP, which is also fully documented. Moreover, there are tons of posts (written better than mine) that help you understand why a message broker is a better fit than an old-fashioned relational DB, what an exchange is, the routing modes and bla bla bla. So feel free to ask in the comments, but first read this.

So, about our architecture, consider these 4 application components:

  1. Source System: just the pre-existing system that needs to call the web service to do something
  2. Ingestion Web Service: a really lightweight web service which just receives data and puts it in the respective RabbitMQ queue. The message must be as lightweight as possible
  3. RabbitMQ Queue: a queue defined on RabbitMQ; it will contain the messages the engine needs to work
  4. C# Engine: subscribed to the queue; as soon as messages arrive, it works the case, returns the result to the Source System and sends the ACK to RabbitMQ. C# because we love C#, but it could be any language
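
To make the decoupling idea concrete without a broker, here is a minimal in-memory sketch. It is not our real code: `PipelineSketch` is a hypothetical name, and a `BlockingCollection` only stands in for the RabbitMQ queue (no persistence, no network, a single consumer). The point it shows is that the ingestion side only enqueues, while the engine drains the queue at its own pace.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Hypothetical stand-in for the whole chain: producer -> queue -> engine.
    public static List<string> Run(IEnumerable<string> incoming)
    {
        // Thread-safe FIFO queue playing the role of the RabbitMQ queue.
        var queue = new BlockingCollection<string>();
        var results = new List<string>();

        // "Engine": consumes messages on a background task until the queue is completed.
        Task engine = Task.Run(() =>
        {
            foreach (string message in queue.GetConsumingEnumerable())
            {
                results.Add("processed:" + message);
            }
        });

        // "Ingestion web service": only enqueues, never does the heavy work itself.
        foreach (string message in incoming)
        {
            queue.Add(message);
        }

        queue.CompleteAdding(); // no more messages, so the engine loop can end
        engine.Wait();

        return results;
    }
}
```

Calling `PipelineSketch.Run(new[] { "a", "b" })` returns one processed entry per input message, in arrival order. With the real broker the queue also survives restarts and can feed several engines, which this sketch does not try to model.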

 


Ingestion Web Service 

It must be something easy peasy and lightweight: we wrote it in C#, but maybe Go or another language could be a better choice. This web service is your producer: it takes messages and routes each one to the correct queue. In our scenario we created a queue for every combination of customer, method and technology found in the request, so the routing key is generated from the info in the request.

The web service mainly contains a controller with this method:

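using System;
using System.Reflection;
using System.Text;
using System.Web.Http;
using log4net;          // assumed from the ILog/LogManager idiom
using Newtonsoft.Json;

// Receives a request over HTTP and publishes its arguments to the exchange
public class QueueController : ApiController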
{
	private static readonly ILog _log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

	private readonly IQueueProducer _producer;
	private readonly string _exchange;

	public QueueController(IQueueProducer producer)
	{
		_producer = producer;

		using (SynopsContext context = new SynopsContext())
		{
			_exchange = context.GetCommonConfig("rabbit")["exchange"];
		}
	}

	[HttpPost]
	public IHttpActionResult AddToQueue(QueueRequest request)
	{
		string routingKey = string.Format("{0}_{1}_{2}", request.Customer.ToLower(), request.Method.ToLower(), request.Technology.ToLower());

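		try
		{
			string jsonContent = JsonConvert.SerializeObject(request.Arguments);
			// UTF-8 is explicit and portable (Encoding.Default depends on the machine);
			// the consumer must decode with the same encoding
			byte[] data = Encoding.UTF8.GetBytes(jsonContent);

			_producer.BasicPublish(_exchange, routingKey, data);
		}
		catch (Exception ex)
		{
			// Pass the exception too, so the stack trace ends up in the log
			_log.Error(ex.Message, ex);

			return InternalServerError(ex);
		}
		finally
		{
			// Assumes one producer instance per request: release it even when publishing fails
			_producer.Dispose();
		}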

		return Ok();
	}
}

As you can see, nothing too complex. Here is the producer:

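using RabbitMQ.Client;

// Thin wrapper around one connection and one channel, used to publish persistent messages
public class RabbitMQProducer : IQueueProducer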
{
	private readonly IConnection _connection;
	private readonly IModel _channel;

	public RabbitMQProducer(string hostname, string username, string password)
	{
		ConnectionFactory connectionFactory = new ConnectionFactory
		{
			HostName = hostname,
			UserName = username,
			Password = password
		};

		_connection = connectionFactory.CreateConnection();
		_channel = _connection.CreateModel();
	}

	public void BasicPublish(string exchange, string routingKey, byte[] data)
	{
		IBasicProperties properties = _channel.CreateBasicProperties();
		properties.Persistent = true;

		_channel.BasicPublish(exchange, routingKey, properties, data);
	}

	public void Dispose()
	{
		// Close first, then dispose: the channel before the connection that owns it
		_channel?.Close();
		_channel?.Dispose();
		_connection?.Close();
		_connection?.Dispose();
	}
}
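
The controller above relies on a `QueueRequest` type that is not shown in this post. Here is a minimal sketch of what it could look like, together with the routing key logic isolated in a small helper. Only the property names `Customer`, `Method`, `Technology` and `Arguments` come from the controller; the type of `Arguments`, the `RoutingKeys` class and the validation are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shape of the request: only the property names come from the controller.
public class QueueRequest
{
    public string Customer { get; set; }
    public string Method { get; set; }
    public string Technology { get; set; }

    // Assumed type: anything JSON-serializable would do here.
    public Dictionary<string, object> Arguments { get; set; }
}

// Hypothetical helper that builds the same "customer_method_technology" key as the controller.
public static class RoutingKeys
{
    public static string Build(QueueRequest request)
    {
        if (request == null) throw new ArgumentNullException(nameof(request));

        return string.Join("_",
            Normalize(request.Customer, nameof(request.Customer)),
            Normalize(request.Method, nameof(request.Method)),
            Normalize(request.Technology, nameof(request.Technology)));
    }

    private static string Normalize(string value, string name)
    {
        // Fail early with a clear message instead of a NullReferenceException later on.
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentException("Missing routing key part: " + name, name);
        }

        // Invariant lower-casing keeps the key identical on every server culture.
        return value.Trim().ToLowerInvariant();
    }
}
```

For example, a request with `Customer = "ACME"`, `Method = "Create"` and `Technology = "Api"` (made-up values) gives the key `acme_create_api`. Compared with the inline `string.Format`, the helper trims whitespace and rejects empty parts, so a malformed request never ends up published with a key that no queue is bound to.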

C# Engine

Besides what's in RabbitMQ, you need a consumer to consume the messages. Fun fact: you can balance the load by adding more than one Engine, because the queue is centralized on one machine. It's interesting how, using the prefetch count in QoS, you can choose how many messages you want to process simultaneously. As for launching the correct code for each message depending on the functionality, we just used a strategy pattern, and we simply launch a task for every message we want to work.

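using System;
using System.Collections.Generic;
using System.Configuration;
using System.Reflection;
using System.Threading.Tasks;
using log4net;          // assumed from the ILog/LogManager idiom
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Subscribes to a queue and runs every received message on its own task
public class RabbitMQConsumer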
{
	private readonly ILog _log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

	private readonly IConnection _connection;
	private readonly IModel _channel;
	private readonly EventingBasicConsumer _consumer;

	public RabbitMQConsumer()
	{
		Dictionary<string, string> rabbitConfig;

		using (SynopsContext context = new SynopsContext())
		{
			rabbitConfig = context.GetCommonConfig("rabbit");
		}

		rabbitConfig.Add("password", ConfigurationManager.AppSettings["rabbit_password"]);

		ConnectionFactory connectionFactory = new ConnectionFactory
		{
			HostName = rabbitConfig["hostname"],
			UserName = rabbitConfig["username"],
			Password = rabbitConfig["password"],
		};

		ushort prefetchCount = ushort.Parse(rabbitConfig["prefetch"]);

		_log.Info(string.Format("Number of parallel tasks for each queue: {0}", prefetchCount));

		_connection = connectionFactory.CreateConnection();
		_channel = _connection.CreateModel();
		_channel.BasicQos(0, prefetchCount, false);
		_consumer = new EventingBasicConsumer(_channel);
		_consumer.Received += BasicConsume;
	}

	public void BasicConsume(string queue, bool autoAck = false)
	{
		_channel.BasicConsume(queue, autoAck, _consumer);
	}

	private void BasicConsume(object sender, BasicDeliverEventArgs args)
	{
		Task.Run(() => BasicConsume(args.Exchange, args.RoutingKey, args.DeliveryTag, args.Body));
	}

	private void BasicConsume(string exchange, string routingKey, ulong deliveryTag, byte[] data)
	{
		try
		{
			Strategy strategy = new Strategy(routingKey);

			strategy.Run(data);
		}
		catch (Exception ex)
		{
			_log.Error(string.Format("Error during working exchange {0} with routing key {1} and delivery tag {2}. With exception: {3}", exchange, routingKey, deliveryTag, ex));
		}
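		finally
		{
			// Acked even when the strategy failed: a failing message is dropped, not redelivered.
			// The channel is shared by all the tasks and is not thread-safe, so serialize access to it.
			lock (_channel)
			{
				_channel.BasicAck(deliveryTag, false);
			}
			_log.Info(string.Format("Ack {0}.", deliveryTag));
		}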
	}
}
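
The `Strategy` class used by the consumer is not shown in this post. As a rough idea of how a strategy pattern keyed on the routing key can be put together, here is a minimal sketch. All the names (`IMessageStrategy`, `EchoStrategy`, `StrategyRegistry`) are hypothetical, and `Run` returns a string here only to make the result easy to inspect, while the call in the consumer above ignores any return value.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical contract: one implementation per kind of message.
public interface IMessageStrategy
{
    string Run(byte[] data);
}

// Trivial example strategy: decodes the payload and echoes it back.
public class EchoStrategy : IMessageStrategy
{
    public string Run(byte[] data)
    {
        return "echo:" + Encoding.UTF8.GetString(data);
    }
}

// Hypothetical registry that maps a routing key to the strategy handling it.
public class StrategyRegistry
{
    // Case-insensitive lookup, since the routing keys are lower-cased by the producer.
    private readonly Dictionary<string, IMessageStrategy> _byRoutingKey =
        new Dictionary<string, IMessageStrategy>(StringComparer.OrdinalIgnoreCase);

    public void Register(string routingKey, IMessageStrategy strategy)
    {
        _byRoutingKey[routingKey] = strategy;
    }

    public IMessageStrategy Resolve(string routingKey)
    {
        IMessageStrategy strategy;
        if (_byRoutingKey.TryGetValue(routingKey, out strategy))
        {
            return strategy;
        }

        // An unknown key is a configuration problem, so fail loudly.
        throw new InvalidOperationException("No strategy registered for routing key: " + routingKey);
    }
}
```

After `registry.Register("acme_create_api", new EchoStrategy())`, a call to `registry.Resolve("acme_create_api").Run(data)` runs the echo strategy on the message body. Adding a new functionality then means writing one class and one `Register` line, without touching the consumer.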

Lessons Learned

  • Use this technique when you can rely on an async approach
  • Consider monitoring for both RabbitMQ and the Engine: each one is a new piece of software to look after
  • Messages must be REALLY lightweight, otherwise you will need a load balancer for the web service... and so you
written in: Milano MI, Italia
