.NET Reactive Extensions Rx - 3 (Cleanup)

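In this post, I will build a small app to show how to perform cleanup after an Rx subscription has finished its work.
Let's take a look at the code first - 
    // Namespaces the sample relies on (Rx namespaces as shipped from Rx 1.0 onward).
    using System;
    using System.IO;
    using System.Linq;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Threading;

    public class A03CleanupEx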
    {
        internal static void Run()
        {
            Console.WriteLine("Thread: {0}\tMain application thread", Thread.CurrentThread.ManagedThreadId);
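            // Using<TResult, TResource>: the first delegate creates the resource to dispose,
            // the second builds the observable sequence from that resource.
            var oSeq = Observable.Using<char, MyStreamReader>(
                () => new MyStreamReader(new FileStream("chars.txt", FileMode.Open)),
                // Scheduler.NewThread is the older Rx name; Rx 2.0 and later expose it as NewThreadScheduler.Default.
                sr => sr.ReadToEnd().ToCharArray().ToObservable(Scheduler.NewThread).Finally(FinalStep)
                );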

            oSeq.Subscribe(Print);
        }

        private static void FinalStep()
        {
            Console.WriteLine("Thread: {0}\tFinally getting executed", Thread.CurrentThread.ManagedThreadId);
        }

        private static void Print(char c)
        {
            Console.WriteLine("Thread: {0}\t{1}", Thread.CurrentThread.ManagedThreadId,c);
        }
    }

    internal class MyStreamReader : StreamReader
    {
        public MyStreamReader(Stream fs) :base(fs)
        {
            Console.WriteLine("Thread: {0}\tStream Initialized", Thread.CurrentThread.ManagedThreadId);
        }

        protected override void Dispose(bool disposing)
        {
            Console.WriteLine("Thread: {0}\tStream Disposed", Thread.CurrentThread.ManagedThreadId);
            base.Dispose(disposing);
        }
    }
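And here are the results: the console shows the main thread message, then "Stream Initialized", one line per character read from chars.txt, "Finally getting executed", and lastly "Stream Disposed", each prefixed with the ID of the thread it ran on.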

Rx provides two ways of performing cleanup: Using and Finally. It is good practice to clean up the resources you have used once you are done, so that streams are closed, threads are returned to the thread pool, and so on.

Using
Rx provides Observable.Using to tie the lifetime of a resource to a subscription. Using takes two type parameters. The first is the type of the values the sequence produces once the subscription runs; in our case it is char. The second is the type of the resource you want disposed once the operation is over; in our case it is MyStreamReader. I created the MyStreamReader class just to tap into the constructor and the Dispose method so that I can print a message to the console. Other than that, this class behaves the same as StreamReader.

The Using method takes two arguments, and both are delegates. The first delegate must return the object that you want disposed; here its type is MyStreamReader. The second delegate then takes this object (sr) and must return an IObservable<T>, where T in our case is char. Using itself returns an IObservable<T> with the same T, so oSeq is an IObservable<char> that we can subscribe to.
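To see the resource lifetime on its own, here is a minimal sketch that leaves out the file and the extra thread. LoggedResource and UsingSketch are hypothetical names made up for this illustration, and the sketch assumes a recent System.Reactive package, where ToObservable() without a scheduler delivers its values synchronously during Subscribe.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical resource used only for this illustration; it records its own lifecycle.
internal class LoggedResource : IDisposable
{
    private readonly List<string> _log;

    public LoggedResource(List<string> log)
    {
        _log = log;
        _log.Add("created");
    }

    public void Dispose()
    {
        _log.Add("disposed");
    }
}

internal static class UsingSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // First delegate: creates the resource when someone subscribes.
        // Second delegate: builds the sequence whose lifetime the resource shares.
        IObservable<char> seq = Observable.Using<char, LoggedResource>(
            () => new LoggedResource(log),
            resource => new[] { 'a', 'b' }.ToObservable());

        // Nothing has been created yet; the resource factory runs on Subscribe.
        seq.Subscribe(c => log.Add(c.ToString()));

        return log;
    }
}
```

Calling UsingSketch.Run() should give back the entries created, a, b, disposed in that order: the resource is created when the subscription starts and disposed once the sequence has completed.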

Finally
I touched briefly on the Finally method in my previous post as well. Finally is similar in nature to the familiar try-catch-finally construct, in that the code in Finally always gets executed once the sequence terminates, irrespective of whether the operation succeeded or an error occurred. As you can see in the results, after the subscription was over, Finally was called, and once Finally had finished, Dispose was called to dispose the MyStreamReader object.
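The "success or error" behaviour can be checked with a small sketch that runs the same Finally pipeline against one sequence that completes and one that fails. FinallySketch and its method names are hypothetical, chosen only for this illustration, and the sketch assumes a recent System.Reactive package, where Observable.Return and Observable.Throw deliver their notifications synchronously by default.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

internal static class FinallySketch
{
    // Attaches Finally to the given source, subscribes, and returns what was observed.
    public static List<string> Observe(IObservable<int> source)
    {
        var log = new List<string>();

        source
            .Finally(() => log.Add("finally"))
            .Subscribe(
                x => log.Add("next " + x),
                ex => log.Add("error " + ex.Message),
                () => log.Add("completed"));

        return log;
    }

    // A sequence that yields one value and completes normally.
    public static List<string> Success()
    {
        return Observe(Observable.Return(1));
    }

    // A sequence that terminates with an error straight away.
    public static List<string> Failure()
    {
        return Observe(Observable.Throw<int>(new InvalidOperationException("boom")));
    }
}
```

In both Success() and Failure() the last entry in the returned log should be "finally": it follows "completed" in the first case and "error boom" in the second, which matches the try-catch-finally analogy.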
