.NET Reactive Extensions Rx - 2 (Concurrency)

In this post, I will explore the Rx framework further and look specifically at its concurrency and threading support, because I think this is where most of the value of Rx lies.

Rx gives developers fine-grained control over which threads their subscriptions and observations run on. Subscriptions are essentially the calls that fetch the next item from the observable collection, and observations are what we do with that item once we have it. So, if we are iterating through a collection of numbers, subscription means getting each number (through the query) and observation means running the OnNext, OnError and OnCompleted operations for it.

Keep in mind that even if we use different threads, Rx guarantees that the order is maintained. That is, if the observable collection contains 1, 2, 3 and you apply some Rx operations to it, the items will be processed in that same order: the operation on 2 won't be called until the operation on 1 has completed, and so on. The only caveat is that Rx guarantees the order of observations only; it doesn't guarantee the order of subscriptions.

To demonstrate the threading scenarios, I have created a simple WPF app with a bunch of buttons that display some text in a textbox. In the cases where the observations run on a different thread, I cannot append the text to the textbox (it belongs to the main thread), so I write the text to the Debug -> Output window with the PrintToDebug() method instead. Let's take a look at the code and its results -
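using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Windows;

public partial class MainWindow : Window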
    {
        public MainWindow()
        {
            InitializeComponent();
            Example8Stop.IsEnabled = false;
        }

        private string PrintThreadId()
        {
            return "Thread:" + Thread.CurrentThread.ManagedThreadId;
        }

        private void CleanTB()
        {
            tb.Text = "Main " + PrintThreadId() + "\n";
        }

        private void Print(string str)
        {
            tb.AppendText(str + "\nObservation\tPrint\t\t" + PrintThreadId() +"\n");
        }

        private void PrintToDebug(string str)
        {
            Debug.WriteLine(str + "\nObservation\t\tPrint\t\t" + PrintThreadId() + "\n");
        }

        private void EndMethod()
        {
            tb.AppendText("\nEndMethod\t" + PrintThreadId() + "\tTask is complete\n");
        }

        private void ErrorMethod(Exception ex)
        {
            tb.AppendText("\nErrorMethod\t" + PrintThreadId() + "\t" + ex.Message + "\n");
        }

        private void Example1_Click(object sender, RoutedEventArgs e)
        {
            CleanTB();
            tb.AppendText("\nDemonstrating Scheduler.NewThread. See output in the debug-output window.");
            var numbers = new List<int> { 1, 2, 3 };
            var query = numbers.Select(n => SlowDown(n));

            var oQuery = query.ToObservable(Scheduler.NewThread);
            oQuery.Subscribe(PrintToDebug);
        }
       
        private void Example2_Click(object sender, RoutedEventArgs e)
        {
            CleanTB();
            tb.AppendText("\nDemonstrating SubscribeOn On NewThread.");

            var numbers = new List<int> { 1, 2, 3 };
            var query = numbers.Select(n => SlowDown(n));

            var oQuery = query.ToObservable();
            oQuery.SubscribeOn(Scheduler.NewThread).Subscribe(PrintToDebug); 
        }

        private void Example3_Click(object sender, RoutedEventArgs e)
        {
            CleanTB();
            tb.AppendText("\nDemonstrating ObserveOn on NewThread.");

            var numbers = new List<int> { 1, 2, 3 };
            var query = numbers.Select(n => SlowDown(n));

            var oQuery = query.ToObservable();
            oQuery.ObserveOn(Scheduler.NewThread).Subscribe(PrintToDebug);
        }

        private void Example4_Click(object sender, RoutedEventArgs e)
        {
            CleanTB();
            tb.AppendText("\nDemonstrating SubscribeOn and ObserveOn on NewThread.");

            var numbers = new List<int> { 1, 2, 3 };
            var query = numbers.Select(n => SlowDown(n));

            var oQuery = query.ToObservable();
            oQuery.SubscribeOn(Scheduler.NewThread).ObserveOn(Scheduler.NewThread).Subscribe(PrintToDebug);
        }

        private void Example5_Click(object sender, RoutedEventArgs e)
        {
            CleanTB();
            tb.AppendText("\nDemonstrating SubscribeOn and ObserveOn on ThreadPool.");

            var numbers = new List<int> { 1, 2, 3 };
            var query = numbers.Select(n => SlowDown(n));

            var oQuery = query.ToObservable();
            oQuery.SubscribeOn(Scheduler.ThreadPool).ObserveOn(Scheduler.ThreadPool).Subscribe(PrintToDebug);
        }

        private void Example6_Click(object sender, RoutedEventArgs e)
        {
            CleanTB();
            tb.AppendText("\nDemonstrating SubscribeOn NewThread and ObserveOnDispatcher.");

            var numbers = new List<int> { 1, 2, 3 };
            var query = numbers.Select(n => SlowDown(n));

            var oQuery = query.ToObservable();
            oQuery.SubscribeOn(Scheduler.NewThread).ObserveOnDispatcher().Subscribe(Print);
        }

        private void Example7_Click(object sender, RoutedEventArgs e)
        {
            CleanTB();
            tb.AppendText("\nDemonstrating SubscribeOnDispatcher and ObserverOn NewThread.");

            var numbers = new List<int> { 1, 2, 3 };
            var query = numbers.Select(n => SlowDown(n));

            var oQuery = query.ToObservable();
            oQuery.SubscribeOnDispatcher().ObserveOn(Scheduler.NewThread).Subscribe(PrintToDebug);
        }
        
        private void Example8_Click(object sender, RoutedEventArgs e)
        {
            Example8.IsEnabled = false;
            Example8Stop.IsEnabled = true;
            CleanTB();
            tb.AppendText("\nDemonstrating Finally and disposing the subscription");

            var numbers = Enumerable.Range(1, 100);
            var query = numbers.Select(n => SlowDown(n));

            var oQuery = query.ToObservable()
                .SubscribeOn(Scheduler.NewThread)
                .ObserveOnDispatcher()
                .Finally(FinalAction);
            m_sub = oQuery.Subscribe(Print);
        }

        private void Example8Stop_Click(object sender, RoutedEventArgs e)
        {
            m_sub.Dispose();
        }

        private void FinalAction()
        {
            tb.AppendText("\nFinal Action\t" + PrintThreadId());
            Example8.IsEnabled = true;
            Example8Stop.IsEnabled = false;
        }

        private string SlowDown(int n)
        {
            Thread.Sleep(500);
            return "\nSubscription\tSlowDown\t" + PrintThreadId() + "\t\tValue:" + n;
        }

        private IDisposable m_sub;
    }
As you can see in this code, the SlowDown() method is part of the subscription, because it is part of the query that is iterated to get the next item, while the Print() and PrintToDebug() methods are part of the observation, because they are the OnNext delegates.

Example1
In Example1, passing Scheduler.NewThread to the ToObservable() method means that all the subscriptions (getting the next items) should be performed on a new thread. As you can see in the application, the main thread id is 8 but the subscriptions ran on thread 10. Since we did not specify anything for the observations, they also ran on thread 10.

Example2
Example2 does the same thing as Example1. The only difference is that instead of passing Scheduler.NewThread to the ToObservable() method, we use SubscribeOn(Scheduler.NewThread) to achieve the same result.

Example3
In Example3, we use the ObserveOn() method. This tells the Rx framework that all observations (in our case the print methods) should run on a new thread. As you can see, the main thread is 10 and all subscriptions ran on thread 10 (since we didn't specify anything for subscriptions), while each observation ran on a different new thread. So in these results, Scheduler.NewThread for subscriptions means "run the subscriptions on some thread other than the main thread", while Scheduler.NewThread for observations means "run each observation on a completely new thread".

Example4
In Example4, we use both the SubscribeOn and ObserveOn methods. As I said before, for subscriptions NewThread just means a thread different from the main thread, while for observations it means a completely new thread, and the results show exactly that. All subscriptions ran on thread 13, which is different from thread 10 (the main thread), while each observation ran on a completely different new thread.

Example5
Example5 is similar to Example4; the only difference is that instead of NewThread we are using ThreadPool. This means that instead of creating new threads, free threads from the pool are used. The other difference between them is that NewThread creates its threads as foreground threads, while ThreadPool threads are background threads. Foreground threads keep the application alive until their work is done, while background threads are terminated as soon as all foreground threads (including the main thread) have finished. Since we used the ThreadPool this time, all observations ran on thread 11, which was free at the time. If you run this code with a larger set of numbers, you will see different threads being reused again and again as they become free.

Example6
In Example6, we subscribe on a new thread but use ObserveOnDispatcher(). This means that all observations should run on the dispatcher thread, which is the main thread. That is why we are able to print the results in the application window rather than in the debug window. As you can see in the results, because of NewThread the subscriptions ran on a different thread (7) than the main thread (9), while all the observations ran on the main thread (9).

Example7
In Example7, we use SubscribeOnDispatcher() while running the observations on a new thread. The results show that all subscriptions ran on the same thread (9) as the main thread, while all observations ran on new threads.

Example8
Finally, Example8 adds a few more important points to make the example complete. The first thing to notice is that the Ex 8 button has a corresponding Ex 8 Stop button, which allows us to stop the execution midway. To do this, we declare a class-level IDisposable field (m_sub) so that the subscription returned by Subscribe() outlives the click handler that created it. Then, when Ex 8 Stop is clicked, we just call Dispose() on this field, as shown in the click handler of Ex 8 Stop. The other thing to note is that we have added a Finally method to perform cleanup. The Finally action is called whether the execution is stopped, an error occurs, or the execution completes successfully. This is why we couldn't use OnCompleted for cleanup: OnCompleted is not called if the execution is stopped midway or if an error occurs.
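The difference between OnCompleted and Finally can also be checked outside WPF. The following is only a minimal console-style sketch, not the code from the app above: the names FinallyDemo and Run are made up for illustration, it assumes the System.Reactive package is referenced, and it uses NewThreadScheduler.Default, which is how later Rx releases spell the Scheduler.NewThread used in this post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical helper, not part of the WPF sample above.
public static class FinallyDemo
{
    // Pushes 1..count slowly from a new thread and records what happens.
    // If stopEarly is true, the subscription is disposed midway,
    // just like clicking the Ex 8 Stop button.
    public static List<string> Run(int count, bool stopEarly)
    {
        var log = new List<string>();
        var finished = new ManualResetEventSlim(false);

        IObservable<int> source = Enumerable.Range(1, count)
            .Select(n => { Thread.Sleep(50); return n; })   // slow "subscription" side
            .ToObservable(NewThreadScheduler.Default)
            .Finally(() => { lock (log) log.Add("Finally"); finished.Set(); });

        IDisposable sub = source.Subscribe(
            n => { lock (log) log.Add("OnNext " + n); },
            ex => { lock (log) log.Add("OnError"); },
            () => { lock (log) log.Add("OnCompleted"); });

        if (stopEarly)
        {
            Thread.Sleep(120);   // let a couple of items through first
            sub.Dispose();       // Finally runs here; OnCompleted does not
        }

        finished.Wait();         // wait until the Finally action has run
        lock (log) return new List<string>(log);
    }
}
```

Calling FinallyDemo.Run(3, false) should log the three OnNext entries, then OnCompleted, then Finally, while FinallyDemo.Run(20, true) should log Finally without any OnCompleted entry, which is why the cleanup in Example8 lives in Finally.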

There are a few other helpful methods, which I won't go into in detail, but they are useful. We can use the Buffer and Window methods in cases where we need to work with multiple items rather than a single item. For example, we can pass a count of 10 to the Buffer method. Then, in OnNext, instead of getting one item we get a list of 10 items. Both methods also allow us to provide a timespan instead of a count. If we provide a timespan, then the buffer will contain as many items as arrived within that timespan. Buffer and Window work similarly and differ mainly in what they hand to the observer: Buffer delivers each group as a list, while Window delivers each group as an observable of its own.
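To make the difference in return type concrete, here is a small sketch using the count-based overloads. The class and method names (BufferWindowDemo, BufferSizes, WindowSums) are hypothetical and exist only for this illustration; the sketch assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, not part of the WPF sample above.
public static class BufferWindowDemo
{
    // Buffer(10): each OnNext receives an IList<int> holding up to 10 items.
    public static List<int> BufferSizes(int total)
    {
        var sizes = new List<int>();
        Observable.Range(1, total)
            .Buffer(10)
            .Subscribe(batch => sizes.Add(batch.Count));
        return sizes;
    }

    // Window(10): each OnNext receives an IObservable<int> for the group,
    // so we can keep composing Rx operators on it (here: Sum).
    public static List<int> WindowSums(int total)
    {
        var sums = new List<int>();
        Observable.Range(1, total)
            .Window(10)
            .SelectMany(window => window.Sum())
            .Subscribe(sum => sums.Add(sum));
        return sums;
    }
}
```

With 25 items, BufferSizes(25) yields the batch sizes 10, 10, 5 and WindowSums(25) yields 55, 155, 115. The time-based form is used the same way, for example Buffer(TimeSpan.FromSeconds(1)), except that the size of each group then depends on how many items arrived in that second.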
