.NET Reactive Extensions Rx - 1 (Basics)

In this blog, I will explore the reactive extensions (Rx) framework. Rx extends .NET. Rx extends LINQ so that results of a query can be converted to an observable sequence. Let's explore more..
You can download Rx from the MSDN website. Once the msi is run, you can add references to System.Reactive dll to get started. Since Rx is heavily based on Observer pattern - let's take a look at what this pattern is -
Observer Pattern
I always find it easy to understand a pattern with an example and the example of bulb-switch is used commonly to describe the Observer pattern. In a bulb-switch example, the bulb is an observer as it is observing the switch. Whenever the state of switch changes, bulb has to do something. The switch in this case is called a subject. Rx defines 2 interfaces that describe the behavior of observer and switch. For observer, Rx has IObserver interface which defines OnNext(), OnCompleted() and OnError(Exception ex) methods. For subject, Rx has IObservable interface which has one method called Subscribe(IObserver) which takes as input an IObserver. 
Let's take a look at some very simple code that uses Rx framework:
class Program
        static void Main(string[] args)
            var numbers = new List<int> {1, 2, 0, 3, 4, 5, 6, 7, 8, 9, 10};
            var query = numbers.Take(5);
            // simple way
            foreach(var digit in query)

            // oQuery is the subject (switch) and Print is the observer (bulb)
            var oQuery = query.ToObservable();

            // demonstrating an OnComplete scenario
            oQuery = query.ToObservable();
            oQuery.Subscribe(Print, EndMethod);

            // demonstrating an error scenario
            oQuery = query.ToObservable().Select(n => 10 / n);
            oQuery.Subscribe(Print, ErrorMethod, EndMethod);

        static void Print(int i)

        static void EndMethod()
            Console.WriteLine("Task is complete");
        static void ErrorMethod(Exception ex)
and when I run this app, these are the results that I get -
The foreach loop demonstrates a typical way in which we are used to do the things. We loop through each number and then we do something with that number and in the end get out of that code block.
In the next code piece, we start to use the Rx framework. As mentioned earlier, we need an IObserver and an IObservable to work with. We convert the query to IObservable by calling the ToObservable method. This method converts IEnumerables to IObservables. Once the query is converted to an IObservable, we call the Subscribe() method on it. In the Subscribe() method we have to provide an IObserver. We don't really need to provide an IObserver but instead we can provide the 3 methods that IObserver implements, i.e, OnNext(), OnCompleted() and OnError(). In my code, I have 3 code pieces using different overloads. In the first one, I just pass a delegate for OnNext. In the second one, I pass OnNext() and OnCompleted() and in the third one, I pass all the 3. The rule to remember here is that OnNext() will be called one or more times depending on the number of items. OnError will get called if there is an error. If OnError is called, OnCompleted() won't be called and if OnCompleted() is called that means OnError() was not called and the operation was successful. OnNext() will never get called after either OnError() or OnCompleted() is called.
You might wonder as to what is the big deal about this Rx framework since all that we did above can be easily done using the foreach loop and try-catches. The real power of Rx comes into play when we start to utilize it in asynchronous scenarios, which will be the topic of my next blog.

No comments: