If you came here before, you might have heard of a little something called reactive programming. The concept is simple: react to changes as they happen instead of checking for updated data on a schedule. It’s a great idea, for which you can even find a manifesto if you look hard enough. When you design reactive code correctly, you get great performance, maintainability and composability which are all very desirable.

Reactive Extensions (Rx for short) sells itself as the absolute remedy (pun intended) to all of your reactive problems. It is a very well though-out abstraction to represent changing state in your code. It respects all the core principles of the reactive manifesto and thus provides great performance, maintainability and composability and blah blah blah you get the idea.

If you used it in the past, either through Angular which is heavily dependent on Rx, or through .NET you know that presenting reactive content can be a challenge. After all, this reactivity comes from a trigger which might be clicks, network calls, AMQP messages or others; all of which can fail in a whole bunch of different manners. Then, there’s the question of delay. What do you between events? Will you even get an event? What if you get more events than expected? Or less? Or… none? These are not questions that Rx can answer. It will make sure that you are able to handle all of those cases, but it won’t tell you how or why you should do it.

In fact, I do not know of any asynchronous processing framework that will.

Observables

Reactive data, like an Rx observable, is a contract. The C# way of representing this contract is an interface. The same is true in TypeScript for RxJs and other OO languages. Unfortunately for us, this contract is vague; only a single method:

```
interface IObservable {
    IDisposable Subscribe(Action<TNext> onNext, Action<Exception> onError, Action onComplete);
}
```

Let’s see what we can learn from it.

  1. You can subscribe to an IObservable.
  2. Subscribing, in broader terms, means that you will be notified of something in exchange for a small performance cost.
  3. The observable is the source of those notifications.
  4. You will be notified via callbacks, also called observers.
  5. There are 3 kinds of callbacks, thus 3 kinds of notifications.
    1. Next
    2. Error
    3. Complete
  6. A subscription can be disposed.
  7. Disposing of an IDisposable usually means to stop doing something or release a shared resource.
  8. Since performance is a shared resource, disposing in this case means unsubscribing from the observable and regaining our performance investment.

That is a lot of information packed in a single method, but it is still not enough to really understand how this entire machinery works. For instance, there is no mention of the order those notifications might come in. Can we get a next after an error? What happens when we complete? These questions are answered in details by the Reactive Extensions’ documentation, but the contract does not necessarily enforces those answers.

Let me add a couple more points to this contract based on what we can find in the documentation:

  1. Observables expect meaningful values (content) to be sent through Next notifications.
  2. A Next notification will be sent every time the source produce a new value.
    1. An HTTP request that comes back with a response.
    2. A button that is clicked.
    3. A stock ticker that updates.
  3. Observables expect errors (exceptions) to be sent through Error notifications.
    1. You can send exceptions through next if you want, but this is not the idiomatic way and is not expected by observers.
  4. An Error notification will be sent to notify when an error state was reached upstream.
    1. An exception while processing notification will trigger an error.
    2. A timeout or anything that deviate from the expected flow should trigger an error.
  5. An Error notification is a final notification in a stream.
  6. Observables expect a final notification (signal/unit) to be sent though Complete notifications.
  7. A Complete notification will be sent to notify when a source is no longer useful or as completed successfully.
    1. A fulfilled HTTP request should notify listeners that there will be no more data coming through.
    2. A disconnected device should notify listeners that it is no longer relevant.
    3. A user navigating to an other page might complete a timer.
  8. Both Error and Complete notification will automatically cause observers to unsubscribe.

Why

So, why should you care about all of this? Well, to put it bluntly: it is part of the contract.

This contract, these expectations, are what makes Rx thrive and work its best. By respecting this contract, you can be sure that every time something goes wrong, you can use tools like Catch() or Retry() to handle the problem. They will work even if you don’t know the cause. You can use Complete to free resources and cancel long-running tasks, even if they are handled by third-party code. All of this works because every one expect this implicit contract to be respected.

Ultimately, it means that you can also profit from these rules. This is where the idea of an Async-Value-Presenter comes in.

How

While observables are a great tool to move data around your application, you still need an adaptive layer with third-party code to convert low-level events into observables. Libraries lik Rx gives you some great tool to do this like the Observable.FromEvent method. Unfortunately, their only tool to go the other way around, from observable to stateful code like a UI, is the Subscribe method from before.

Introducing the Async Value Presenter, or AVP for short. This pattern was, as far as I can tell, coined by Jerome Laban; an amazing Montreal developer and ex MVP. The idea is really simple, and thus absolutely brilliant:

What if we could map an observable to a state machine?

A state machine is a device (in our case a software construct) that can be used to express a series of states and transitions in a stateful manner. A well designed state machine will force its user to handle all possible states. When this is done right, they can even do it at compile time.

A simple final state machine might look a bit like this:

  1. Sleeping (initial state)
    1. Can become Awake via the Alarm Clock transition
  2. Awake
    1. Can become Productive via the Drinking Coffee transition
    2. Can become Sleeping via Lazy transition
  3. Productive (final state)

It is called final because it includes a final state which is not a requirement of all state machines. Here is a crude equivalent of this state machine written in C#:

```
class Developer {
    public enum DeveloperState { Sleeping, Awake, Productive }

    private class Transition {
        public DeveloperState SourceState { get; set; }
        public DeveloperState TargetState { get; set; }

        public DeveloperState TryExecute(DeveloperState currentState) =>
            currentState == SourceState
                ? TargetState
                : currentState;
    }

    public DeveloperState CurrentState { get; private set; } = DeveloperState.Sleeping;

    private Transition RingAlarmClockTransition = new Transition {
        SourceState = DeveloperState.Sleeping,
        TargetState = DeveloperState.Awake
    }

    private Transition DrinkCoffeeTransition = new Transition {
        SourceState = DeveloperState.Awake,
        TargetState = DeveloperState.Productive
    }

    private Transition BeLazyTransition = new Transition {
        SourceState = DeveloperState.Awake,
        TargetState = DeveloperState.Sleeping
    }

    public RingAlarmClock() => CurrentState = RingAlarmClockTransition.TryExecute(CurrentState);
    public DrinkCoffee() => CurrentState = DrinkCoffeeTransition.TryExecute(CurrentState);
    public BeLazy() => CurrentState = BeLazyTransition.TryExecute(CurrentState);
}
```

In this case, we have a set of predefined Transitions acting on a CurrentState. They will only let a few select state changes go through, with all other cases keeping the state unchanged. All possible states are defined by the DeveloperState enum. You can also see that an external force still needs to trigger the state changes.

Let’s try to think of a Reactive Extension observable as a state machine then! In our case, the mechanisms that will trigger state changes is the arrival of a new notification. In other words, our trigger will be the Subscribe method itself, which ideally, the machine should take care of by itself. Designing our state machine well will:

  • Make it impossible to forget to handle errors.
  • Make it a conscious decision to not provide feedback when loading data.
  • Force you to think about changing data and paging.
  • Impose the reality of missing data.

In other words: It. Makes. Code. Provably. Correct.

That is a big deal.

So big that I implemented this pattern in pretty much every sensible project that I worked on since I learned about it. It saved me many days of debugging and weeks of designing UX. I can only understate how useful compile-time-defined state machines are. They should be used way more frequently.

The Machine

The first state in our state machine represents the lack of a subscription. At this point, the machine did not call the Subscribe() method on the observable.

  • Pristine

The next step is loading. Here we just called Subscribe(). We have yet to receive any notifications.

  • Pristine
  • Loading

Then, any of the three notifications can happen. This will be represented by three new possible states.

  • Pristine
  • Loading
    • Streaming
    • Empty
    • Error

Getting a Next notification means that we got our first bit of meaningful data. The streaming state represents that we are in the process of getting meaningful notifications. We might still get more. We simply don’t known. This might be used to transfer chunks of a video, or pages of results for a search query.

Getting the Complete notification means that the source will no longer push more notifications. The empty state represents that we haven’t received anything during the lifetime of our subscription. We will definitely not get any more at this point. This is a dead end.

Getting the Error notification means that the source encountered an error and must stop sending us more notifications. The error state represents that error. We will not get any more notifications in the future. This is also, a dead end.

At this point, if we are in the streaming state, our three familiar callbacks can still get triggered.

  • Pristine
  • Loading
    • Streaming
      • Streaming
      • Content
      • Error
    • Empty
    • Error

Getting an other Next notification means that we are still streaming more data. Thus, we stay in the streaming state.

Getting a Complete notification at this stage changes a little bit from last time and produces a new state. Here, the content state represent that we have received all of the data that we were expecting. This is a dead end, but also our happy path.

Getting an Error notification means the same as before. We are in the error state. Something bad happened upstream and we must stop at this point. This is the last dead end. The previous data that we received is still good and might be useful, though.

And lastly, we can see a pattern emerge in the possible state transitions…

  • Pristine
  • Loading
    • Streaming
      • Streaming
      • Content
      • Error
    • Empty
    • Error

As long as we get more values in the Next callback, we stay in the streaming state. Any other case will stop the state machine.

More state machines?

This state machine is only one of the way to interpret the various life stages of a Reactive Extension observable. It is the idiomatic interpretation. Sometimes, you will have to deal with non-idiomatic libraries, like NgRx (a Redux implementation for Angular), which insist on reducing errors to a Next notification so that it can be stored in their global state. We also all had to deal with APIs that return errors as HTTP 200 status codes. The simplest way to deal with those is to look for the request’s content and throw an actual exception when we detect something that looks like an error.

Still, it doesn’t mean that we cannot adapt our state machine to these patterns, or vice versa. I would be surprised to see more states added to the list, but I have seen cases where less states are definitely possible. In all cases tough, you generally want to redefine state transitions. For instance, you might match your states on a state variable in your Redux store with values such as 'loading' or 'done'. If you’re more adventurous, you might even write a compatibility layer around those frisky HTTP requests and expose them as proper error notifications.

The Presenter in AVP

A state machine isn’t the only part of the AVP pattern though. You still need to take that state and display it somehow. Exactly how this will be accomplished heavily depends on where you attempt to present the state of the observable. If it’s in an Angular application, you might use an NgTemplateOutlet to swap between configurable templates. In a VueJS application, this might be done through the slot mechanism. Again, the idea is to convert a stream back into a meaningful state that can be used by stateful code instead of stream processing code.

But I don’t like Reactive Extensions…

Fine, here’s a simple AVP using promises, just for you:

```
function CreateAsyncValuePresenter(
    pristineRenderer,
    loadingRenderer,
    contentRenderer,
    emptyRenderer,
    errorRenderer
) {
    let state = pristineRenderer();
    
    const render = () => state;
    const setSource = (promiseFactory) => {
        output = loadingRenderer();
        promiseFactory()
            .then(
                v => state = v ? contentRenderer(v) : emptyRenderer(),
                e => state = errorRenderer(e)
            );
    };

    return {render, setSource};
}

const anAvp = CreateAsyncValuePresenter(
    () => "<pristine>",
    () => "<loading>",
    v => "<content>:" + v,
    () => "<empty>",
    e => "<error>:" + e
);
console.log(anAvp.render()); // <pristine>
anAvp.setSource(() => fetch("https://example.com"));
console.log(anAvp.render()); // <loading>
setTimeout(() => console.log(anAvp.render()), 30_000); // <error>:invalid CORS
```

As you can see, the idea of an Async Value Presenter is very powerful pattern. It is the idea of turning a source of notification into a state machine. It works on any type of push-based data source, in any language, in any framework. It is not a complicated pattern either… well, in the end it will really depend on how much your favorite language/framework loves state machines and data streams.

Note: Please don’t use this example in production. It is terribly over simplified and doesn’t provide much value. Usually, you will want to turn this pattern into a component, like an Angular, React or VueJS component or a control, if you live in the native app world like in WPF or UWP. You might also want to include fancy debugging tools to impose a specific state that might be hard to replicate. (Testing loading screens anyone?) You might also consider splitting the state machine from the presentation component if you deal with multiple asynchronous sources in the same codebase.

Anyway, that was all I had to say for today. Look at how this pattern might simplify how you deal with asynchronous data; be it promises, observables or other-wise. Consider implementing your own!

Have a good day and read you later!