Azure queue with strongly typed messages
I've been reading a lot about cloud patterns recently (e.g. from Microsoft), and one thing I've really wanted to do is move background processing to a worker role. To get started, I decided to move the recurring jobs that we were running in the web role to a worker role, because running background work in ASP.NET comes with limitations that are hard to avoid.
The idea was simple: set up an Azure Scheduler job to put items on an Azure Storage Queue, and have a worker role poll the queue and react to the messages. To keep things simple, I created a small wrapper for the queue messages:
public class CustomMessage
{
    public string TypeName { get; set; }
    public string Payload { get; set; }
}
Then, when I queue a message, I serialize an object to JSON with Json.NET and create an instance of the CustomMessage class to put on the queue. I actually have a service which maps from the CustomMessage to a CloudQueueMessage and vice versa, so the whole process looks something like this:
var someWorkItem = ...;
var customMessage = new CustomMessage
{
    TypeName = someWorkItem.GetType().AssemblyQualifiedName,
    Payload = JsonConvert.SerializeObject(someWorkItem)
};
var queueMessage = CustomMessageMapper.MapToCloudMessage(customMessage);
Queue.AddMessage(queueMessage);
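The mapper itself isn't shown here. As a rough sketch of the idea, and assuming the queue message body is simply the CustomMessage serialized to JSON, the round trip could look like the following. It uses System.Text.Json rather than Json.NET so that it runs without extra packages. Envelope and SendWelcomeEmail are hypothetical names for illustration, not types from the real app.

```csharp
using System;
using System.Text.Json;

public class CustomMessage
{
    public string TypeName { get; set; }
    public string Payload { get; set; }
}

// Hypothetical work item, used only for illustration.
public class SendWelcomeEmail
{
    public string UserId { get; set; }
}

// Hypothetical helper: not the CustomMessageMapper from the post.
public static class Envelope
{
    // Wrap a work item in a CustomMessage and return the JSON text
    // that would become the body of the queue message.
    public static string Wrap(object workItem)
    {
        var message = new CustomMessage
        {
            TypeName = workItem.GetType().AssemblyQualifiedName,
            Payload = JsonSerializer.Serialize(workItem, workItem.GetType())
        };
        return JsonSerializer.Serialize(message);
    }

    // Reverse of Wrap: parse a queue message body back into a CustomMessage.
    public static CustomMessage Unwrap(string body)
    {
        return JsonSerializer.Deserialize<CustomMessage>(body);
    }
}
```

Because the body is a plain string, only the outermost mapping layer needs to know about CloudQueueMessage, and the rest of the code can work with CustomMessage alone.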
The real app is a bit more complicated and doesn't pass CloudQueueMessages around (if it did, my core domain would be tied to Azure Storage Queues). In the application, all of the scheduled tasks implement an interface, IWorkItem, to indicate that they can be run by the worker.
For each class implementing IWorkItem, there must be exactly one type implementing IWorkItemHandler<T>, where T is that specific work item type.
public interface IWorkItemHandler<in T> where T : IWorkItem
{
    void Handle(T item);
}
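To make the pairing concrete, here is a minimal sketch of one work item and its handler. UpdateSiteMap, ISiteMapWriter and RecordingSiteMapWriter are hypothetical names invented for this example. The point is only that the handler takes its dependencies through the constructor, so a DI container can supply them.

```csharp
using System;

public interface IWorkItem { }

public interface IWorkItemHandler<in T> where T : IWorkItem
{
    void Handle(T item);
}

// Hypothetical work item with no data: just an instruction.
public class UpdateSiteMap : IWorkItem { }

// Hypothetical dependency that the handler needs.
public interface ISiteMapWriter
{
    void Rebuild();
}

// Exactly one handler for the UpdateSiteMap work item.
public class UpdateSiteMapHandler : IWorkItemHandler<UpdateSiteMap>
{
    private readonly ISiteMapWriter _writer;

    // The dependency arrives via the constructor.
    public UpdateSiteMapHandler(ISiteMapWriter writer)
    {
        _writer = writer;
    }

    public void Handle(UpdateSiteMap item)
    {
        _writer.Rebuild();
    }
}

// In-memory fake, useful for trying the handler without real infrastructure.
public class RecordingSiteMapWriter : ISiteMapWriter
{
    public int RebuildCount { get; private set; }

    public void Rebuild()
    {
        RebuildCount++;
    }
}
```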
Then the worker role's job is really simple:
1. Get a message off the queue
2. Deserialize the Payload to an IWorkItem
3. Find the handler for the specific work item
4. Call Handle on the handler
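// 1: getting the message off the queue is not shown; value is the CustomMessage that was read
// 2: turn the payload back into a concrete work item
var workItem = workItemService.DeserializePayload(value);
// 3: build the closed handler type, e.g. IWorkItemHandler<SomeWorkItem>, and resolve it
var workItemType = workItem.GetType();
var targetHandlerType = typeof(IWorkItemHandler<>).MakeGenericType(workItemType);
var handler = ObjectFactory.GetInstance(targetHandlerType);
// 4: the handler type is only known at runtime, so call Handle via reflection
handler
    .GetType()
    .GetMethod("Handle")
    .Invoke(handler, new object[] { workItem });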
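Steps 3 and 4 can be tried without a container. The sketch below replaces StructureMap with a plain dictionary (an assumption made purely so the example is self-contained) and makes two small variations on the code above. It looks up Handle on the closed interface type rather than on the concrete class, which avoids an ambiguous match if one class ever implements two handler interfaces. It also unwraps the TargetInvocationException that MethodInfo.Invoke throws when the handler itself fails. Ping, PingHandler and Dispatcher are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Runtime.ExceptionServices;

public interface IWorkItem { }

public interface IWorkItemHandler<in T> where T : IWorkItem
{
    void Handle(T item);
}

// Hypothetical work item and handler, for illustration only.
public class Ping : IWorkItem
{
    public string Text { get; set; }
}

public class PingHandler : IWorkItemHandler<Ping>
{
    public static string LastText;

    public void Handle(Ping item)
    {
        LastText = item.Text;
    }
}

public static class Dispatcher
{
    // Stand-in for the DI container: closed handler interface -> factory.
    private static readonly Dictionary<Type, Func<object>> Registry =
        new Dictionary<Type, Func<object>>
        {
            { typeof(IWorkItemHandler<Ping>), () => new PingHandler() }
        };

    public static void Dispatch(IWorkItem workItem)
    {
        // Build IWorkItemHandler<TConcrete> from the runtime type of the item.
        var handlerType = typeof(IWorkItemHandler<>).MakeGenericType(workItem.GetType());
        var handler = Registry[handlerType]();

        // Resolve Handle on the interface, not on the concrete handler class.
        var handle = handlerType.GetMethod("Handle");
        try
        {
            handle.Invoke(handler, new object[] { workItem });
        }
        catch (TargetInvocationException ex) when (ex.InnerException != null)
        {
            // Rethrow the handler's own exception with its original stack trace.
            ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
        }
    }
}
```

Calling Dispatcher.Dispatch(new Ping { Text = "hello" }) runs PingHandler.Handle, and any exception thrown inside a handler reaches the caller as itself rather than wrapped.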
ObjectFactory is from StructureMap, a DI container. It resolves the handler, which means handlers can declare dependencies in their constructors and StructureMap will inject them.
The deserialize payload method looks something like this:
public IWorkItem DeserializePayload(CustomMessage task)
{
    // Recover the concrete work item type from its assembly-qualified name
    var t = Type.GetType(task.TypeName);
    if (string.IsNullOrEmpty(task.Payload))
    {
        // No data: a default instance is all the handler needs
        return Activator.CreateInstance(t) as IWorkItem;
    }
    // Find JsonConvert.DeserializeObject<T>(string) and close it over t
    return typeof(JsonConvert)
        .GetMethods()
        .Where(m => m.Name == "DeserializeObject" && m.IsGenericMethodDefinition)
        .First(m => m.GetParameters().Length == 1)
        .MakeGenericMethod(t)
        .Invoke(null, new object[] { task.Payload }) as IWorkItem;
}
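A possible simplification, offered as a sketch rather than as the code the app runs: most JSON libraries have a non-generic overload that takes the target Type directly, which removes the reflection over generic methods. Json.NET has JsonConvert.DeserializeObject(string, Type). The version below uses the equivalent JsonSerializer.Deserialize(string, Type) from System.Text.Json so it runs without extra packages. It also rejects type names that don't resolve to an IWorkItem, which seems prudent when the type name comes from a queue message. WorkItemReader and UpdateSiteMap are hypothetical names.

```csharp
using System;
using System.Text.Json;

public interface IWorkItem { }

public class CustomMessage
{
    public string TypeName { get; set; }
    public string Payload { get; set; }
}

// Work item with data.
public class SendPasswordResetRequest : IWorkItem
{
    public string UserId { get; set; }
}

// Hypothetical work item with no data.
public class UpdateSiteMap : IWorkItem { }

public static class WorkItemReader
{
    public static IWorkItem DeserializePayload(CustomMessage task)
    {
        var type = Type.GetType(task.TypeName);

        // Fail early on unknown types or types that are not work items.
        if (type == null || !typeof(IWorkItem).IsAssignableFrom(type))
        {
            throw new InvalidOperationException(
                "Unknown or invalid work item type: " + task.TypeName);
        }

        // Jobs without data only need a default instance.
        if (string.IsNullOrEmpty(task.Payload))
        {
            return (IWorkItem)Activator.CreateInstance(type);
        }

        // Non-generic overload: the target type is passed as a value.
        return (IWorkItem)JsonSerializer.Deserialize(task.Payload, type);
    }
}
```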
The check for a null or empty payload is there simply because some jobs have data associated with them and others don't. A job with no data is basically just a general instruction, e.g. update the site map. Other work items are specific to a domain entity and carry data. These jobs are queued by the UI and might look something like:
public class SendPasswordResetRequest : IWorkItem
{
    // Public so that Json.NET includes it when serializing
    public string UserId { get; set; }
}
The StructureMap config is easy too:
ObjectFactory.Initialize(x =>
{
    x.Scan(scanner =>
    {
        scanner.AssemblyContainingType<XXX>();
        scanner.AssemblyContainingType<XXX>();
        scanner.TheCallingAssembly();
        scanner.SingleImplementationsOfInterface();
    });
});