Showing posts with label ServiceBus. Show all posts

Saturday, April 1, 2017

Advanced concepts WCF listener for Service Bus


For the basics of creating a WCF listener for a ServiceBus queue/topic, please refer to -
WCF-ServiceBus Listener




1)      The default protocol used is NetMessaging (not AMQP). NetMessaging is a native protocol available in .NET only. AMQP is an OASIS standard and is interoperable, meaning a message sent by a .NET publisher can be consumed by a PHP/Python client.
2)      Unlike Service Bus, ActiveMQ offers many protocols for message interchange, such as -
a)      OpenWire (native protocol for ActiveMQ, the default)
b)      MQTT (lightweight pub-sub protocol typically used for IoT)
c)      AMQP (cross-platform, efficient)
d)      STOMP
3)      With the above approach, a new instance of the WCF service is created each time a message arrives at the topic/queue. This was confirmed with the following code -
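// Requires: using System; using System.ServiceModel; using System.ServiceModel.Channels;
[ServiceContract]
public interface IService
{
        [OperationContract(IsOneWay = true, Action = "*"), ReceiveContextEnabled(ManualControl = true)]
        void MessageReader(Message message);
}
public class MyService: IService
{
        // Assigned once per service instance, in the constructor.
        private readonly string instanceId;
        public MyService()
        {
            instanceId = Guid.NewGuid().ToString();
        }
        public void MessageReader(Message message)
        {
            // If a different id is logged for every message, each message is getting its own service instance.
            // LogEntry and Utilities.WriteLogSimple are the application's own logging helpers.
            var logEntry = new LogEntry { Message = string.Format("Service instance id is {0}", instanceId) };
            Utilities.WriteLogSimple(logEntry);
        }
}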

4)      Once a message is picked up from a topic/queue by the listener, a lock is applied to it. All topics/queues have a message lock timeout property, whose default value is 1 minute and whose maximum value is 5 minutes. The processing of the message has to be committed (or rolled back) within this time; otherwise the lock is released automatically and the message goes back to the topic/queue, which increases its delivery count. Please note that this redelivery triggers the WCF processing again. The max delivery count can also be set. Once the message has been redelivered more than the “max delivery count”, it goes to the “deadletter queue”.



5)      With reference to the point above, the following situation can occur and should be handled carefully. Suppose a message arrives at a topic, a WCF instance is created and a lock is applied to the message. If processing the message takes more than 1 minute (the default lock timeout), the lock expires and the message is redelivered, up to 10 times (the default max delivery count). The message may then be processed on each of those 10 deliveries, causing inconsistent results. So make sure that the message processing timeout (say, a database execution timeout or a service call timeout) is always less than the message lock timeout. Also reduce the max delivery count from 10 (the default) to a suitable value.
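
One way to keep the processing time under the lock timeout is to run the work against a time budget that is deliberately shorter than the lock. The following is only a minimal sketch in plain .NET; the class name LockAwareProcessor, the 60 second lock timeout and the 15 second safety margin are assumptions made for illustration and are not read from Service Bus.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LockAwareProcessor
{
    // Assumed values for illustration: the lock timeout configured on the queue/topic
    // and a margin that leaves time to complete or abandon the message.
    public static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(60);
    public static readonly TimeSpan SafetyMargin = TimeSpan.FromSeconds(15);

    // The time budget handed to the processing code.
    public static TimeSpan Budget
    {
        get { return LockTimeout - SafetyMargin; }
    }

    // Runs 'work' and returns true only if it finished inside the budget.
    // Returns false on timeout, cancellation or failure, so the caller can abandon the message.
    public static bool TryProcessWithinLock(Action<CancellationToken> work, TimeSpan budget)
    {
        using (var cts = new CancellationTokenSource(budget))
        {
            // Capture the token before starting, so the task never touches a disposed source.
            CancellationToken token = cts.Token;
            Task task = Task.Run(() => work(token), token);
            try
            {
                return task.Wait(budget) && task.Status == TaskStatus.RanToCompletion;
            }
            catch (AggregateException)
            {
                // The work threw or observed the cancellation.
                return false;
            }
        }
    }
}
```

A caller would pass LockAwareProcessor.Budget as the budget and complete the message only when the method returns true. Note that the work has to check the token (or use its own shorter database/service timeouts) to actually stop; otherwise it keeps running in the background after the method has returned false.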




Sunday, March 13, 2016

ReSubmitting messages from ServiceBus Deadletter queue





Resubmitting messages that do not require any change from the deadletter queue is straightforward and can be done using the Service Bus Explorer tool.

However, if the messages need to be changed (for example, because a bug left a special character in the messages that needs to be stripped off), ServiceBusExplorer will not do it. This can be done with a custom tool/program that retrieves the messages from the deadletter queue, stores them somewhere, makes the necessary modification and pushes the messages back to the service bus. (Note - The earlier deadletter messages should then be deleted (received) from the deadletter queue.)
The sample code for reading messages from the deadletter queue can be taken from the open source ServiceBusExplorer tool itself. Program below -


using System;
using System.Configuration;
using System.IO;
using System.ServiceModel.Channels;
using System.Text;
using System.Xml;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

class Program
{
    private const int MaxBufferSize = 262144; // 256 KB

    static void Main(string[] args)
    {
        // Build the namespace address and the credentials from the app settings.
        Uri uri = new Uri(@"sb://" + ConfigurationManager.AppSettings["SbHost"] + "." + ConfigurationManager.AppSettings["Domain"] + "/" + ConfigurationManager.AppSettings["Namespace"]);
        TokenProvider tokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(ConfigurationManager.AppSettings["KeyName"], ConfigurationManager.AppSettings["SharedAccessKey"]);
        MessagingFactory factory = MessagingFactory.Create(uri, tokenProvider);

        // The deadletter queue of a subscription is addressed as <subscription>/$DeadLetterQueue.
        SubscriptionClient subsClient = factory.CreateSubscriptionClient(ConfigurationManager.AppSettings["Topic"], ConfigurationManager.AppSettings["Subscription"] + "/$DeadLetterQueue");

        // Peek reads the messages without removing them from the deadletter queue.
        var listMessages = subsClient.PeekBatch(Convert.ToInt32(ConfigurationManager.AppSettings["PeekMessageCount"]));

        // The encoder settings are the same for every message, so the encoder is created once.
        var element = new BinaryMessageEncodingBindingElement
        {
            ReaderQuotas = new XmlDictionaryReaderQuotas
            {
                MaxArrayLength = int.MaxValue,
                MaxBytesPerRead = int.MaxValue,
                MaxDepth = int.MaxValue,
                MaxNameTableCharCount = int.MaxValue,
                MaxStringContentLength = int.MaxValue
            }
        };
        var encoder = element.CreateMessageEncoderFactory().Encoder;

        foreach (var bm in listMessages)
        {
            var stringBuilder = new StringBuilder();
            using (Stream stream = bm.GetBody<Stream>())
            {
                // Decode the binary-encoded WCF message and write its body out as indented XML.
                var message = encoder.ReadMessage(stream, MaxBufferSize);
                using (var reader = message.GetReaderAtBodyContents())
                using (var writer = XmlWriter.Create(stringBuilder, new XmlWriterSettings { Indent = true }))
                {
                    writer.WriteNode(reader, true);
                }
            }
            string messageText = stringBuilder.ToString();
            Console.WriteLine(messageText);
            // Store each message in its own file, named after the message id.
            File.WriteAllText(ConfigurationManager.AppSettings["PathToStoreMessages"] + bm.MessageId + ".txt", messageText);
        }
    }
}
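
The remaining part (modify the stored messages and push them back) could be sketched as below. This is only an illustration under assumptions: the class DeadLetterResubmitter and its methods are hypothetical names, and the actual send is passed in as a delegate because the way the body must be re-encoded depends on how the original publisher created the message. In a real tool the delegate would wrap a send to the topic.

```csharp
using System;
using System.IO;

public static class DeadLetterResubmitter
{
    // Removes every occurrence of the unwanted character from the message text.
    public static string StripCharacter(string messageText, char unwanted)
    {
        return messageText.Replace(unwanted.ToString(), string.Empty);
    }

    // Reads every stored message file from 'folder', cleans it and hands it to 'send'.
    // Returns the number of messages that were handed over.
    public static int ResubmitAll(string folder, char unwanted, Action<string> send)
    {
        int count = 0;
        foreach (string path in Directory.GetFiles(folder, "*.txt"))
        {
            string cleaned = StripCharacter(File.ReadAllText(path), unwanted);
            send(cleaned);
            count++;
        }
        return count;
    }
}
```

For a dry run, Console.WriteLine can be passed as the send delegate to inspect the cleaned messages before anything is pushed back to the service bus.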

Sunday, August 9, 2015

Implementing long running processes using Service Bus


The following design pattern can be used to implement a long running process with multiple independent transactions.
Scenario - On the click of “Submit”, a call to the payment gateway (via PaymentService) needs to be made, the address is to be updated (AddressService) and the mobile number is to be updated (UserProfileService).
Assumptions - 1) Calls to AddressService.UpdateAddress and UserProfileService.UpdateMobileNumber are idempotent.
2) Retry logic is not implemented.

Caution - The duplicate payment scenario needs to be taken care of.

Step 1) On the click of “Submit”, capture all the details such as customer id, new address, new mobile number, account number, bank IFSC code, amount and a unique RequestId, create an XML message and send it to the ServiceBus topic/queue via ABCService. On receiving the success response from ABCService, show a message to the end user: “Your request is successfully recorded and the changes will be reflected shortly.”

Step 2) There will be another service, ABCTransactionService, which is auto-started and listens to the ServiceBus topic/queue for incoming messages. As soon as the message from Step 1 arrives, it is processed by calling PaymentService, UserProfileService and AddressService and recording their status in a TrackingTable.

Details below -

The structure of the TrackingTable will be –

·         RequestId (Primary Key)
·         CustomerId
·         IsCallingPaymentService (Flag set before calling payment gateway)
·         IsPaymentSuccessful
·         IsAddressChangeSuccessful
·         IsMobileUpdateSuccessful
·         IsRequestComplete

Processing Detail -
a)      Read the message from the ServiceBus topic/queue.
b)      Check if IsCallingPaymentService is false or empty (it is empty/false for a fresh request).
c)      If IsCallingPaymentService is false or empty, update IsCallingPaymentService to true and call PaymentService.
d)      Else abandon the message so that it ends up in the deadletter queue (once the max delivery count is exceeded) and process it manually. This approach is needed to prevent a duplicate call to PaymentService (deducting from the customer account twice).
e)      Capture the response of PaymentService and update IsPaymentSuccessful to true/false depending on the response code. In case of exceptions (timeout, communication failure), update IsPaymentSuccessful to false. If IsPaymentSuccessful is false, abandon the message (manual check required); else proceed.

UpdateAddress and UpdateMobileNumber are not critical and are idempotent. Hence we do not set a flag before these calls are made. The “before operation” flag is needed for the following case - the call to PaymentService succeeds but the call to the TrackingTable to update the flag fails (for example, because of a DB connection issue). In this case, we might think that the call to PaymentService never happened (as per the tracking table, IsPaymentSuccessful is empty) and call it again during manual processing (unless someone checks the logs in detail).

f)      Call UpdateAddress and capture the response. Update the response in the TrackingTable - IsAddressChangeSuccessful. In case of any failure/exception, abandon the message so that it moves to the deadletter queue.
g)      Do the same for the UpdateMobileNumber operation.
h)      Update IsRequestComplete to true and de-queue the message (receiveContext.Complete).
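
The processing steps above can be sketched roughly as follows. This is an illustration only: the TrackingTable is replaced by an in-memory dictionary, the three services are passed in as delegates, and the names RequestProcessor, TrackingRow and Outcome are hypothetical. One interpretive assumption is made: when a resubmitted message finds IsPaymentSuccessful already true, the sketch skips the payment call and continues with the idempotent updates, which is how failure scenario 5 below reads.

```csharp
using System;
using System.Collections.Generic;

public enum Outcome { Complete, Abandon }

// In-memory stand-in for one row of the TrackingTable.
public class TrackingRow
{
    public bool IsCallingPaymentService;
    public bool? IsPaymentSuccessful;
    public bool? IsAddressChangeSuccessful;
    public bool? IsMobileUpdateSuccessful;
    public bool IsRequestComplete;
}

public class RequestProcessor
{
    private readonly Dictionary<string, TrackingRow> table = new Dictionary<string, TrackingRow>();
    private readonly Func<string, bool> pay;
    private readonly Func<string, bool> updateAddress;
    private readonly Func<string, bool> updateMobile;

    public RequestProcessor(Func<string, bool> pay, Func<string, bool> updateAddress, Func<string, bool> updateMobile)
    {
        this.pay = pay;
        this.updateAddress = updateAddress;
        this.updateMobile = updateMobile;
    }

    // Returns the tracking row for the request, creating an empty one for a fresh request.
    public TrackingRow GetRow(string requestId)
    {
        TrackingRow row;
        if (!table.TryGetValue(requestId, out row))
        {
            row = new TrackingRow();
            table[requestId] = row;
        }
        return row;
    }

    // Treats exceptions (timeout, communication failure) as a failed call.
    private static bool SafeCall(Func<string, bool> call, string requestId)
    {
        try { return call(requestId); }
        catch (Exception) { return false; }
    }

    public Outcome Process(string requestId)
    {
        TrackingRow row = GetRow(requestId);

        if (row.IsPaymentSuccessful != true)
        {
            // Steps b-d: never call the payment gateway twice for the same request.
            if (row.IsCallingPaymentService) return Outcome.Abandon;
            row.IsCallingPaymentService = true;

            // Step e: record the payment result; abandon for a manual check on failure.
            row.IsPaymentSuccessful = SafeCall(pay, requestId);
            if (row.IsPaymentSuccessful != true) return Outcome.Abandon;
        }

        // Steps f-g: idempotent updates, so no "before operation" flag is needed.
        row.IsAddressChangeSuccessful = SafeCall(updateAddress, requestId);
        if (row.IsAddressChangeSuccessful != true) return Outcome.Abandon;

        row.IsMobileUpdateSuccessful = SafeCall(updateMobile, requestId);
        if (row.IsMobileUpdateSuccessful != true) return Outcome.Abandon;

        // Step h: the caller completes (de-queues) the message on Outcome.Complete.
        row.IsRequestComplete = true;
        return Outcome.Complete;
    }
}
```

In the listener, Outcome.Complete would map to receiveContext.Complete and Outcome.Abandon to receiveContext.Abandon. A real implementation would persist each flag change to the database before moving to the next step.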

FAILURE SCENARIOS -
1)      Go to the deadletter queue to see if there are issues.
2)      Take the RequestId of the message and look up its values in the TrackingTable.
3)      Check whether IsCallingPaymentService=true and IsPaymentSuccessful=true to see if the payment was successful. Check the logs if the payment failed.
4)      To reprocess the message, open ServiceBusExplorer, go to the deadletter queue and resubmit the message to the required queue/topic.

5)      It is safe to reprocess the message, as the call to PaymentService will not be made again (if already made) - a manual check of the logs is required in case of payment failure. The other operations can be called anyway because of idempotency.

Tuesday, July 7, 2015

Service Bus authentication and permissions

1)      Clients can only connect to the Service Bus if they have the SB certificates in their certificate store. The ServiceBus certificates, if auto-generated, can be found using the cmdlet: Get-SBAutoGeneratedCA.
2)      Second, we need to use either a Shared Access Key or a Windows STS URI. If ServiceBus is installed in a domain and the client (Windows 7) is not connected to that domain (example - Corpnet), then it is better to use a Shared Access Key, which can be retrieved by the cmdlet: Get-SBAuthorizationRule. There are mainly 3 rights - Listen, Send, Manage.

3)      Third - To create topics and queues and to perform other admin-related tasks, the user must be part of the ManageUsers collection of the ServiceBus namespace, which can be retrieved by the cmdlet: Get-SBNamespace.

Create a WCF listener for Service Bus

0)      Add a NuGet package reference to ServiceBus.v1_1 (the version at the time of writing).

1)      Your ServiceContract should look like this -
[ServiceContract]
    public interface IService1
    {

        [OperationContract(IsOneWay = true, Action = "*"), ReceiveContextEnabled(ManualControl = true)]
        void AccountingReader(Message message);

        // TODO: Add your service operations here
    }
ManualControl = true means that when a message is received from the ServiceBus, we have to manually invoke receiveContext.Complete() to remove it from the queue or topic.
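
As an illustration of the manual control described above, the receive context can be obtained from the message properties and then completed or abandoned. This is only a sketch: the helper class ReceiveContextHelper, its method name and the 10 second timeout are assumptions made for illustration, while ReceiveContext.TryGet, Complete and Abandon are the standard WCF calls.

```csharp
using System;
using System.ServiceModel.Channels;

public static class ReceiveContextHelper
{
    // Assumed timeout for the complete/abandon round trip.
    private static readonly TimeSpan OperationTimeout = TimeSpan.FromSeconds(10);

    // Runs 'process' and completes the message on success or abandons it on failure.
    // Returns true if the message was completed (removed from the queue or topic).
    public static bool CompleteOrAbandon(Message message, Action<Message> process)
    {
        ReceiveContext receiveContext;
        if (!ReceiveContext.TryGet(message.Properties, out receiveContext))
        {
            throw new InvalidOperationException("The message has no ReceiveContext; check ReceiveContextEnabled on the contract.");
        }

        try
        {
            process(message);
            // Success: remove the message from the queue or topic.
            receiveContext.Complete(OperationTimeout);
            return true;
        }
        catch (Exception)
        {
            // Failure: release the lock so the message is redelivered (log the exception here).
            receiveContext.Abandon(OperationTimeout);
            return false;
        }
    }
}
```

Inside the service operation, this would be called as ReceiveContextHelper.CompleteOrAbandon(message, m => { /* processing */ });.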

2)      Your Web.config should look like this -
a)      One key point - When working on Windows 7 (a non-server machine, or when not in Corpnet), use SAS for authentication, as Windows STS does not seem to work.
b)      Second, in case of any issue, use WCF tracing.

<system.serviceModel>  
    <extensions>
      <!-- In this extension section we are introducing all known service bus extensions. User can remove the ones they don't need. -->
      <behaviorExtensions>
        <add name="connectionStatusBehavior" type="Microsoft.ServiceBus.Configuration.ConnectionStatusElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="transportClientEndpointBehavior" type="Microsoft.ServiceBus.Configuration.TransportClientEndpointBehaviorElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="serviceRegistrySettings" type="Microsoft.ServiceBus.Configuration.ServiceRegistrySettingsElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </behaviorExtensions>
      <bindingElementExtensions>
        <add name="netMessagingTransport" type="Microsoft.ServiceBus.Messaging.Configuration.NetMessagingTransportExtensionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="tcpRelayTransport" type="Microsoft.ServiceBus.Configuration.TcpRelayTransportElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="httpRelayTransport" type="Microsoft.ServiceBus.Configuration.HttpRelayTransportElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="httpsRelayTransport" type="Microsoft.ServiceBus.Configuration.HttpsRelayTransportElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="onewayRelayTransport" type="Microsoft.ServiceBus.Configuration.RelayedOnewayTransportElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </bindingElementExtensions>
      <bindingExtensions>
        <add name="basicHttpRelayBinding" type="Microsoft.ServiceBus.Configuration.BasicHttpRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="webHttpRelayBinding" type="Microsoft.ServiceBus.Configuration.WebHttpRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="ws2007HttpRelayBinding" type="Microsoft.ServiceBus.Configuration.WS2007HttpRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="netTcpRelayBinding" type="Microsoft.ServiceBus.Configuration.NetTcpRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="netOnewayRelayBinding" type="Microsoft.ServiceBus.Configuration.NetOnewayRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="netEventRelayBinding" type="Microsoft.ServiceBus.Configuration.NetEventRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="netMessagingBinding" type="Microsoft.ServiceBus.Messaging.Configuration.NetMessagingBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </bindingExtensions>
    </extensions>
    <bindings>     
      <customBinding>
        <binding name="messagingBinding" closeTimeout="00:03:00" openTimeout="00:03:00" receiveTimeout="00:03:00" sendTimeout="00:03:00">
          <textMessageEncoding messageVersion="None">
            <readerQuotas maxStringContentLength="2147483647" />
          </textMessageEncoding>
          <!--<binaryMessageEncoding/>-->
          <netMessagingTransport />
        </binding>
      </customBinding>
    </bindings>
    <behaviors>
      <endpointBehaviors>
        <behavior name="securityBehavior">
          <transportClientEndpointBehavior>
            <tokenProvider>
              <!--VV Imp. When working in Windows 7 (non server machine), use SAS to authentication as windows STS does not seem to work-->
              <sharedAccessSignature keyName="RootManageSharedAccessKey" key="gqlUNxI0+lNzJopR0gaJOt8LLOn3jELsRepjBSij7T4=" />
              <!--<windowsAuthentication>
                <stsUris>
                  <stsUri value="https://VM-WEB-AZURE/ServiceBusDefaultNamespace"/>               
                </stsUris>
              </windowsAuthentication>-->
            </tokenProvider>
          </transportClientEndpointBehavior>
        </behavior>
      </endpointBehaviors>
      <serviceBehaviors>
        <behavior>
          <!--To avoid disclosing metadata information, set the values below to false before deployment-->
          <serviceMetadata httpGetEnabled="true" httpsGetEnabled="true" />
          <!--To receive exception details in faults for debugging purposes, set the value below to true.  Set to false before deployment to avoid disclosing exception information-->
          <serviceDebug includeExceptionDetailInFaults="true" />
        </behavior>
      </serviceBehaviors>
    </behaviors>
    <protocolMapping>
      <add binding="basicHttpsBinding" scheme="https" />
      <add binding="netMessagingBinding" scheme="sb" />
    </protocolMapping>
    <serviceHostingEnvironment aspNetCompatibilityEnabled="true" minFreeMemoryPercentageToActivateService="0" multipleSiteBindingsEnabled="true" />
    <services>
      <service name="ReceiveFromWSSB.Service1">

        <endpoint name="myEndPoint" listenUri="sb://VM-WEB-AZURE/ServiceBusDefaultNamespace/as400/subscriptions/AllAS400"
                  address="sb://VM-WEB-AZURE/ServiceBusDefaultNamespace/as400" binding="customBinding"
                  bindingConfiguration="messagingBinding" contract="ReceiveFromWSSB.IService1" behaviorConfiguration="securityBehavior"/>

     
      </service>
    </services>


  </system.serviceModel>
3)      Your Service implementation should look like this  -

[ServiceBehavior(AddressFilterMode = AddressFilterMode.Any)]
    public class Service1 : IService1
    {
         public void AccountingReader(Message message)
        {


4)  Push some messages to the Service Bus Topic or Queue through code or through SB Explorer as shown below –