Reactive Extensions and ObserveOn

I’ve been actively following and using the work of the Reactive Extensions (Rx) team from early on, as Rx is truly a unique library for working with events. However, some days ago I discovered something didn’t quite work as expected, and it involves the ObserveOn and SubscribeOn methods of Observable.

The problem case

We had an event stream (XML messages arriving on a TCP port) with a relatively high arrival rate. We did all of the event detection, filtering and handling with an Rx chain, which worked great. In the end, the event data had to be persisted to a database. This last step is where we met the real problem: a database operation could take longer than the time between two arriving events, causing incoming events to queue up.

The solution (or so we naively thought)

Let’s put every database operation on a separate thread so we offload all IO delays and free our main thread for the real computations. How? There is this nice little method on Observable called ObserveOn which allows you to specify where you want the observing to take place:

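// Signatures only, as declared in System.Reactive.Linq
public static class Observable
{
    public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, IScheduler scheduler);

    public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, IScheduler scheduler);
}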

So let’s ObserveOn(ThreadPool), and we fix our problem!

WTF or ‘Why are my events still queueing?’

The essential thing to remember is that Rx does not process a sequence in parallel. If you specify that you want to observe events on a particular thread, Rx will deliver them there, but still strictly one at a time: the next OnNext call is not made until the previous one has returned. A slow observer therefore keeps holding up everything behind it, no matter which thread it runs on. So what’s the point of ObserveOn and SubscribeOn? They’re mostly useful for STA scenarios: most notably the one where a UI thread receives events, which you ObserveOn a background thread to prevent blocking of the UI thread, and eventually ObserveOn the UI thread again to update the UI. Sure, that case uses two threads, but it’s all sequential.

The real fix

Explicitly spawn a new thread/task in the OnNext which takes care of the database update, and immediately return to the observable.
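
As a rough illustration of that fix, here is a minimal sketch using only the BCL interfaces (IObserver<T> and Task.Run) rather than anything Rx-specific. OffloadingObserver and its slowHandler parameter are names I made up for this example; slowHandler stands in for the database update. Keep in mind that once you do this, updates may complete out of order and nothing limits how many run at once, so treat it as a starting point only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical observer: hands each event to the thread pool and returns at once.
public sealed class OffloadingObserver<T> : IObserver<T>
{
    private readonly Action<T> _slowHandler;   // stands in for the database update
    private readonly ConcurrentBag<Task> _pending = new ConcurrentBag<Task>();

    public OffloadingObserver(Action<T> slowHandler)
    {
        _slowHandler = slowHandler;
    }

    public void OnNext(T value)
    {
        // Task.Run queues the work on the thread pool; OnNext returns immediately,
        // so the source is free to deliver the next event.
        _pending.Add(Task.Run(() => _slowHandler(value)));
    }

    public void OnError(Exception error)
    {
        Console.Error.WriteLine(error);
    }

    public void OnCompleted()
    {
    }

    // Blocks until every handler started so far has finished (useful on shutdown).
    // Note: this sketch never removes completed tasks from the bag.
    public void WaitForPending()
    {
        Task.WaitAll(_pending.ToArray());
    }
}
```

With Rx you would pass an instance such as new OffloadingObserver<string>(SaveToDatabase) to Subscribe, where SaveToDatabase is a hypothetical method doing the actual write.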

Self-hosting ASP.NET MVC

Recently I was considering the technology to use for the GUI of a Windows desktop client application I’m working on in my spare time. The standard picks are obviously WinForms or the more modern WPF. However, I have some problems with them in this case:

  • the technologies above are (Windows) platform-dependent, while for instance HTML5 isn’t.
  • the application I’m working on is a debugger. Perhaps details about that will follow in a future post, but I would like to be able to run it remotely from an arbitrary machine (and OS).
  • I really appreciate the power of modern JavaScript libraries such as knockout.js, and I can’t use those with WPF or WinForms.

Now, for Windows 8, the store applications can be developed in HTML5 already, while desktop apps can’t. Of course I could resort to creating a real web application (hosted in IIS), but that would require the debugger host machine to have IIS installed. The same is true for the IIS Hostable Web Core.

There is an open source library – NancyFX – which can be self-hosted, and I was tempted to use that but it had some unknowns coming from the MVC experience: controllers are modules in Nancy, and I couldn’t quite find what existing functionality in MVC was or wasn’t available in the Nancy library.

So with all other options out of the window, I set out to self-host ASP.NET MVC4.

Surprisingly, much of the stuff that’s needed to do this is undocumented: while the MVC pipeline is well known, the internals of what happens between the TCP port and the first entry point in ASP.NET aren’t. However, logic dictates:

  1. there should be an open TCP/IP port listening somewhere
  2. there should be a dedicated app domain for the web application
  3. somehow the request received on the port should be transferred to the web application’s HTTP runtime

1. Receiving requests

The traditional way of listening for web requests was to simply open a socket on port 80 using the Windows Sockets API (Winsock), and run the HTTP protocol stack on top of this in your app in user mode. However, this had severe drawbacks, which led to the creation of a dedicated kernel HTTP protocol stack, http.sys, which does all the low-level HTTP work and sits directly on top of the TCP/IP protocol stack.

In the user mode space, applications communicate with http.sys through the Windows HTTP Server API. IIS uses this API, and if we want to self-host ASP.NET, we will have to as well. Fortunately, the .NET framework includes a wrapper class around this API: System.Net.HttpListener
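
To get a feel for the API before wrapping it in Rx, here is a minimal sketch of the bare HttpListener calls involved. The HelloListener class, the ServeOnce method and the response text are made up for this example; it handles a single request synchronously, which is not how the listener further down works.

```csharp
using System;
using System.Net;
using System.Text;

public static class HelloListener
{
    // Handles exactly one request on the given prefix and returns the raw URL that was requested.
    public static string ServeOnce(string prefix)
    {
        using (var listener = new HttpListener())
        {
            listener.Prefixes.Add(prefix);   // a prefix must end with '/'
            listener.Start();                // on Windows this registers the prefix with http.sys

            // Blocks until a request arrives
            HttpListenerContext context = listener.GetContext();

            byte[] body = Encoding.UTF8.GetBytes("Hello from HttpListener");
            context.Response.StatusCode = 200;
            context.Response.ContentType = "text/plain";
            context.Response.ContentLength64 = body.Length;
            context.Response.OutputStream.Write(body, 0, body.Length);
            context.Response.Close();        // sends the response to the client

            return context.Request.RawUrl;
        }
    }
}
```

Calling HelloListener.ServeOnce("http://localhost:8080/") and then opening that address in a browser should show the text; the port number is an arbitrary choice.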

Using HttpListener to process requests should then be easy. I started by implementing a listener on top of HttpListener, and using Reactive Extensions to push the incoming request context to clients:

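using System;
using System.Collections.Generic;
using System.Net;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

internal class Listener : IDisposable
{
    private HttpListener _listener = null;

    // Stream of incoming request contexts, observed on a separate thread
    public IObservable<HttpListenerContext> IncomingRequests { get; private set; }

    internal Listener(int port)
    {
        _listener = new HttpListener();
        _listener.Prefixes.Add(string.Format("http://localhost:{0}/", port));
        _listener.AuthenticationSchemes = AuthenticationSchemes.Anonymous;
        _listener.Start();

        IncomingRequests = _listener.GetContextsAsObservable().ObserveOn(NewThreadScheduler.Default);
    }

    public void Dispose()
    {
        try
        {
            if (_listener == null) return;
            _listener.Stop();
            _listener = null;
        }
        catch (ObjectDisposedException)
        {
        }
    }
}

internal static class ListenerExtensions
{
    // Endless sequence of single-element observables, one per GetContextAsync call
    private static IEnumerable<IObservable<HttpListenerContext>> Listen(this HttpListener listener)
    {
        while (true)
        {
            yield return listener.GetContextAsync().ToObservable();
        }
    }

    // Concat only moves on to the next element once the previous one has completed,
    // so requests are accepted one after the other
    internal static IObservable<HttpListenerContext> GetContextsAsObservable(this HttpListener listener)
    {
        return listener.Listen().Concat();
    }
}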

Now all client code has to do is create a new listener, and subscribe to the request context stream:

private Listener _listener;

private void SetupListener(int port)
{
    _log.InfoFormat("Setting up new httplistener on port {0}", port);
    _listener = new Listener(port);

    _log.InfoFormat("Start forwarding incoming requests to ASP.NET pipeline");
    _listener.IncomingRequests.Subscribe(
    (c) =>
    {
        try
        {
            ProcessRequest(c);
        }
        catch (Exception ex)
        {
            _log.Error("Exception processing request", ex);
        }
    },
    (ex) => _log.Error("Exception in request sequence", ex),
    () => _log.Info("HttpListener completed"));
    _log.Info("Completed httplistener setup");
}

The real magic happens in the yet to be implemented ProcessRequest method. However, before feeding requests to the ASP.NET pipeline, we first have to set that up in its own AppDomain. When hosting your project in IIS, your application gets a dedicated AppDomain inside the worker process of the application pool it is bound to. In theory the web application could run in the already running default AppDomain; however, in this post I’ll try to stay as close as possible to the IIS approach.

System.Web contains the class ApplicationHost which allows you to do exactly what we want: create a new AppDomain, and specify a user-supplied type (class) which should be instantiated there to bootstrap the domain.

Again, in theory, you could use any class, but there are two things to keep in mind:

  • you want to communicate with the new AppDomain from your default AppDomain
  • you don’t want this bootstrapping instance to be garbage collected as it’s the root object in the new domain

The classic solution for the first issue is to use a class derived from MarshalByRefObject, as this will automagically enable remoting by RPC between your AppDomains. Another – more modern – option would be to use WCF, but I didn’t check if that works in practice yet.

The second issue is fixed by explicitly telling the remoting infrastructure that the object’s lease never expires (which ties its actual lifetime to the lifetime of the new AppDomain).

The code below demonstrates this:

public class AppHost : MarshalByRefObject
{
    //factory method
    private static AppHost GetHost(string virtualDir, string physicalPath)
    {
        // Fix for strange CreateApplicationHost behavior (can only resolve assembly when in GAC or bin folder)
        if (!(physicalPath.EndsWith("\\"))) physicalPath += "\\";

        // Copy this hosting DLL into the /bin directory of the application
        var fileName = Assembly.GetExecutingAssembly().Location;
        try
        {
            var folderName = string.Format("{0}bin\\", physicalPath);

            //create folder if it doesn't exist
            if (!Directory.Exists(folderName)) Directory.CreateDirectory(folderName);

            //copy file
            File.Copy(fileName, Path.Combine(folderName, Path.GetFileName(fileName)), true);

            //and all assemblies
            var pathToAppHost = Path.GetDirectoryName(fileName);
            foreach (var fn in Directory.EnumerateFiles(pathToAppHost, "*.dll", SearchOption.TopDirectoryOnly))
            {
                File.Copy(fn, Path.Combine(folderName, Path.GetFileName(fn)), true);
            }
        }
        catch { }

        return (AppHost)ApplicationHost.CreateApplicationHost(typeof(AppHost), virtualDir, physicalPath);
    }

    //set an infinite lease
    public override object InitializeLifetimeService()
    {
        return null;
    }
}

The code before the actual CreateApplicationHost call in the factory method requires an explanation: while CreateApplicationHost is a convenient method, it’s hardwired to only search for assemblies in the bin folder relative to the physical path of the web project (or the GAC). Rick Strahl mentions this on his blog, and in fact if you check the framework reference sources, or inspect the assemblies with ILSpy, you’ll discover a hardwired bin reference.

So for a quick fix, in the code above I just copy the relevant assemblies to that folder. A more elegant solution would be to do a bit more work and create a new AppDomain using AppDomain.CreateDomain(…) and tell it where to find assemblies yourself, or even better, handle its AssemblyResolve event.
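
For the AssemblyResolve route, a handler could look roughly like the sketch below. ProbingResolver and its methods are names I made up for this example, and I’m assuming the extra assemblies are plain .dll files sitting in a single known directory. The handler has to be registered inside the AppDomain that does the loading, and how to get it hooked up early enough in the new domain is left open here.

```csharp
using System;
using System.IO;
using System.Reflection;

// Hypothetical helper: lets the current AppDomain fall back to an extra directory
// when the default assembly probing fails.
public static class ProbingResolver
{
    public static void Register(string directory)
    {
        // AssemblyResolve is raised only after the normal lookup has failed
        AppDomain.CurrentDomain.AssemblyResolve +=
            (sender, args) => Resolve(directory, args.Name);
    }

    // Returns the loaded assembly, or null to let the resolution fail as usual.
    public static Assembly Resolve(string directory, string assemblyFullName)
    {
        // The requested name is a full display name ("MyLib, Version=1.0.0.0, ...");
        // keep only the simple name to build a file name from it.
        string simpleName = new AssemblyName(assemblyFullName).Name;
        string candidate = Path.Combine(directory, simpleName + ".dll");

        return File.Exists(candidate) ? Assembly.LoadFrom(candidate) : null;
    }
}
```

Compared to copying files into the bin folder, this leaves the web project’s directory untouched, at the price of a little extra bootstrapping code.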

Next we want to create an instance of your web/MVC project’s main class, which is some class – in global.asax.cs – derived from HttpApplication, so we also need a method on AppHost which does this:

private HttpApplication _mvcApp;

private void HostMvcApp<T>() where T : HttpApplication
{
    //usually IIS does this, but we need to serve static files ourselves
    HttpApplication.RegisterModule(typeof(StaticFileHandlerModule));

    _mvcApp = Activator.CreateInstance<T>();
}

This will bootstrap your MVC application and call into its Application_Start method where you can register routes, areas, bundles, etc. as usual.

Since the ASP.NET pipeline does not serve static files, we need to do this ourselves. I do this here by registering a StaticFileHandlerModule, which we’ll implement in a future post.

So now we have hosted the MVC application and set up an HTTP listener. The only thing that’s left is to connect these two together so the ASP.NET pipeline will handle these requests, an undertaking which turns out to be more complex than it sounds.

The way in which requests should be handed to the HttpRuntime is through a call to the static ProcessRequest method:

public static void ProcessRequest(HttpWorkerRequest wr)

The problem is: HttpWorkerRequest is abstract and has over 70 methods that should be implemented to be able to pass every possible request to the ASP.NET pipeline (the HttpRuntime will call into those methods to find details about the request).

So whatever host calls into the HttpRuntime will have to provide its own implementation of HttpWorkerRequest for wrapping its http requests.

We can check those who did by searching derived types:

[Screenshot: types derived from HttpWorkerRequest]

All except SimpleWorkerRequest are IIS-specific implementations, which tightly integrate with the proprietary, native-code IIS engine. SimpleWorkerRequest itself is an implementation which, when used to wrap HTTP requests, allows you to send simple requests into the ASP.NET pipeline. However, that’s about it: support for things like streams and certificates (security) is completely missing, so it’s not much more than a proof of concept and won’t let you unleash the full power of the ASP.NET engine.

So we’re out of luck: we have to make our own implementation. An (incomplete) example can be found in an old issue of MSDN Magazine.

Wrapping the request and sending it into the pipeline then looks like this:

private void ProcessRequest(HttpListenerContext context)
{
   _log.DebugFormat("Processing request");
   var wr = new HttpListenerWorkerRequest(context, VPath, PPath);
   HttpContext.Current = new HttpContext(wr);

   HttpRuntime.ProcessRequest(wr);
   _log.DebugFormat("Finished processing request");
}

But it fails at runtime…

It turns out we run into another problem: since the request is received in the default AppDomain and processed in the ASP.NET AppDomain, we have to pass either the HttpListenerContext (as we did above) or our implementation of HttpWorkerRequest between AppDomains. No matter how you do this (MarshalByRefObject, WCF), it requires serialization. And guess what? Neither of these is easily serializable, if only because they contain the request and response streams…

At this point I decided it would save me a lot of work if I moved the Listener we created into the ASP.NET AppDomain as well, and do away with the serialization mess.

So ultimately, I ended up with a factory method which:

  1. creates a new AppDomain and hosts my implementation of HttpApplication
  2. sets up an HttpListener in this new AppDomain
  3. wraps the HttpListenerContext in a custom implementation of HttpWorkerRequest
  4. sends it into the HttpRuntime

And finally…it works…I managed to have a fully functioning MVC application hosted in-process with my main application.

There are some loose ends to tie up though, like serving static files. I may touch on that in a follow-up, but if you want to try it yourself, try the hostasp NuGet package, which incorporates these concepts, right now.

UPDATE: or try the source code