Reacting to Change With the Rx Framework

A new feature arriving with .NET 4 (and already partially available in Silverlight 3) is the Reactive (Rx) Framework, built on the IObservable<T> and IObserver<T> interfaces that ship in .NET 4. It fundamentally changes the way we manage collections of data, moving from a “pull” to a “push” pattern. Let me explain.

Traditionally, when dealing with a list of objects (let’s say rows in a table), we use a “pull” pattern: pull all the rows from the database, and then deal with them on an item-by-item basis. This usually involves a data adapter of some sort connecting to a data source, filling a collection object with individual items representing each piece of data, and dealing with the output once the data fetching operation is complete. Depending on how the operation is done, this can lead to thread-blocking problems and unresponsiveness (e.g. the UI hanging while fetching a report).

So how would a “push” pattern be different? Well, instead of requesting all the data in one big chunk and waiting for it to complete, we “subscribe” to a “feed” of data, and deal with each data item as it arrives. This is done in a more asynchronous manner, allowing other operations to execute while we’re waiting on our data items. Think of it this way: let’s say every time a DataAdapter processes a row from a Select query, it fires a “RowProcessed” event, and an event handler (i.e. your code) executes every time a row is retrieved. This is achieved using a new pair of interfaces: IObservable<T>, a push-based counterpart to a collection, and its consumer, IObserver<T>. Example time!

Pull Method
using System;
using System.Data;
using System.Data.SqlClient;

//verbatim string (@) so the query text can span several lines
SqlDataAdapter adapter = new SqlDataAdapter(@"SELECT Date, SubTotal 
  FROM Orders WHERE Date > '2009-01-01' ORDER BY Date", 
  "Data Source=localhost;Initial Catalog=NorthwindDB;");
DataTable orders = new DataTable();
//we have a lot of orders, and we're not doing this async, 
//so app will hang during this line of code
adapter.Fill(orders);
foreach(DataRow dr in orders.Rows)
{
    Console.WriteLine(string.Format("On {0}, customer bought {1:c}",
      dr["Date"], dr["SubTotal"]));
}
Push Method
//predefined LINQ-to-SQL context
DatabaseContext context = new DatabaseContext();
//a LINQ-to-SQL query is pull-based (IQueryable), so it is converted
//with Rx's ToObservable() extension; Order is assumed to be the
//entity type behind context.Orders
IObservable<Order> datasource = (from o in context.Orders
                       where o.Date > new DateTime(2009, 1, 1)
                       orderby o.Date select o).ToObservable();
//Subscribing to the IObservable will cause the passed-in 
//delegate to execute every time
//the IObservable receives a piece of data from the data source 
//(which thread it runs on depends on the scheduler Rx uses)
datasource.Subscribe(order => Console.WriteLine(
  string.Format("On {0}, customer bought {1:c}",
      order.Date, order.SubTotal)));
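
To make the “subscribe to a feed” idea less magical, here is a minimal sketch of the two interfaces written by hand, using only the IObservable<T> and IObserver<T> types from the System namespace in .NET 4. Everything else (Order, OrderFeed, OrderPrinter, FeedDemo) is a hypothetical name made up for this illustration, and the feed pushes its items synchronously; Rx layers scheduling and the LINQ operators on top of this contract.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical order record, used only for this illustration.
public sealed class Order
{
    public DateTime Date { get; set; }
    public decimal SubTotal { get; set; }
}

// A hand-rolled observable: it pushes each order to whoever subscribes.
public sealed class OrderFeed : IObservable<Order>
{
    private readonly IEnumerable<Order> _orders;

    public OrderFeed(IEnumerable<Order> orders) { _orders = orders; }

    public IDisposable Subscribe(IObserver<Order> observer)
    {
        foreach (Order order in _orders)
            observer.OnNext(order);   // push one item at a time
        observer.OnCompleted();       // tell the observer the feed is done
        // A real source would report failures through observer.OnError.
        return new NothingToCancel(); // this sketch has already finished
    }

    private sealed class NothingToCancel : IDisposable
    {
        public void Dispose() { }
    }
}

// The consumer: reacts to each pushed order instead of looping over a list.
public sealed class OrderPrinter : IObserver<Order>
{
    public readonly List<string> Lines = new List<string>();
    public bool Completed { get; private set; }

    public void OnNext(Order order)
    {
        Lines.Add(string.Format("On {0:yyyy-MM-dd}, customer bought {1}",
            order.Date, order.SubTotal));
    }

    public void OnError(Exception error) { Lines.Add("Feed failed: " + error.Message); }

    public void OnCompleted() { Completed = true; }
}

public static class FeedDemo
{
    // Subscribes a printer to a two-order feed and returns it for inspection.
    public static OrderPrinter Run()
    {
        var orders = new List<Order>
        {
            new Order { Date = new DateTime(2009, 2, 1), SubTotal = 10m },
            new Order { Date = new DateTime(2009, 3, 1), SubTotal = 25m }
        };
        var printer = new OrderPrinter();
        new OrderFeed(orders).Subscribe(printer);
        return printer;
    }
}
```

Calling FeedDemo.Run() hands back a printer that has received both orders and the completion signal. The point of the design is that the consumer never asks for the next item; the source decides when OnNext fires.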

Now, it’s not that async functions are new (you could’ve easily used SqlCommand.BeginExecuteReader in the above example), but they’ve traditionally been a pain in the ass to set up, with all the callbacks and whatnot. What IObservable gives us is an easily extensible way to provide and consume async data, from virtually any data source you can think of. Examples I’ve seen include web service calls, database queries, even mouse events. And because it’s extended by LINQ, you can do cool stuff like:

Chained LINQ Operators
//order search results by result index, select the html page snippet, 
//concatenate it, and display it... ASYNC!
//released Rx has no OrderBy for observables, so the results are
//buffered with ToList() and sorted once the sequence completes
IDisposable subscription = pageDownloads
    .ToList()
    .Select(downloads => string.Concat(downloads
        .OrderBy(pageDownload => pageDownload.ResultIndex)
        .Select(pageDownload => pageDownload.Html)))
    .Subscribe(html => Debug.Write(html));
Observable Input Events
//custom mouse-drag event
//the GetMouse* helpers, Event<T> and Until are taken as given from
//the preview bits; later Rx releases call the Until operator TakeUntil
public void SubscribeToDrag(Control control)
{
    IObservable<Event<MouseEventArgs>> draggingEvent =
        from mouseLeftDownEvent in control.GetMouseLeftDown()
        from mouseMoveEvent in
          control.GetMouseMove().Until(control.GetMouseLeftUp())
        select mouseMoveEvent;
    //"event" is a reserved word in C#, so the lambda parameter is e
    draggingEvent.Subscribe(e => Debug.Write(
      string.Format("Mouse Dragged Over {0}!", e.Source)));
}
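
If you’re wondering how an ordinary .NET event ends up as an IObservable in the first place, here is a rough sketch using nothing but the System interfaces. Button, ClickObservable, CountingObserver and ClickDemo are hypothetical stand-ins invented for this example (not Rx or WinForms types); Rx ships its own helpers for this, so treat it as an illustration of the idea rather than how the library does it.

```csharp
using System;

// Hypothetical event source standing in for a UI control.
public sealed class Button
{
    public event EventHandler<EventArgs> Clicked;

    public void RaiseClick()
    {
        EventHandler<EventArgs> handler = Clicked;
        if (handler != null) handler(this, EventArgs.Empty);
    }
}

// Adapts the Clicked event to the push-based IObservable contract.
public sealed class ClickObservable : IObservable<EventArgs>
{
    private readonly Button _button;

    public ClickObservable(Button button) { _button = button; }

    public IDisposable Subscribe(IObserver<EventArgs> observer)
    {
        // Every raised event is forwarded to the observer as one item.
        EventHandler<EventArgs> handler = (sender, e) => observer.OnNext(e);
        _button.Clicked += handler;
        // Disposing the subscription detaches the handler again.
        return new Unsubscriber(() => _button.Clicked -= handler);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action _detach;

        public Unsubscriber(Action detach) { _detach = detach; }

        public void Dispose()
        {
            Action detach = _detach;
            _detach = null;               // make repeated Dispose calls harmless
            if (detach != null) detach();
        }
    }
}

// Counts how many events were pushed to it.
public sealed class CountingObserver : IObserver<EventArgs>
{
    public int Count { get; private set; }

    public void OnNext(EventArgs value) { Count++; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class ClickDemo
{
    // Two clicks while subscribed, one after disposing: returns 2.
    public static int Run()
    {
        var button = new Button();
        var counter = new CountingObserver();
        IDisposable subscription = new ClickObservable(button).Subscribe(counter);
        button.RaiseClick();
        button.RaiseClick();
        subscription.Dispose();
        button.RaiseClick();   // no longer observed
        return counter.Count;
    }
}
```

The IDisposable returned from Subscribe is what makes unsubscribing tidy: the caller doesn’t need to keep the original handler around to detach it later.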
