The Windows Azure Worker Role is a perfect place to put code that you want to run continuously in the background to process work as it becomes available. Much of the information presented here applies to web roles as well.
If you’re writing cloud applications, it’s likely you are targeting high levels of performance and scalability. You will want to get the most out of your investment in cloud computing, and making the best use of the resources you pay for will save you money. It is therefore reasonable to expect that most non-trivial applications you deploy to a production cloud environment will be written to perform I/O operations asynchronously.
In a Windows Azure Worker Role, the Windows Azure runtime dispatches a single thread to your role’s Run method; the rest of the threading model is left up to you. This is very much like a Windows service or a console application. If you want to make maximum use of the cores available in your role instances, it is highly recommended that you leverage the CLR thread pool.
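The pattern of "one dispatching thread, many thread-pool workers" can be sketched in isolation. In the hypothetical helper below (the class and method names are illustrative, not part of any Azure API), one thread hands each work item to the CLR thread pool with ThreadPool.QueueUserWorkItem. The CountdownEvent wait exists only so the demo can return a result; the thread in a real Run method would go back to polling for work instead of waiting.

```csharp
using System;
using System.Threading;

// Hypothetical helper: a single dispatching thread hands work to the CLR thread pool.
public static class ThreadPoolDispatch
{
    // Queues one work item per input value and returns the sum of their squares,
    // with each square computed on a thread-pool thread.
    public static long SumOfSquaresOnPool(int[] inputs)
    {
        long total = 0;
        var remaining = new CountdownEvent(inputs.Length); // already set when there are no inputs
        foreach (int value in inputs)
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                try
                {
                    Interlocked.Add(ref total, (long)value * value);
                }
                finally
                {
                    remaining.Signal(); // one fewer item outstanding
                }
            });
        }
        remaining.Wait(); // demo only: wait until every queued item has finished
        return total;
    }
}

public static class Program
{
    public static void Main()
    {
        int minWorkers, minIo;
        ThreadPool.GetMinThreads(out minWorkers, out minIo);
        Console.WriteLine("Processors: " + Environment.ProcessorCount +
                          ", thread-pool minimum worker threads: " + minWorkers);
        Console.WriteLine(ThreadPoolDispatch.SumOfSquaresOnPool(new[] { 1, 2, 3, 4 })); // 30
    }
}
```

Printing ThreadPool.GetMinThreads next to Environment.ProcessorCount lets you see, on your own instance size, how many worker threads the pool is prepared to run without delay.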
Using the .NET Task Parallel Library (TPL) is an option if your worker roles are compute-bound, but it won’t help you much with I/O-bound operations. Today’s CPUs are powerful, and the data your worker role operates on will generally have to be retrieved from a remote location such as Windows Azure Storage or SQL Azure, so your worker roles are much more likely to be I/O-bound than compute-bound. If you use the TPL to increase concurrency, it does so by increasing the number of threads. Threads are resource-heavy (each one reserves its own stack, 1 MB by default on Windows), and a thread that blocks while waiting for I/O does no useful work while still holding those resources. There is a limit to how many threads you can create before performance degrades instead of improving.
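The alternative to adding threads is to issue the I/O asynchronously. The hypothetical sketch below uses the APM pair FileStream.BeginRead/EndRead, with a temporary local file standing in for remote storage (the class and method names are illustrative). On Windows, a FileStream opened with useAsync: true issues the read as overlapped I/O, so no thread sits blocked while the read is in flight; the completion callback runs on a thread-pool thread once the data is available.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;

// Hypothetical demo of APM-style I/O: BeginRead starts the read and returns at once;
// the callback runs when the read has completed.
public static class ApmReadDemo
{
    public static string WriteThenReadBack(string text)
    {
        string path = Path.GetTempFileName();
        try
        {
            byte[] data = Encoding.UTF8.GetBytes(text);
            File.WriteAllBytes(path, data);

            byte[] buffer = new byte[data.Length];
            int bytesRead = 0;
            Exception error = null;
            var completed = new ManualResetEventSlim(false);

            // useAsync: true opens the file for overlapped (asynchronous) I/O.
            using (var fs = new FileStream(path, FileMode.Open, FileAccess.Read,
                                           FileShare.Read, 4096, useAsync: true))
            {
                fs.BeginRead(buffer, 0, buffer.Length, ar =>
                {
                    try { bytesRead = fs.EndRead(ar); }   // collect the result
                    catch (Exception ex) { error = ex; }
                    finally { completed.Set(); }
                }, null);

                // The read itself does not occupy a thread; this thread blocks here
                // only so the demo can return the text to its caller.
                completed.Wait();
            }
            if (error != null) throw new IOException("Asynchronous read failed.", error);

            // A single read is enough for this small local file; production code
            // would keep reading until it had all the bytes it needed.
            return Encoding.UTF8.GetString(buffer, 0, bytesRead);
        }
        finally
        {
            File.Delete(path);
        }
    }
}

public static class Program
{
    public static void Main()
    {
        Console.WriteLine(ApmReadDemo.WriteThenReadBack("watermark me")); // watermark me
    }
}
```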
The entry point for a Windows Azure Worker Role is the Run() method. We would really love it if the Worker Role architecture let us return from this method and release its thread, but returning from Run() tells Windows Azure that the role instance has finished, and the instance is recycled. So we reluctantly put this thread into an infinite while(true) loop. The loop sleeps for 3 seconds only when the queue is empty; whenever it finds messages in the job queue, it dispatches them to the thread pool and immediately polls again. The thread pool tries to keep roughly one running thread per CPU, so these threads process the messages concurrently and efficiently, with a minimum of context switching.
As the following code example shows, we retrieve as many messages as a single request allows (CloudQueueMessage.MaxNumberOfMessagesToPeek) from the Windows Azure Storage queue, and then create an AsyncEnumerator instance to process each message asynchronously. We call the AsyncEnumerator’s BeginExecute method, passing it the iterator returned by the message-processing routine ProcessMsg.
```csharp
public override void Run() {
    while (true) {
        Boolean anyMsgs = false;
        // I call GetMessages synchronously because the Run thread can't do anything else
        foreach (var msg in s_msgQueue.GetMessages(CloudQueueMessage.MaxNumberOfMessagesToPeek, TimeSpan.FromSeconds(30))) {
            anyMsgs = true;
            var ae = new AsyncEnumerator();
            ae.BeginExecute(ProcessMsg(ae, msg), ae.EndExecute, null);
        }
        // If we got messages, there may still be more in the queue, so don't sleep;
        // loop around and try to get them. Sleep only when the queue was empty.
        if (anyMsgs == false) {
            // I call Thread.Sleep synchronously because the Run thread can't do anything else
            Thread.Sleep(3000);
        }
    }
}
```
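The shape of this Run() loop can be exercised outside Azure with a hypothetical sketch like the one below. An in-memory ConcurrentQueue<string> replaces the Windows Azure queue (so there is no visibility timeout), ThreadPool.QueueUserWorkItem replaces AsyncEnumerator.BeginExecute, and the loop exits after a few consecutive empty passes so that the demo terminates; a real Run() method would loop forever. PollingLoop and its parameters are illustrative names only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical stand-in for the Run() loop: an in-memory queue replaces the
// Windows Azure queue, and each message is dispatched to the thread pool.
public static class PollingLoop
{
    // Takes up to batchSize messages per pass and sleeps only after a pass that found
    // nothing. Unlike a real worker role, it returns after maxIdlePasses empty passes
    // in a row. Returns the number of messages dispatched.
    public static int Run(ConcurrentQueue<string> queue, int batchSize, TimeSpan idleSleep,
                          int maxIdlePasses, Action<string> process)
    {
        int dispatched = 0, idlePasses = 0;
        var inFlight = new CountdownEvent(1); // starts at 1 on behalf of the loop itself
        while (idlePasses < maxIdlePasses)
        {
            bool anyMsgs = false;
            string msg;
            for (int i = 0; i < batchSize && queue.TryDequeue(out msg); i++)
            {
                anyMsgs = true;
                dispatched++;
                inFlight.AddCount();
                string current = msg; // each work item gets its own copy
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try { process(current); }
                    finally { inFlight.Signal(); }
                });
            }
            if (anyMsgs)
            {
                idlePasses = 0; // more messages may be waiting, so poll again right away
            }
            else
            {
                idlePasses++;
                Thread.Sleep(idleSleep); // queue was empty: back off before polling again
            }
        }
        inFlight.Signal(); // release the loop's own count
        inFlight.Wait();   // demo only: let in-flight work finish before returning
        return dispatched;
    }
}

public static class Program
{
    public static void Main()
    {
        var queue = new ConcurrentQueue<string>(new[] { "img-1", "img-2", "img-3" });
        int n = PollingLoop.Run(queue, 32, TimeSpan.FromMilliseconds(100), 2,
            m => Console.WriteLine("processing " + m + " on pool thread " +
                                   Thread.CurrentThread.ManagedThreadId));
        Console.WriteLine("dispatched " + n); // dispatched 3
    }
}
```

The "processing" lines can appear in any order, because each message runs on whichever pool thread picks it up.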
The ProcessMsg routine (shown below) handles the grunge work of processing each incoming message asynchronously. In this example, each message represents an image to be watermarked, but the code is meant to be generic and representative of any operation that involves I/O. Notice that every I/O operation in ProcessMsg uses the Begin and End methods of the Asynchronous Programming Model (APM): each Begin call is passed ae.End() as its callback, the routine then executes yield return 1 (“resume me when one operation has completed”), and when it resumes it calls the matching End method with the IAsyncResult returned by ae.DequeueAsyncResult().
```csharp
private IEnumerator<Int32> ProcessMsg(AsyncEnumerator ae, CloudQueueMessage cloudMsg) {
    QueueMessage msg = QueueMessage.Parse(cloudMsg.AsString);
    var ctx = s_tables.GetDataServiceContext();
    var query = (DataServiceQuery<MyEntity>)(from e in ctx.CreateQuery<MyEntity>(c_containerName)
                                             where (msg.PartitionKey == e.PartitionKey) && (msg.RowKey == e.RowKey)
                                             select e);
    query.BeginExecute(ae.End(), null);
    yield return 1;
    var p = query.EndExecute(ae.DequeueAsyncResult()).FirstOrDefault();
    if (p == null) yield break; // Entity deleted, skip this one

    // Grab the image from its blob (could throw), watermark it, and upload the result as a new blob
    var container = s_blobs.GetContainerReference(c_containerName);
    var blob = container.GetBlockBlobReference(p.PhotoBlobID);

    // Fetch the blob's attributes so that blob.Properties.Length is populated
    blob.BeginFetchAttributes(ae.End(), null);
    yield return 1;
    blob.EndFetchAttributes(ae.DequeueAsyncResult());

    // Size the MemoryStream to the blob's length, then download the blob into it
    MemoryStream ms = new MemoryStream(checked((Int32)blob.Properties.Length));
    blob.BeginDownloadToStream(ms, ae.End(), null);
    yield return 1;
    blob.EndDownloadToStream(ae.DequeueAsyncResult());
    ms.Position = 0; // rewind so the downloaded image is read from the beginning

    String newBlobID = Guid.NewGuid().ToString();
    CloudBlockBlob newblob = blob.Container.GetBlockBlobReference(newBlobID);
    newblob.Properties.ContentType = blob.Properties.ContentType;
    newblob.BeginUploadFromStream(CreateWatermarked(ms), ae.End(), null);
    yield return 1;
    newblob.EndUploadFromStream(ae.DequeueAsyncResult()); // End on the same blob that began the upload

    // Once processing succeeds, the queue message should be deleted (e.g. s_msgQueue.DeleteMessage(cloudMsg));
    // otherwise it becomes visible again after the 30-second timeout and is processed a second time.
}
```
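To make the rhythm of ae.End(), yield return n and ae.DequeueAsyncResult() concrete, here is a deliberately tiny driver written from scratch. MiniAsyncEnumerator, FakeIo and Pipeline are hypothetical names; this is not the Power Threading Library’s implementation and omits its features (cancellation, timeouts, discarding results, synchronization-context support). It follows the same contract: each Begin call receives the callback from End(), yield return n means “resume me after n of those callbacks have fired”, and DequeueAsyncResult hands back a completed IAsyncResult for the matching End call. A Task-based fake operation stands in for real storage I/O.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, stripped-down driver illustrating the idea behind AsyncEnumerator.
public sealed class MiniAsyncEnumerator
{
    private readonly ConcurrentQueue<IAsyncResult> _results = new ConcurrentQueue<IAsyncResult>();
    private readonly ManualResetEventSlim _finished = new ManualResetEventSlim(false);
    private IEnumerator<int> _iterator;
    private int _outstanding;   // completions still awaited (negative if some arrived early)
    private Exception _error;

    // Pass the returned delegate as the AsyncCallback of a BeginXxx call.
    public AsyncCallback End()
    {
        return ar =>
        {
            _results.Enqueue(ar);
            if (Interlocked.Decrement(ref _outstanding) == 0) Step(); // last awaited one resumes
        };
    }

    // Hands the iterator the IAsyncResult of a completed operation.
    public IAsyncResult DequeueAsyncResult()
    {
        IAsyncResult ar;
        if (!_results.TryDequeue(out ar)) throw new InvalidOperationException("No completed operation.");
        return ar;
    }

    // Runs the iterator to completion; blocking the caller is for the demo only.
    public void Execute(IEnumerator<int> iterator)
    {
        _iterator = iterator;
        Step();
        _finished.Wait();
        if (_error != null) throw new InvalidOperationException("Iterator failed.", _error);
    }

    private void Step()
    {
        int awaited;
        try
        {
            if (!_iterator.MoveNext()) { Finish(null); return; }
            awaited = _iterator.Current; // the value after 'yield return'
        }
        catch (Exception ex) { Finish(ex); return; }
        // If every awaited callback already ran, the count returns to zero here and we continue.
        if (Interlocked.Add(ref _outstanding, awaited) == 0) Step();
    }

    private void Finish(Exception error)
    {
        _error = error;
        _iterator.Dispose();
        _finished.Set();
    }
}

// Hypothetical APM pair: doubles a value after a short delay (stands in for real I/O).
public static class FakeIo
{
    public static IAsyncResult BeginDouble(int value, AsyncCallback callback, object state)
    {
        var tcs = new TaskCompletionSource<int>(state);
        Task.Delay(10).ContinueWith(_ =>
        {
            tcs.SetResult(value * 2);
            if (callback != null) callback(tcs.Task); // Task<int> implements IAsyncResult
        });
        return tcs.Task;
    }

    public static int EndDouble(IAsyncResult ar) { return ((Task<int>)ar).Result; }

    // Same shape as ProcessMsg: Begin, yield, End. Two operations run concurrently, then one more.
    public static IEnumerator<int> Pipeline(MiniAsyncEnumerator ae, int a, int b, int[] result)
    {
        BeginDouble(a, ae.End(), null);
        BeginDouble(b, ae.End(), null);
        yield return 2; // resume once both operations have completed
        int sum = EndDouble(ae.DequeueAsyncResult()) + EndDouble(ae.DequeueAsyncResult());
        BeginDouble(sum, ae.End(), null);
        yield return 1;
        result[0] = EndDouble(ae.DequeueAsyncResult());
    }
}

public static class Program
{
    public static void Main()
    {
        var result = new int[1];
        var ae = new MiniAsyncEnumerator();
        ae.Execute(FakeIo.Pipeline(ae, 1, 2, result));
        Console.WriteLine(result[0]); // (1*2 + 2*2) * 2 = 12
    }
}
```

The outstanding counter may briefly go negative, because a fast operation can complete before the iterator has yielded. Whichever thread brings it back to zero (the callback or the thread that just ran MoveNext) resumes the iterator, so MoveNext is never called from two threads at once and no thread is held while the operations are pending.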
For more information on the AsyncEnumerator, read Jeffrey Richter’s June 2008 Concurrent Affairs article (see http://msdn.microsoft.com/en-us/magazine/cc546608.aspx). The AsyncEnumerator is part of the Wintellect Power Threading Library, which you can download from https://training.atmosera.com/Resources/visit-the-power-threading-library