Milan Milan - 23 days ago 8
C# Question

Azure Service Bus - Receive Messages with OnMessage() Method

Following the MS documentation it was not difficult to receive message(s) from subscription. However, if I'd like my application to receive a message every time new message is posted - a constant polling. Hence the OnMessage() method of the SubscriptionClient class.

MS documentation says: "...When calling OnMessage, the client starts an internal message pump that constantly polls the queue or subscription. This message pump consists of an infinite loop that issues a Receive() call. If the call times out, it issues the next Receive() call. ..."

But when the application is running, the moment OnMessage() method is called only latest message(s) is received. When new messages are posted the constant polling does not seem to be working. After trying many different approaches the only way I could make this work and have the application react the moment new message is received is to place the code into a separate task with infinite loop. This seems totally wrong on so many levels! (see code below).

Can anyone help me to correct my code or post a working sample to accomplish the same functionality without the loop? Thank you!

public void ReceiveMessageFromSubscription(string topicName, string subscriptionFilter)
{
var newMessage = new MessageQueue();
int i = 0;

Task listener = Task.Factory.StartNew(() =>
{
while (true)
{
SubscriptionClient Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);

Dictionary<string, string> retrievedMessage = new Dictionary<string, string>();

OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = false;
options.AutoRenewTimeout = TimeSpan.FromMinutes(1);

Client.OnMessage((message) =>
{
try
{
retrievedMessage.Add("messageGuid", message.Properties["MessageGuid"].ToString());
retrievedMessage.Add("instanceId", message.Properties["InstanceId"].ToString());
retrievedMessage.Add("pId", message.Properties["ProcessId"].ToString());
retrievedMessage.Add("processKey", message.Properties["ProcessKey"].ToString());
retrievedMessage.Add("message", message.Properties["Message"].ToString());

newMessage.AnnounceNewMessage(retrievedMessage); // event ->

message.Complete(); // Remove message from subscription.
}
catch (Exception ex)
{
string exmes = ex.Message;
message.Abandon();
}

}, options);

retrievedMessage.Clear();

i++;

Thread.Sleep(3000);
}

});
}

Answer

Your code has a few issues to iron out -

  • It falls through and I assume your application then exits - or at least the thread that is listening for the messages terminates.
  • Your while loop keeps repeating the code to hook up the message handler, you only need to do this once.
  • You need a way to keep the call stack alive and prevent your app from garbage collecting your object.

The below should see you on your way to success. Good luck.

 ManualResetEvent CompletedResetEvent = new ManualResetEvent(false);
    SubscriptionClient Client;

    public void ReceiveMessagesFromSubscription(string topicName, string subscriptionFilter, string connectionString)
    {
        Task listener = Task.Factory.StartNew(() =>
        {
            // You only need to set up the below once. 
            Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);

            OnMessageOptions options = new OnMessageOptions();
            options.AutoComplete = false;
            options.AutoRenewTimeout = TimeSpan.FromMinutes(1);
            options.ExceptionReceived += LogErrors;

            Client.OnMessage((message) =>
            {
                try
                {
                    Trace.WriteLine("Got the message with ID {0}", message.MessageId);
                    message.Complete(); // Remove message from subscription.
                }
                catch (Exception ex)
                {
                    Trace.WriteLine("Exception occurred receiving a message: {0}" + ex.ToString());
                    message.Abandon(); // Failed. Leave the message for retry or max deliveries is exceeded and it dead letters.
                }

            }, options);

            CompletedResetEvent.WaitOne();
        });
    }

    /// <summary>
    /// Added in rudimentary exception handling .
    /// </summary>
    /// <param name="sender">The sender.</param>
    /// <param name="ex">The <see cref="ExceptionReceivedEventArgs"/> instance containing the event data.</param>
    private void LogErrors(object sender, ExceptionReceivedEventArgs ex)
    {
        Trace.WriteLine("Exception occurred in OnMessage: {0}" + ex.ToString());
    }

    /// <summary>
    /// Call this to stop the messages arriving from subscription.
    /// </summary>
    public void StopMessagesFromSubscription()
    {
        Client.Close(); // Close the message pump down gracefully
        CompletedResetEvent.Set(); // Let the execution of the listener complete and terminate gracefully 
    }

Alternatively you can chunk the message off in a more traditional fashion yourself using ReceiveBatch:

var messages = await queueClient.ReceiveBatchAsync(10, TimeSpan.FromSeconds(30),
                                                       cancellationToken);