End-to-end testing using Reactive Extensions (Rx)

A change of tack is needed with my posts. The few people who have read my blog have been too kind to say that I talk a bit too much in them. Having read back over my posts, I can see that I muse like some kind of pseudo-intellectual, smiling smugly and patting himself on the back over a new insight which is really just regurgitation.

So I should stop it and keep the musings to a minimum from here on in. I've rewritten this post to try and give something valuable that can be taken away and used.

Requisite Knowledge


General C#, generics, messaging patterns, NServiceBus, LINQ, an understanding of asynchrony, Task<T>, unit testing.


Reactive Extensions 

I decided to use Rx to help end-to-end test a payment method service. I won't go into a lot of detail about Rx, and instead refer you to an absolutely fantastic reference, which can be found at reactivex.io. The visualisations, especially, help bring to life what Rx does and can stimulate ideas about how you could use it. The code examples here are in C# and are written against Rx.NET.

The problem

I need to be able to test that my payment method service (service) is working by ensuring that, given various input, it produces the correct output (messages) in a timely fashion.

I need a way to ask the following questions: "Given xyz input, has a payment accepted event occurred?" and, if so, "Does it relate to this payment reference, and has it arrived within 10 seconds?". If so, great, my service is working; if not, then it isn't.

The service to be tested comprises a front end (a RESTful-like API) and a backend server (producing messages). From a testing perspective, appropriate input should be supplied via the API, and various messages from the server should then occur as a result. This is going to need end-to-end testing. (Edit 1: I am saying we need end-to-end testing IF we want to exercise a fully working service with all its real-world external dependencies, AND this is necessary because the API and server reside in different processes. We could still component test the API and backend server, each individually, by swapping out external dependencies, but this wouldn't give us an integrated test. End-to-end testing has its advocates and detractors, along with its tradeoffs.)
(Edit 2: I am choosing to call this end-to-end testing rather than "external" component testing, whereby external collaborators (I/O, HTTP) are included in the mix, because the microservice logically comprises an API and other backend message servers. It's a bit of a minefield!)

My experience is that, if it is not insanely difficult or expensive (in time, complexity, and maintenance), an end-to-end test is worth it for the confidence (edit: and coverage) it can bring.


The services in our platform use NServiceBus (NSB) to send and receive what are usually domain events (edit: synonymous with messages). We can leverage this to get a handle on these messages and inspect them, or notice the lack of them.

To be able to ask the questions above, I need to be able to observe (listen) on a channel for the messages (an observable sequence) and then run a test asking the relevant questions.

In this instance the channel will be an event handler sitting atop an associated "bus" (setup not shown here), an abstraction NSB provides to represent the medium on which messages are passed.


The handler:

public class GenericMessageHandler : IHandleMessages<object>
{
    public void Handle(object message)
    {
        Observer.OnNotify(message);
    }
}


The handler's job is to listen for messages arriving at the handler and to notify the Observer. The Observer is a facade over what is known in Rx.NET as a ReplaySubject, a special type that is both an observer and an observable (unfortunately my facade reuses/overloads the term Observer). The Observer has been, or will be, set up with a bunch of subscribers, which are the tests (or a way to set the results of the tests**) to run for a certain type of message.

The ReplaySubject buffers messages. If a "would be" subscriber (which we will use to hook up tests) is not yet subscribed and messages have started occurring on the channel (that is, arriving at the handler and being pushed into the ReplaySubject buffer), then when the subscriber does subscribe, all the messages that have already happened are replayed to it.

This special property (subject to any limit on the buffer) means it's not important that a subscriber is hooked up straight away, as long as the Observer (the ReplaySubject) is receiving messages. In this way the chances of missing something on the channel are reduced.
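The replay behaviour can be seen in isolation with a minimal sketch. The class and method names here (ReplayDemo, CollectWithLateSubscriber) are made up for illustration and are not part of the service's code. It uses the parameterless ReplaySubject constructor, which buffers without limit; Rx.NET also offers constructors that take a buffer size or a time window if a bounded buffer is wanted.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class ReplayDemo
{
    // Pushes two messages before anyone subscribes and one after,
    // then returns everything the late subscriber received.
    public static List<object> CollectWithLateSubscriber()
    {
        var received = new List<object>();

        using (var subject = new ReplaySubject<object>())
        {
            subject.OnNext("first");    // nobody is listening yet
            subject.OnNext("second");

            // The late subscriber is replayed "first" and "second" on subscribing.
            using (subject.Subscribe(item => received.Add(item)))
            {
                subject.OnNext("third");    // delivered live
            }
        }

        return received;
    }

    public static void Main()
    {
        // prints: first, second, third
        Console.WriteLine(string.Join(", ", CollectWithLateSubscriber()));
    }
}
```

A plain Subject<object> in the same position would only deliver "third", which is why the ReplaySubject suits tests that may subscribe after the service has already started publishing.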

It is worth noting that NSB uses publish/subscribe, so some setup is needed to ensure that messages of certain types are subscribed to and will therefore arrive at the handler. That code is omitted here (available in git shortly).

The Observer facade has 2 users:
  • the channel
  • unit tests
The channel pushes messages into the Observer.
The unit tests are set up to "ask the questions": they provide the tests to run against those messages, and those tests are "attached/subscribed" to the Observer.




The Observer looks like this:


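using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public class Observer
{
    // Buffers every message pushed in and replays them to late subscribers.
    private static readonly ReplaySubject<object> MessageObservable = new ReplaySubject<object>();

    // Called by the handler for each message that arrives on the channel.
    public static void OnNotify(object message)
    {
        MessageObservable.OnNext(message);
    }

    // Returns a task that completes with the first message of type T that passes
    // the test, or faults with a PublishTestException if the pipeline fails.
    public static Task<T> SubscribeToMessageTest<T>(Func<T, bool> test, string failureMessage, TimeSpan timeout) where T : class
    {
        var tcs = new TaskCompletionSource<T>();

        MessageObservable
            // Raise a TimeoutException if messages stop arriving for longer than 'timeout'.
            .Timeout(timeout)
            // Emit only the first message that passes the test, then complete.
            .FirstAsync(message => RunTest(message, test))
            .Subscribe(
                message => OnSuccess(tcs, message),
                exception => OnFailure(exception, tcs, failureMessage));

        return tcs.Task;
    }

    // A message of the wrong type never passes the test.
    private static bool RunTest<T>(object message, Func<T, bool> messageTest) where T : class
    {
        var convertedMessage = ConvertMessage<T>(message);
        if (convertedMessage != null)
        {
            return messageTest(convertedMessage);
        }

        return false;
    }

    // Returns null when the message is not a T.
    private static T ConvertMessage<T>(object message) where T : class
    {
        var messageAsType = message as T;
        return messageAsType;
    }

    private static void OnSuccess<T>(TaskCompletionSource<T> tcs, object message) where T : class
    {
        tcs.SetResult(ConvertMessage<T>(message));
    }

    // PublishTestException is a custom exception type (not shown here).
    private static void OnFailure<T>(Exception ex, TaskCompletionSource<T> tcs, string failureMessage)
    {
        Console.WriteLine(failureMessage);
        var messageException = new PublishTestException(failureMessage, ex);
        tcs.SetException(messageException);
    }
}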



The most interesting method is:
Task<T> SubscribeToMessageTest<T>(Func<T,bool> test, string failureMessage, TimeSpan timeout)


At a high level: a test is passed in which has previously been set up to be executed against a specific type of message (when a message occurs on the channel and appears in what is known as an observable sequence).
A TaskCompletionSource (tcs) is created, whose result will be set to the message if the associated test succeeds, or to an exception if it takes longer than a certain time. The intent is that a test always fails if it can't be satisfied within the specified timeout period.
The task from the TaskCompletionSource is then returned.


The parameter "test" is a Func<T, bool> (a generic predicate) that is hooked up to MessageObservable (a ReplaySubject<object>) in order to test the elements of the observable sequence (the messages from the channel).

This is done with the lines:
MessageObservable
.Timeout(timeout)
.FirstAsync(message => RunTest(message, test))

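Firstly, via Timeout, a timeout is set; if it is exceeded, a TimeoutException is raised and ends up in the error callback. Note that Rx applies this timeout to the gap between consecutive elements, so steady unrelated traffic on the channel can postpone it.
Secondly, via FirstAsync, ONLY the first element of the observable sequence (the messages from the channel) that passes the test given to RunTest is returned. The RunTest method converts the message to the expected type and runs the test; a message of any other type never passes.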
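Where Timeout sits in the chain changes what it measures. Placed before FirstAsync, the clock restarts whenever any message arrives; placed after FirstAsync, which emits nothing until a message passes, it measures from the moment of subscription and so behaves as an overall deadline. The following is a minimal sketch of the difference, not the code used in the service: TimeoutOrderingDemo and TimesOut are hypothetical names, Observable.Interval stands in for unrelated traffic on the channel, and the durations are arbitrary.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class TimeoutOrderingDemo
{
    // Returns true if the pipeline raised a TimeoutException within 'waitFor'.
    public static bool TimesOut(IObservable<long> pipeline, TimeSpan waitFor)
    {
        var tcs = new TaskCompletionSource<bool>();

        using (pipeline.Subscribe(
            _ => tcs.TrySetResult(false),
            ex => tcs.TrySetResult(ex is TimeoutException)))
        {
            return tcs.Task.Wait(waitFor) && tcs.Task.Result;
        }
    }

    public static void Main()
    {
        // Unrelated traffic: one element every 50 ms, none of which passes the test.
        IObservable<long> noise = Observable.Interval(TimeSpan.FromMilliseconds(50));
        var timeout = TimeSpan.FromMilliseconds(300);
        var waitFor = TimeSpan.FromSeconds(2);

        // Timeout first: every arriving element restarts the clock.
        bool perElement = TimesOut(noise.Timeout(timeout).FirstAsync(_ => false), waitFor);

        // Timeout last: nothing is emitted, so the clock runs from subscription.
        bool overall = TimesOut(noise.FirstAsync(_ => false).Timeout(timeout), waitFor);

        Console.WriteLine($"Timeout before FirstAsync timed out: {perElement}");
        Console.WriteLine($"Timeout after FirstAsync timed out: {overall}");
    }
}
```

On a machine that is not heavily loaded, the first pipeline should not time out during the two-second wait, while the second should time out after roughly 300 ms. If an overall deadline is what a test needs, moving Timeout after FirstAsync is one way to get it.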


The next 3 lines:
.Subscribe(
  message => OnSuccess(tcs, message),
  exception => OnFailure(exception, tcs, failureMessage));


These subscribe two callbacks, which set the state of the TaskCompletionSource (tcs) declared at the top of the method.
OnSuccess takes the message itself and puts it into the result of the tcs.
OnFailure runs when a timeout occurs (or the sequence otherwise fails); the exception is wrapped in a PublishTestException and set as the exception on the tcs.
Don't forget that the task associated with tcs is returned to the caller, so consumers of the code are notified as and when either of these happens.
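Rx.NET can also do the observable-to-task bridging itself, through the ToTask extension method in System.Reactive.Threading.Tasks. The sketch below is an alternative shape rather than the code used in the service: MessageTasks and FirstMatching are hypothetical names, OfType<T> replaces the manual conversion, and Timeout is placed last so that it acts as an overall deadline. It does not wrap failures in a custom exception or log a failure message, so that part would still need adding.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class MessageTasks
{
    // Hypothetical helper: completes with the first message of type T that
    // passes the test, or faults if none does before the timeout.
    public static Task<T> FirstMatching<T>(
        IObservable<object> messages, Func<T, bool> test, TimeSpan timeout) where T : class
    {
        return messages
            .OfType<T>()        // drop messages of other types
            .FirstAsync(test)   // complete on the first message that passes
            .Timeout(timeout)   // fail if nothing passes before the deadline
            .ToTask();          // surface the result or the exception as a Task<T>
    }

    public static void Main()
    {
        var subject = new ReplaySubject<object>();
        subject.OnNext(42);           // wrong type, ignored
        subject.OnNext("ref-123");    // replayed to the late subscriber

        Task<string> task = FirstMatching<string>(
            subject, s => s.StartsWith("ref-"), TimeSpan.FromSeconds(5));

        // prints: ref-123
        Console.WriteLine(task.Result);
    }
}
```

The explicit TaskCompletionSource keeps full control over how failures are reported, which is what the Observer class uses it for; ToTask trades that control for less code.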


The SubscribeToMessageTest method is wrapped in a facade, PublishTest<T>:

    /// <typeparam name="T">The type of the message to test.</typeparam>
    public class PublishTest<T> where T : class
    {
        private readonly Task<T> task;

        /// <summary>
        /// Test a message that satisfies a predicate, before a timeout occurs.
        /// Call Verify or VerifyAsync to obtain the outcome.
        /// </summary>
        /// <param name="predicate">A predicate with the tests needed to determine that the correct message has been published.</param>
        /// <param name="failureMessage">A message to aid debugging the test failure.</param>
        /// <param name="timeout">TimeSpan indicating how long to wait for a message to be received.</param>
        public PublishTest(Func<T, bool> predicate, string failureMessage, TimeSpan timeout)
        {
            this.task = Observer.SubscribeToMessageTest(predicate, failureMessage, timeout);
        }

        /// <summary>
        /// Block whilst waiting for test to verify. Use where async/await is not supported.
        /// For example, SpecFlow does not currently support async/await.
        /// </summary>
        /// <returns>Message associated with successful test.</returns>
        public T Verify()
        {
            return this.task.Result;
        }

        /// <summary>
        /// Verify the test at a later point in time by calling .Result or using async/await on the Task returned.
        /// </summary>
        /// <returns>Task with message associated with successful test.</returns>
        public Task<T> VerifyAsync()
        {
            return this.task;
        }
    }




This is then used in a test like so:

        [Then(@"a PayPalPaymentIntentAcceptedInternalV4 is published")]
        public void ThenAPayPalPaymentAcceptedIsPublished()
        {
            new PublishTest<PayPalPaymentIntentAcceptedInternalV4>(
                message => message.PaymentReference == this.context.PaymentReference,
                "No PayPalPaymentIntentAcceptedInternalV4 message was published matching the specified test.",
                this.context.MessageVerifyWaitTime).Verify();
        }



The lambda, which provides the actual test (message => message.PaymentReference...), and the type "PayPalPaymentIntentAcceptedInternalV4" are used in tandem: the type qualifies that the test should run only for this message type, and the lambda checks that the message is associated with the input supplied earlier at the API, which would have the same payment reference. The lambda/test is ultimately what is passed to RunTest in the Observer class (listed earlier).

**As I am writing this I can see some improvements I would like to make to this code, and I will add an edit (also adding proper code snippets). I have left out some of the setup code around NServiceBus, the handler, and the message subscriptions. The other parts of the tests are also omitted for brevity (to be included in git).
 

















