In one of our applications we’ve been using mongodb’s change streams to listen to changes in the database and then broadcast them using signalr for a truly reactive experience.
We create an Observable (with Observable.Create) and listen to changes within the lambda passed to it. Every time a change is detected, we emit a new value in the stream by calling OnNext on the observable. Later on in the pipeline the values emitted are mapped to a data type tailored for the browser and transmitted asynchronously via signalr. Reactive extensions plays very nicely with these asynchronous Task-based operations.
There were several caveats to consider:
- this is intended to run for the entirety of the time the application is up, so should retry automatically when there’s an exception either with mongo or signalr
- updates can happen in quick succession, so they should not be broadcast in parallel to prevent them being received by clients out-of-order
To test this plumbing I used the Reactive.Testing library available on nuget. The github repository for reactive extensions on .NET isn’t the most user-friendly, but a helpful introduction can be found here.
The library includes a TestScheduler, which provides a way of stepping through time to check what values are emitted, and when. As can be seen from the introduction, this allows the tests to be written declaratively, with a list of values and their timestamps as input and an assertion method that takes a list of expected values and timestamps. This has the added benefit that everything’s synchronous (as long as you pass the scheduler through to any of the rx extension methods you’re using that have an overload that accepts a scheduler).
I did have some stumbling blocks to overcome. The first and simplest was that I needed my test class to inherit from ReactiveTest to get access to convenience methods like OnNext and OnCompleted.
The second was that I didn’t understand the times that the values were being emitted. I tried reviewing tests in the reactive extensions github repository, but the numbers just didn’t add up.
Here’s the code to set up the input stream for a test:
var xs = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(150, 2), OnNext(200, 3), OnCompleted(250) );
And here’s the expected output:
res.Messages.AssertEqual( OnNext(300, 1), OnNext(350, 2), OnNext(400, 3), OnCompleted(450) );
It’s easy to tell from the increments of 50 ticks that it’s correct, but what’s the offset of 200 each time?
The answer lies in the overload to Start on the scheduler. The method takes 3 numbers: created, subscribed and disposed. The first is when the factory function that creates the observable is executed, the second is when the observable is subscribed to and the third when it’s disposed. When using the overload that doesn’t supply these values, 100, 200 and 1000 are used as defaults. This explains the 200 tick drift in the tests.
Whilst searching google for the answer I realised I wasn’t the only one confused by these magic numbers. I found an issue (from 2015!) raised by the RX legend Lee Campbell (of introtorx fame) expressing his concern about it.
After overcoming these hurdles it was a pleasure to use the testing library. Because even though rx does a good job of taming concurrency, it still allows for plenty of ways of subtly doing the wrong thing.