Jason Rowe

EasyNetQ Error Conditions

When using the EasyNetQ client for RabbitMQ, there are some error conditions you might want to consider. This post shows a few ways you can customize error handling when publishing and subscribing. For a good overview, see the EasyNetQ Error Conditions docs on this subject.

Note:
This is just example code for looking at error conditions. Production code should use only one instance of IBus as a singleton.

Publishing Errors

Lost messages
The first thing I’ll mention is not really an error. “If you start publishing and no subscribers have ever been started then your messages simply disappear.” If someone publishes to an exchange without any queue bindings, that message goes away by design. This isn’t as bad as it sounds. After the subscribing app runs once, all the queues and bindings will be set up and saved in RabbitMQ. EasyNetQ makes exchanges and queues durable (saved to disk) by default (see the advanced API docs for more details). So losing messages can easily be avoided with a one-time setup, or even by creating your own tracking queue during publish.

You also have the option of making a message mandatory. If a mandatory message cannot be routed to any queue, EasyNetQ will send an event to messageReturned for you to handle. This can be set up using AdvancedBusEventHandlers.

RabbitMQ Unavailable
I’ll use a console app to see the behavior of EasyNetQ client when it is unable to connect to RabbitMQ.

dotnet new console -n RabbitMQPublishTimeout
dotnet new sln -n RabbitMQErrors
dotnet sln RabbitMQErrors.sln add **/*.csproj

Then install the EasyNetQ NuGet package.
I’ll use the Visual Studio Package Manager Console (Tools > Library Package Manager > Package Manager Console).
Install-Package EasyNetQ

Here is some code to test and see EasyNetQ output during a connection failure.

using System;
using EasyNetQ;
using EasyNetQ.Loggers;

namespace RabbitMQErrors
{
	class Program
	{
		static void Main(string[] args)
		{
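			// Log EasyNetQ's connection attempts to the console.
			var consoleLogger = new ConsoleLogger();
			// "noworkie" is a host that does not exist, so every connection attempt fails.
			var bus = RabbitHutch.CreateBus("host=noworkie", x => x.Register<IEasyNetQLogger>(_ => consoleLogger));
			var source = bus.Advanced.ExchangeDeclare("boom", "direct");
			// Publish with an empty routing key and mandatory = false.
			bus.Advanced.Publish(source, "", false, new Message<MyMessage>(new MyMessage { IsBroken = true }));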
		}

		public class MyMessage
		{
			public bool IsBroken { get; set; }
		}
	}
}

When you run this, EasyNetQ will retry the connection until the configured timeout elapses (10 seconds by default).

DEBUG: Trying to connect
ERROR: Failed to connect to Broker: 'noworkie', Port: 5672 VHost: '/'. ExceptionMessage: 'None of the specified endpoints were reachable (Connection failed)'
ERROR: Failed to connect to any Broker. Retrying in 00:00:05
DEBUG: Trying to connect
ERROR: Failed to connect to Broker: 'noworkie', Port: 5672 VHost: '/'. ExceptionMessage: 'None of the specified endpoints were reachable (Connection failed)'
ERROR: Failed to connect to any Broker. Retrying in 00:00:05
DEBUG: Trying to connect
ERROR: Channel action timed out. Throwing exception to client.

In EasyNetQ, the default timeout set in ConnectionConfiguration.cs is 10 seconds. Per the Connecting-to-RabbitMQ docs, you can configure these connection settings: “The connection string is made up of key/value pairs in the format key=value, each one separated by a semicolon (;).”

So if you want to change the timeout, you can do so via the connection string.

var bus = RabbitHutch.CreateBus("host=noworkie;timeout=60");
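
To make the key=value format a bit more concrete, here is a minimal sketch that splits a connection string into its settings. This is not EasyNetQ's own parser (the library handles this for you); ConnectionStringSketch and Parse are names I made up purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration only; EasyNetQ parses connection strings itself.
public static class ConnectionStringSketch
{
	// Splits "key=value;key=value" into a case-insensitive dictionary.
	public static Dictionary<string, string> Parse(string connectionString)
	{
		return connectionString
			.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
			// Split each pair on the first '=' only, so values may contain '='.
			.Select(pair => pair.Split(new[] { '=' }, 2))
			.Where(parts => parts.Length == 2)
			.ToDictionary(
				parts => parts[0].Trim(),
				parts => parts[1].Trim(),
				StringComparer.OrdinalIgnoreCase);
	}

	public static void Main()
	{
		var settings = Parse("host=noworkie;timeout=60");
		Console.WriteLine(settings["host"]);    // noworkie
		Console.WriteLine(settings["timeout"]); // 60
	}
}
```

Each semicolon-separated pair becomes one setting, so adding timeout=60 to the string is all it takes to override the 10 second default.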

If you want to change the retry interval or other more advanced connection settings, you would need to implement your own IPersistentConnection and replace the default connection factory. This unit test might help you get started down that path if you want to look into it.

If you are using the RPC pattern, it appears you can implement your own ITimeoutStrategy or put the [TimeoutSeconds(90)] attribute on your message classes.

If you are concerned about RabbitMQ availability in your environment, you could implement your own local storage to back up the messages and retry them later. Below is a basic example to show the concept.

			var message = new Message<MyMessage>(new MyMessage { IsBroken = true });
			var eventSource = "boom";
			var exchangeType = "direct";

			try
			{
				var consoleLogger = new ConsoleLogger();
				var bus = RabbitHutch.CreateBus("host=rabbitserver", x => x.Register<IEasyNetQLogger>(_ => consoleLogger));
				var source = bus.Advanced.ExchangeDeclare(eventSource, exchangeType);
				bus.Advanced.Publish(source, "", false, message);
			}
			catch (Exception)
			{
				// repo is your own storage abstraction (not part of EasyNetQ).
				// Save what is needed to republish the message later.
				repo.InsertMessageError(message, eventSource, exchangeType);
				// Rethrow with "throw;" so the original stack trace is preserved.
				throw;
			}
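
The "retry later" half of that idea could look something like the sketch below. Everything in it is an assumption for illustration: FailedMessage and FailedMessageStore are hypothetical names, the store is in memory (a real one would use durable storage such as a database or a file), and the publish delegate stands in for whatever code actually calls EasyNetQ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical record of a message that could not be published.
public class FailedMessage
{
	public string Exchange { get; set; }
	public string ExchangeType { get; set; }
	public string Body { get; set; }
}

// In-memory stand-in for a local message store; not durable, illustration only.
public class FailedMessageStore
{
	private readonly Queue<FailedMessage> pending = new Queue<FailedMessage>();

	public int Count { get { return pending.Count; } }

	public void Insert(FailedMessage message)
	{
		pending.Enqueue(message);
	}

	// Tries to republish each stored message once.
	// Messages that fail again stay in the store. Returns the number republished.
	public int RetryAll(Action<FailedMessage> publish)
	{
		var succeeded = 0;
		var attempts = pending.Count;
		for (var i = 0; i < attempts; i++)
		{
			var message = pending.Dequeue();
			try
			{
				publish(message);
				succeeded++;
			}
			catch (Exception)
			{
				pending.Enqueue(message); // still failing, keep it for the next run
			}
		}
		return succeeded;
	}
}

public static class Program
{
	public static void Main()
	{
		var store = new FailedMessageStore();
		store.Insert(new FailedMessage { Exchange = "boom", ExchangeType = "direct", Body = "{\"IsBroken\":true}" });

		// Broker still down: the publish delegate throws, so the message is kept.
		store.RetryAll(m => { throw new InvalidOperationException("broker unavailable"); });
		Console.WriteLine(store.Count); // 1

		// Broker is back: the publish delegate succeeds and the store drains.
		var sent = store.RetryAll(m => Console.WriteLine("republished to " + m.Exchange));
		Console.WriteLine(sent);        // 1
		Console.WriteLine(store.Count); // 0
	}
}
```

You would run RetryAll on a timer or at startup, passing a delegate that declares the exchange and publishes the stored body.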

Another setting you might be interested in when publishing is Publisher Confirms, which is not on by default when using EasyNetQ.

bus = RabbitHutch.CreateBus("host=localhost;publisherConfirms=true;timeout=10");

Subscription Callback Errors

According to the docs, if your subscription callback throws an exception, EasyNetQ will take the message that was being consumed and wrap it in a special error message. The error message will be published to the EasyNetQ error queue (named EasyNetQ_Default_Error_Queue).

You can change the default error queue name by doing the following. This is helpful if you have many apps on the same server and want to handle your errors independently.

var bus = RabbitHutch.CreateBus("host=localhost");
bus.Advanced.Container.Resolve<IConventions>().ErrorQueueNamingConvention = () => "MyErrorQueueNaming";

If you would like to retry these errors, you can manually shovel these messages into a queue that is set up to consume EasyNetQ error messages and retry them. Here is an example of setting up a consumer for an error queue.

			var subscriptionErrorQueue = bus.Advanced.QueueDeclare(errorQueue);
			var source = bus.Advanced.ExchangeDeclare(errorQueue, "direct");
			bus.Advanced.Bind(source, subscriptionErrorQueue, "#");
			bus.Advanced.Consume<Error>(subscriptionErrorQueue, (error, info) => this.errorQueueProcessor.ProcessMessage(error, info, bus));

Then, to process the messages, you would use the IMessage body and basic properties to retry your message.
The following is some example code showing how you might start building out the processing of these messages.

		public void ProcessMessage(IMessage<Error> msg, MessageReceivedInfo info, IRetryPublishBus bus)
		{
			try
			{
				Type originalMsgType = null;

				if (msg.Body.BasicProperties.TypePresent)
				{
					var typeNameSerializer = bus.Advanced.Container.Resolve<ITypeNameSerializer>();
					originalMsgType = typeNameSerializer.DeSerialize(msg.Body.BasicProperties.Type);
				}
				else
				{
					// TODO find type via exchange as it wasn't in the message.
					throw new InvalidOperationException("Original message type was not present in the error message.");
				}

				dynamic originalMessage = JsonConvert.DeserializeObject(msg.Body.Message, originalMsgType);
				// TODO add retry logic here.
			}
			catch (Exception ex)
			{
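				// Assuming a Microsoft.Extensions.Logging ILogger: pass the exception first so it is logged, not treated as a format argument.
				this.logger.LogCritical(ex, "Error processing error queue message");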
				this.WriteLostMessageToLogs(msg);
			}
		}

		private void WriteLostMessageToLogs(IMessage<Error> msg)
		{
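			// Assuming a Microsoft.Extensions.Logging ILogger: the placeholder is needed for the message body to appear in the log.
			this.logger.LogCritical("Unable to process message: {Message}", msg.Body.Message);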
		}
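
For the retry logic left as a TODO above, one simple option is a bounded retry with a fixed delay. The sketch below is only that, a sketch: RetrySketch and TryWithRetries are hypothetical names, the action delegate stands in for republishing the original message, and a real implementation might prefer a growing delay or a retry count stored with the message.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration; not part of EasyNetQ.
public static class RetrySketch
{
	// Runs the action until it succeeds or maxAttempts is reached.
	// Returns true on success, false when every attempt threw.
	public static bool TryWithRetries(Action action, int maxAttempts, TimeSpan delay)
	{
		for (var attempt = 1; attempt <= maxAttempts; attempt++)
		{
			try
			{
				action();
				return true;
			}
			catch (Exception ex)
			{
				Console.WriteLine("Attempt " + attempt + " failed: " + ex.Message);
				if (attempt < maxAttempts)
				{
					Thread.Sleep(delay); // wait before the next attempt
				}
			}
		}
		return false;
	}

	public static void Main()
	{
		var calls = 0;
		// Simulated handler that fails twice and then succeeds.
		var ok = TryWithRetries(() =>
		{
			calls++;
			if (calls < 3) throw new InvalidOperationException("handler not ready");
		}, maxAttempts: 5, delay: TimeSpan.FromMilliseconds(10));

		Console.WriteLine(ok + " after " + calls + " calls"); // True after 3 calls
	}
}
```

When TryWithRetries returns false, that is the point where you would fall back to something like WriteLostMessageToLogs so the message is not silently dropped.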

