Reactive programming by example

All the tutorials in the world cannot replace hands-on experience, so once you’re done reading about reactive programming, go ahead and try it for yourself. A decent way to do that is to pick a task you’re otherwise familiar with, and see what that looks like when you add streams into the mix. Below, I will guide you through the task I’ve chosen for myself, to see how Rx would fare in a simple but realistic scenario.

Suppose you want to create a search page on your shiny Web 2.0 site, with the relevant results magically appearing on the screen as soon as the user types anything. Using our good old pal jQuery, your code might end up looking something like this[1]:

// Get data from the server
function search(filter) {
  return $.getJSON('api/search', filter);
}
// Show results on the screen
function display(result) {
  $('#output').text(result);
}
// Trigger a search whenever the input changes
$('#filter').on('input', function (event) {
  var filter = $(event.currentTarget).val();
  search(filter).then(display);
});

A search function that takes the query parameters and returns a promise[2] for the results, a display function to get them on the screen, and a bit of setup code to wire things up to the input change event. Seems reasonable for the most part, right?

Wrong, of course. I wouldn’t have bothered with a rhetorical question otherwise. One problem here is that every single keystroke will fire off an HTTP request, creating a pointlessly large load on your server. Thankfully, the user does not care for the results of half-completed queries, so you can safely wait until they’ve stopped typing, and only annoy the server with your requests afterwards. A bigger problem, and one that people will definitely care about, is displaying incorrect results. As it is, the code displays results in the order they arrive, which does not necessarily correspond to the order in which they were requested. Earlier searches, if they take long enough, will happily overwrite the results of later ones.

Fixing these issues is not trivial, and requires writing fairly convoluted code. Certainly, the code would be far more convoluted than the plain English description of the fixes. The core problem is that callbacks or promises alone can’t capture the concept of “things happening over time”. That is something that only manifests at run-time, expressed through control flow: which functions were called in which order, and in which state. Keeping track of all that is no easy task, and getting it right without extensive debugging is even harder.
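
To make that a bit more concrete, here is a rough sketch of what a hand-rolled fix might look like without streams. Everything in it is hypothetical (the createSearchBox name, the delay argument, the injectable timers object), and it is only meant to show how much bookkeeping state the two fixes drag in, not to be the one true way of doing it.

```javascript
// Hand-rolled fix: wait for a pause in typing, and ignore stale responses.
// `search` must return a promise-like object. `timers` is a hypothetical
// optional parameter that lets the logic run without real time passing.
function createSearchBox(search, display, delay, timers) {
  timers = timers || {
    set: function (callback, ms) { return setTimeout(callback, ms); },
    clear: function (id) { clearTimeout(id); }
  };
  var pendingTimer = null; // countdown for the search that has not fired yet
  var latestRequest = 0;   // id of the most recent request that was sent

  return function onInput(filter) {
    // Restart the countdown on every keystroke
    if (pendingTimer !== null) {
      timers.clear(pendingTimer);
    }
    pendingTimer = timers.set(function () {
      pendingTimer = null;
      var requestId = ++latestRequest;
      search(filter).then(function (result) {
        // Drop results that belong to an outdated request
        if (requestId === latestRequest) {
          display(result);
        }
      });
    }, delay);
  };
}
```

Two mutable variables and three nested callbacks, and that is before any error handling shows up.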

Streams make it possible to represent “things happening over time” as a symbol in your source code, which you can manipulate with simple, composable operators, just like you would do with anything else. Let’s get back to our search page example and see what that looks like with Rx.

var Observable = Rx.Observable;

// Convert to streams
function search(filter) {
  var result = $.getJSON('api/search', filter);
  return Observable.fromPromise(result);
}
var filters = Observable.fromEvent($('#filter'), 'input')
    .map(function (event) {
      return $(event.currentTarget).val();
    });
// Trigger a search whenever the input changes
filters
    .map(search)
    .mergeAll()
    .subscribe(display);

There’s a bit of wrapper code at the start, to convert everything to streams. That is straightforward enough, especially if you can use the built-in conversion methods like fromPromise and fromEvent. Going from the queries to the search results is the shorter part, but slightly scarier. The call to map here transforms a stream of strings into a stream of streams – a metastream[3]. Think of these as hierarchical streams, where each item represents the start of a child stream that you can potentially manipulate further. In this case, mergeAll grabs everything that shows up in any of the child streams, and emits it in its output stream unaltered, thereby flattening the hierarchy by one level. The final subscribe call is used to leave the realm of streams, and invoke a callback for each item[4].
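
If metastreams still feel slippery, a toy model may help. The sketch below does not use Rx at all: it pretends that a stream is nothing more than a function that takes a callback and calls it once per item. The fromArray, map, mergeAll and fakeSearch helpers are made up for this illustration, and the real operators handle timing, errors and unsubscribing, all of which is ignored here.

```javascript
// Toy model: a "stream" is just a function that accepts a callback and
// calls it once per item. This is NOT how Rx is implemented.
function fromArray(items) {
  return function subscribe(onNext) {
    items.forEach(function (item) { onNext(item); });
  };
}

// Transform each item of a stream
function map(stream, transform) {
  return function subscribe(onNext) {
    stream(function (item) { onNext(transform(item)); });
  };
}

// Flatten a stream of streams by one level: subscribe to every child
// stream as it shows up, and forward its items unaltered
function mergeAll(metastream) {
  return function subscribe(onNext) {
    metastream(function (child) { child(onNext); });
  };
}

// Stand-in for search(): each query yields a child stream with one result
function fakeSearch(filter) {
  return fromArray(['results for ' + filter]);
}

var received = [];
var results = mergeAll(map(fromArray(['a', 'ab']), fakeSearch));
results(function (item) { received.push(item); });
// received now holds one flat list, with one result per query
```

The thing to notice is that map alone would hand the subscriber two child streams, and it is mergeAll that turns those back into plain results.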

…And that’s it for replicating our existing code, with the existing problems. There would not be much of a point to that if it didn’t make maintenance easier, so let’s fix the previously mentioned bugs, and add some more features. Waiting until the user is done typing is accomplished via the debounce operator, which outputs an item only after its input stream has been silent for a while. The issue of out-of-order results can be tackled by using switchLatest instead of mergeAll, which only passes through items from the latest child stream (instead of all of them).

filters
    .debounce(300)
    .map(search)
    .switchLatest()
    .subscribe(display);
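
To see why switching to the latest child stream cures the out-of-order problem, here is another toy model, again without Rx. The createSubject and switchLatest helpers below are simplified stand-ins written for this post (a stream you can push items into by hand, and a bare-bones version of the operator), so treat them as an illustration of the idea and not as the library’s actual behaviour in every detail.

```javascript
// Toy stream that can be pushed to by hand and unsubscribed from;
// a simplification for illustration, not the Rx implementation.
function createSubject() {
  var listeners = [];
  return {
    next: function (item) {
      listeners.slice().forEach(function (listener) { listener(item); });
    },
    subscribe: function (listener) {
      listeners.push(listener);
      return function unsubscribe() {
        var index = listeners.indexOf(listener);
        if (index !== -1) { listeners.splice(index, 1); }
      };
    }
  };
}

// Forward items from the most recent child stream only
function switchLatest(metastream) {
  var output = createSubject();
  var unsubscribeChild = null;
  metastream.subscribe(function (child) {
    if (unsubscribeChild) { unsubscribeChild(); } // abandon the old child
    unsubscribeChild = child.subscribe(output.next);
  });
  return output;
}

var searches = createSubject();      // the stream of child streams
var shown = [];
switchLatest(searches).subscribe(function (item) { shown.push(item); });

var slowSearch = createSubject();
var fastSearch = createSubject();
searches.next(slowSearch);           // user typed "a"
searches.next(fastSearch);           // user typed "ab"
fastSearch.next('results for ab');   // the newer search answers first
slowSearch.next('results for a');    // the late answer is ignored
```

By the time the slow search responds, nobody is listening to it any more, so there is nothing left to overwrite.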

Right, that’s the bugs fixed. Moving on to the additional features. The first would be a loading indicator, to let the user know that something’s going on in the background. Normally you might try to track whether any requests are in progress via a counter, and then show or hide a spinner based on that. Using Rx, you can do this a lot more directly: have the stream returned by search itself indicate loading, by producing a special value at the start, in addition to the eventual result. A convenient[5] solution is to use null for this special value, which can easily be handled later on.

// Search, with a loading event prepended
function search(filter) {
  var result = $.getJSON('api/search', filter);
  return Observable.fromPromise(result)
      .startWith(null);
}
// Convert 'loading' event to an ellipsis
function toText(result) {
  return result || '...';
}
filters
    .debounce(300)
    .map(search)
    .switchLatest()
    .map(toText)
    .subscribe(display);

For the end, let’s do something slightly more complicated. If you’re operating with anything but the smallest amounts of data, you’ll want to show your search results one page at a time. As a consequence of that, you’ll usually want to know how many pages there are in total. Adding some extra parameters to search to indicate the page you want, and creating a count function that does a second HTTP call to get the page count, is all straightforward. The tricky part is that as long as only the selected page changes (and the filter criteria don’t), there is no need to fire off a count query to the server. This means that you will need to coordinate two streams that produce events at different rates, while still being based on related inputs. There are a few different ways to do this incorrectly, all of which boil down to needlessly introducing concurrency in the wrong place. For now, I’ll just show a solution that works.

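// Combine and convert results from two calls
function toText(count, data) {
  // Compare against null explicitly, so that a count of 0 is not
  // mistaken for the 'loading' marker
  var countText = count === null ? '...' : count;
  var dataText = data === null ? '...' : data;
  return '#' + countText + ' - ' + dataText;
}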
// Create stream for selected page index
var pages = Observable.fromEvent($('#page'), 'input')
    .map(function (event) {
      var textValue = $(event.currentTarget).val();
      return parseInt(textValue, 10);
    })
    .shareValue(1);

The pages stream looks almost identical to filters, but notice the extra call to shareValue sneaked in at the end. You might be wondering what that does, and what the point of it is. To understand that, you need to know about hot and cold streams… and a few slight variations of those, actually. The difference is in how they treat their subscribers. Hot streams emit items whenever they want, regardless of who is or isn’t subscribed. If you subscribe too late and miss some of the initial items, too bad. Cold streams, on the other hand, produce a specific sequence of items, on demand, whenever someone subscribes. The behaviour we want from pages is different from either of those options: we need a lukewarm stream[6], one that represents a value that changes over time. Whenever a subscriber is added, they get notified of the current value, and of any subsequent changes – which is exactly what the shareValue method does. Before getting into why that’s needed, here’s the rest of the code for the final version:

filters
    .debounce(300)
    .map(function (filter) {
      var counts = count(filter);
      var searches = pages
          .map(function (page) {
            return search(filter, page);
          })
          .switchLatest();
      return counts.combineLatest(searches, toText);
    })
    .switchLatest()
    .subscribe(display);
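
The snippet above leans on a count function and a two-argument search that this post never spells out. One possible shape for them is sketched below, purely as a guess: the createApi name, the 'api/count' endpoint and the filter and page parameter names are all assumptions, and the HTTP and conversion functions are passed in as arguments so that the sketch does not depend on any particular library (in the browser, you would hand it small wrappers around $.getJSON and Observable.fromPromise).

```javascript
// Hypothetical sketch of the two request functions used above. The
// 'api/count' endpoint and the query parameter names are assumptions.
// `getJSON(url, params)` should return a promise, and `fromPromise`
// should turn that promise into a stream that supports startWith.
function createApi(getJSON, fromPromise) {
  function request(url, params) {
    // Same trick as before: emit null first to signal 'loading'
    return fromPromise(getJSON(url, params)).startWith(null);
  }
  return {
    // Fetch one page of results for the given filter
    search: function (filter, page) {
      return request('api/search', { filter: filter, page: page });
    },
    // Fetch the total number of pages for the given filter
    count: function (filter) {
      return request('api/count', { filter: filter });
    }
  };
}
```

Having both streams start with null also means that combineLatest has something to combine right away, so the ellipsis shows up while the requests are still in flight.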

More metastreams, yay! This time, they’re used to group queries by the filter parameter, and to trigger only one count query in each group. The reason for shareValue is that when we transform the pages stream here, that should include the current value as well, not just the future changes. After all that is done, the latest values from the count and search calls are combined using the unsurprisingly named combineLatest method. The final destination is still the very same display call, to ensure that the result of our hard work actually makes it to the screen.
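
The “current value plus future changes” behaviour is easy to model by hand, which might make it clearer why it matters here. The createValueStream helper below is a toy written for this post, not the Rx implementation of shareValue, and it skips unsubscribing and everything else that makes the real thing robust.

```javascript
// Toy model of a "value that changes over time": new subscribers get the
// current value right away, then every later change. Not the Rx internals.
function createValueStream(initialValue) {
  var current = initialValue;
  var listeners = [];
  return {
    set: function (value) {
      current = value;
      listeners.slice().forEach(function (listener) { listener(value); });
    },
    subscribe: function (listener) {
      listeners.push(listener);
      listener(current); // replay the current value to the newcomer
    }
  };
}

var pages = createValueStream(1);
pages.set(2);                        // the user moved to page 2 earlier on

var seenByLateSubscriber = [];
pages.subscribe(function (page) { seenByLateSubscriber.push(page); });
pages.set(3);
// The late subscriber starts from the current page (2), then sees 3
```

Every time the filter changes, the inner pages.map(...) is a brand new late subscriber. With a plain hot stream it would sit there waiting for the next page change before running any search at all.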

Even though this solution is still only a handful of lines, it avoids all the pitfalls you would typically encounter when dealing with asynchronous operations. That means you can spend less time on debugging complex code that’s doing a conceptually simple task, and more time on actually getting things done. If this has piqued your interest, you can go here to poke around in the live code of this example.

Anyway, that’s it for today. Next up: more JavaScript, and less reactive programming.


  1. Hopefully your code includes proper error handling and validation as well. This one’s just for illustrative purposes. 
  2. I know, jQuery’s promises are only slightly disguised callbacks. It’s fine, they’re going to get Rx’d in this example anyway. 
  3. Or an “observable sequence of observable sequences”, if you use the terminology in the Rx documentation. You can probably see why I don’t. 
  4. Again, error handling is ignored here, as is unsubscribing. 
  5. i.e. lazy. 
  6. Unlike the “hot” and “cold” terminology, this is not actually a thing. 