N.Cokljat N.Cokljat - 22 days ago 8
C# Question

How to reconnect to MSMQ in C# if the service is down for a period

Is it possible to reconnect to MSMQ, after the service was shut down, without restarting my application? If it is then how?

my code is:

void tmrDelay_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
tmrDelay.Stop();

while (isConnected)
{
try
{
if (ImportFromMSMQ.HasMessages())
{
MSMQ_ServiceLog.WriteEntry(string.Format("Transfering data from MSMQ to ActiveMQ started at {0}", DateTime.Now.ToString()));
while (ImportFromMSMQ.HasMessages())
{
ImportFromMSMQ.Run();
MSMQ_ServiceLog.WriteEntry(string.Format("Transfering data from MSMQ to ActiveMQ completed at {0}", DateTime.Now.ToString()));
}
}
else
{
MSMQ_ServiceLog.WriteEntry(string.Format("MSMQ is empty {0}", DateTime.Now.ToString()));
}
}
catch (Exception ex)
{
logger.Error(string.Format(" Error in data transfer MSMQ to ActiveMQ {0}", ex.Message));
MSMQ_ServiceLog.WriteEntry(string.Format(" Error in data transfer MSMQ to ActiveMQ {0}", ex.Message), EventLogEntryType.Error, -1);
}
}

tmrDelay.Start();
}


The while loop is something that I have added, but not yet tested, because of suspected infinite loop. If one shuts down the MSMQ service we end up in the
catch
clause and we need to restart our whole application when MSMQ is started again. But how can I retry to connect to the MSMQ and keep doing it while waiting for it to be started again?

public class ImportFromMSMQ
{
private static readonly ILog logger = LogManager.GetLogger(typeof(ImportFromMSMQ));
//Max records per records
private static int _maxMessagesPerTransaction = ConstantHelper.MaxMessagesPerTransaction;
private static bool _isNotCompleted = true;
private static int counter = 0;
private static MessageQueue mq;
private static long _maxMessageBodySizeLimitInBytes = ConstantHelper.MaxMessageBodySizeLimitInBytes;

// Start import
public static void Run()
{
Start();
}

public static bool HasMessages()
{
var _mqName = ConstantHelper.SourceQueue;
mq = new System.Messaging.MessageQueue(_mqName);
long _totalmsg = MessagesCounter.GetMessageCount(mq);

if (_totalmsg > 0)
{
logger.Info(string.Format(" {0} messages found in the {1} Queue ", _totalmsg.ToString(), _mqName));
}
else
{
logger.Info(string.Format(" There are no messages in the {1} Queue ", _totalmsg.ToString(), _mqName));
}

return _totalmsg > 0;
}


private static void Start()
{

logger.Info(string.Format("Data transfer starting at {0}", DateTime.Now.ToString()));
long _currentMessageBodySizeLimitInBytes = 0;
ArrayList messageList = new ArrayList();
System.Messaging.Message mes;
System.IO.Stream bodyStream = null;
// Create a transaction.
MessageQueueTransaction trans = new MessageQueueTransaction();
List<ConventionalData> objectList = new List<ConventionalData>();
IMessageContainer _container = new MessageContainer();
// Begin the transaction.
trans.Begin();
try
{
while (_isNotCompleted && counter < _maxMessagesPerTransaction)
{
try
{
counter++;
mes = mq.Receive(new TimeSpan(0, 0, 3), trans);
if (mes != null)
{
bodyStream = mes.BodyStream;
_currentMessageBodySizeLimitInBytes = _currentMessageBodySizeLimitInBytes + bodyStream.Length;
}

VisionAir.Messaging.ConventionalData data = ProtoBuf.Serializer.Deserialize<ConventionalData>(mes.BodyStream);
objectList.Add(data);

_isNotCompleted = _currentMessageBodySizeLimitInBytes <= _maxMessageBodySizeLimitInBytes;
}
catch
{
_isNotCompleted = false;
}
}

if (objectList.Count != 0)
{
logger.Info(string.Format("Starting transfer of {0} messages", objectList.Count));
_container.MQMessages = objectList;
ExportToActiveMQ _export = new ExportToActiveMQ(_container, (ExportOption) Enum.Parse(typeof(ExportOption),ConstantHelper.ExportFormat));
_export.Export();
}

logger.Info(string.Format("Transfer of {0} messages is completed at {1}",objectList.Count, DateTime.Now.ToString()));

// Commit transaction
trans.Commit();

counter = 0; //ResetF
_isNotCompleted = true; //Reset
}
catch (Exception ex)
{
logger.Error(ex);
// Roll back the transaction.
trans.Abort();
counter = 0;
}
}
}


I have looked at this question Service not receiving messages after Message Queuing service restarted but do not seem to fully understand it, so if anyone could elaborate on it, it would be nice.

Do I have to recursively call my
tmrDelay_Elapsed()
method I the
catch
clause or something else?

Answer

I found the solution, it was simple when I first realized that the first time we access the queue is in the HasMessages() method, so I changed it to the following:

public static bool HasMessages()
        {
            var _mqName = ConstantHelper.SourceQueue;
            long _totalmsg = 0;
            int countAttempts = 0;

        queueExists = false;
        while (!queueExists)
        {
            if (MessageQueue.Exists(_mqName))
            {
                queueExists = true;

                mq = new System.Messaging.MessageQueue(_mqName);
                _totalmsg = MessagesCounter.GetMessageCount(mq);

                if (_totalmsg > 0)
                {
                    logger.Info(string.Format(" {0} messages found in the {1} Queue ", _totalmsg.ToString(), _mqName));
                }
                else
                {
                    logger.Info(string.Format(" There are no messages in the {1} Queue ", _totalmsg.ToString(), _mqName));
                }
            } 
            else
            {
                logger.Info(string.Format("No Queue named {0} found, trying again.", _mqName));
                countAttempts++;
                if(countAttempts % 10 == 0)
                {
                    logger.Info(string.Format("Still no queue found, have you checked Services that Message Queuing(Micrsoft Message Queue aka. MSMQ) is up and running"));
                } 
                Thread.Sleep(5000);
            }
        }

        return _totalmsg > 0; 
    }

I check if the MessageQueue.Exists and keep trying it if it does not exist untill i find the queue. When I find it I set the queueExists to true, to break the while loop to move on and process the data received, if any data is received. I have read in some post that it is bad to use Thread.Sleep(n), but for now I am going with this solution.