Ruurd

Jul 29, 2013

Reactive Extensions and ObserveOn

I’ve been actively following and using the work of the Reactive Extensions (Rx) team from early on, as Rx is truly a unique library for working with events. However, a few days ago I discovered that something didn’t work quite the way I expected, and it involves the ObserveOn and SubscribeOn methods of Observable.

The problem case

We had an event stream (in particular, XML messages arriving on a TCP port) which arrived at a relatively high rate. We did all of the event detection, filtering and handling with an Rx chain, which worked great. In the end, the event data had to be persisted to a database. This last step is where we met the real problem: a database operation could take longer than the time between arriving events, so the incoming events queued up.

The solution (or so we naively thought)

Let’s put every database operation on a separate thread, so we offload all IO delays and free our main thread for the real computations. How? There is this nice little method on Observable called ObserveOn, which lets you specify where you want the observing to take place:

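public static class Observable
{
    // Signatures only; bodies omitted.
    // Delivers notifications from the source to observers on the given scheduler.
    public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, IScheduler scheduler);

    // Runs the subscription and unsubscription logic on the given scheduler.
    public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, IScheduler scheduler);
}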

So let’s ObserveOn(ThreadPool), and our problem is fixed!
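To make the idea concrete, here is a minimal sketch of what that attempt might look like. The class name NaiveAttempt, the method Wire and the saveToDatabase callback are hypothetical names invented for illustration; our real chain had more steps in front of it. It assumes the System.Reactive package, where ThreadPoolScheduler.Instance is the thread pool scheduler.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class NaiveAttempt
{
    // Hypothetical wiring: 'messages' stands for the end of the Rx chain,
    // 'saveToDatabase' for the slow database write.
    public static IDisposable Wire(IObservable<string> messages, Action<string> saveToDatabase)
    {
        return messages
            .ObserveOn(ThreadPoolScheduler.Instance) // deliver notifications on thread pool threads
            .Subscribe(saveToDatabase);              // the write runs inside OnNext
    }
}
```

The returned IDisposable is the subscription; disposing it stops the writes.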

WTF, or ‘Why are my events still queueing?’

The essential thing to remember is that Rx does not process events in parallel. If you specify that you want to observe events on a particular thread, Rx will help you, but the notifications are still delivered to your observer one at a time and in order, so a slow handler still makes the following events wait in line. So what’s the point of ObserveOn and SubscribeOn? They’re mostly useful for STA scenarios, most notably the one where a UI thread receives events, which you SubscribeOn or ObserveOn a background thread to prevent blocking the UI thread, and eventually ObserveOn the UI thread again to update the UI. Sure, that case uses two threads, but it’s all sequential.
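A small experiment can make the sequential behaviour visible. The sketch below is not our production code: SerializedDemo and MaxConcurrentHandlers are hypothetical names, and Thread.Sleep stands in for the database call. It counts how many handlers are ever running at the same moment behind ObserveOn.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class SerializedDemo
{
    // Returns the highest number of OnNext handlers that were running at once.
    public static int MaxConcurrentHandlers(int eventCount, int handlerMillis)
    {
        var gate = new object();
        int running = 0, maxRunning = 0;
        var done = new ManualResetEventSlim();

        var subscription = Observable.Range(0, eventCount)
            .ObserveOn(ThreadPoolScheduler.Instance)
            .Subscribe(
                _ =>
                {
                    lock (gate) { running++; maxRunning = Math.Max(maxRunning, running); }
                    Thread.Sleep(handlerMillis); // simulated slow database write
                    lock (gate) { running--; }
                },
                () => done.Set()); // OnCompleted arrives after the last OnNext

        done.Wait();
        subscription.Dispose();
        return maxRunning;
    }
}
```

Because Rx serializes the calls to an observer, I would expect MaxConcurrentHandlers(5, 20) to return 1: the handlers run on pool threads, but never more than one at a time.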

The real fix

Explicitly start a new thread or task in OnNext to take care of the database update, and return to the observable immediately.
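As a rough sketch of that fix, assuming .NET 4.5 or later for Task.Run: the names OffloadedWrites, Wire and saveToDatabase are hypothetical, and the error handling is only a placeholder for whatever logging you actually use.

```csharp
using System;
using System.Threading.Tasks;

public static class OffloadedWrites
{
    // Hypothetical wiring: each message gets its own task, so OnNext returns
    // right away instead of waiting for the database.
    public static IDisposable Wire(IObservable<string> messages, Action<string> saveToDatabase)
    {
        return messages.Subscribe(message =>
        {
            // Fire and forget: the write runs on the thread pool.
            Task.Run(() =>
            {
                try
                {
                    saveToDatabase(message);
                }
                catch (Exception ex)
                {
                    // Placeholder: an exception here would otherwise go unobserved.
                    Console.Error.WriteLine("Database write failed: " + ex.Message);
                }
            });
        });
    }
}
```

Keep in mind what this trades away: the writes can now overlap and finish out of order, and nothing limits how many are in flight at once, so this only fits if the database updates are independent of each other.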