Sunday, March 13, 2016

ReSubmitting messages from ServiceBus Deadletter queue



ReSubmitting messages from ServiceBus Deadletter queue


Resubmitting messages, which do not require any change, from deadletter queue is straightforward and can be done using Service Bus Explorer tool.

However if the messages need change (say for example – because of bug – there is a special character which needs to be stripped off from the messages), ServiceBusExplorer would not do it. This can be done by using a custom tool/program to retrieve the messages from deadletter queue, store it somewhere, do the necessary modification and push the message back to the service bus. (Note – The earlier deadletter messages should be deleted (received)).
The sample code for reading messages from deadletter queue can be taken the open source ServiceBusExplorer tool itself. Program below –


class Program
    {
        private static int MaxBufferSize = 262144; // 256 KB

        static void Main(string[] args)
        {

            Uri uri = new Uri(@"sb://" + ConfigurationManager.AppSettings["SbHost"] + "." + ConfigurationManager.AppSettings["Domain"] + "/" + ConfigurationManager.AppSettings["Namespace"]);
            TokenProvider tokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(ConfigurationManager.AppSettings["KeyName"], ConfigurationManager.AppSettings["SharedAccessKey"]);
            MessagingFactory factory = MessagingFactory.Create(uri, tokenProvider);

            SubscriptionClient subsClient = factory.CreateSubscriptionClient(ConfigurationManager.AppSettings["Topic"], ConfigurationManager.AppSettings["Subscription"] + "/$DeadLetterQueue");

            var listMessages = subsClient.PeekBatch(Convert.ToInt32(ConfigurationManager.AppSettings["PeekMessageCount"]));           

            foreach (var bm in listMessages)
            {
                string messageText = null;
                Stream stream = bm.GetBody<Stream>();
                var element = new BinaryMessageEncodingBindingElement
                {
                    ReaderQuotas = new XmlDictionaryReaderQuotas
                    {
                        MaxArrayLength = int.MaxValue,
                        MaxBytesPerRead = int.MaxValue,
                        MaxDepth = int.MaxValue,
                        MaxNameTableCharCount = int.MaxValue,
                        MaxStringContentLength = int.MaxValue
                    }
                };
                var encoderFactory = element.CreateMessageEncoderFactory();
                var encoder = encoderFactory.Encoder;
                var stringBuilder = new StringBuilder();
                var message = encoder.ReadMessage(stream, MaxBufferSize);
                using (var reader = message.GetReaderAtBodyContents())
                {
                    var settings = new XmlWriterSettings { Indent = true };
                    using (var writer = XmlWriter.Create(stringBuilder, settings))
                    {
                        writer.WriteNode(reader, true);
                    }
                }
                messageText = stringBuilder.ToString();
                Console.WriteLine(messageText);
                stream.Dispose();
                File.WriteAllText(ConfigurationManager.AppSettings["PathToStoreMessages"] + bm.MessageId + ".txt", messageText);
            }



        }


    }

No comments:

Post a Comment