RX Support for DataPortal

RX Support for DataPortal

Old forum URL: forums.lhotka.net/forums/t/8846.aspx


GA30 posted on Tuesday, April 27, 2010

Hello All,

If any of you like what's happening with Rx (http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx) and would like to utilize it in conjunction with the DataPortal, down below is a small little class (DataPortalExtensions) that contains a few extension methods that have enabled me to do so. Have you ever found yourself writing code sort of like this:

BOCollection1.BeginFetch(
1, (s1, e1) =>
                {
                    if (e1.Error != null)
                    {
                        throw e1.Error;
                    }

                    BOCollection2.BeginFetch(
                    2, (s2, e2) =>
                    {
                        if (e2.Error != null)
                        {
                            throw e2.Error;
                        }

                        BOCollection3.BeginFetch(
                        3, (s3, e3) =>
                        {
                            if (e3.Error != null)
                            {
                                throw e3.Error;
                            }

                            var col1 = e1.Object;
                            var col2 = e2.Object;
                            var col3 = e3.Object;
                        });
                    });
                });

Quite a bit of nesting here. Please disregard the error handling as this is not the point I am trying to stress. Instead I am emphasizing that often times in the async SL world you may end up deep in DataPortal calls. With Rx though you can write code like this instead:

(from dpr1 in BOCollection1.BeginFetch(1)
 where dpr1.AssertNoErrors()
 from dpr2 in BOCollection2.BeginFetch(2)
 where dpr2.AssertNoErrors()
 from dpr3 in BOCollection3.BeginFetch(3)
 where dpr3.AssertNoErrors()
 select new { r1 = dpr1, r2 = dpr2, r3 = dpr3 }).Subscribe(result =>
 {
   var col1 = result.r1.Object;
   var col2 = result.r2.Object;
   var col3 = result.r3.Object;
 });

As you can see with Rx you can at least reduce somewhat the nesting level. To me this code is a bit more readable than the one without RX. Obviously though tastes vary and in your eyes this is all may be completely unnecessary and rather makes things worst. Rx though is much more than just trying to write cleaner less nested code. I highly recommend checking it out (http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx). In order to work with Rx in conjunction with the DataPortal, a few things are needed.

First, you need the Rx binaries. These can be found at the link provided above. Your .NET assemblies assemblies need reference to System.Reactive. Your SL assemblies needs reference to System.Observable, System.Reactive, and System.CoreEx.

Second, you need something that's going to convert the async data portal calls to IObservable. Here's a little class I am using that does so. I have this class in my .NET business assembly and also in my SL business assembly via linked file (typical).

public static class DataPortalExtensions
    {
        public static IObservable<DataPortalResult<TBusinessObject>> FetchToObservable<TBusinessObject>(
          this DataPortal<TBusinessObject> dataPortal)
          where TBusinessObject : Csla.Serialization.Mobile.IMobileObject
        {
            var asyncSubject = new AsyncSubject<DataPortalResult<TBusinessObject>>();

            dataPortal.FetchCompleted += (s, e) =>
            {
                asyncSubject.OnNext(e);
                asyncSubject.OnCompleted();
            };

            dataPortal.BeginFetch();

            return asyncSubject;
        }

        public static IObservable<DataPortalResult<TCommand>> ExecuteToObservable<TCommand>(
          this DataPortal<TCommand> dataPortal, TCommand command)
          where TCommand : Csla.CommandBase<TCommand>
        {
            var asyncSubject = new AsyncSubject<DataPortalResult<TCommand>>();

            dataPortal.ExecuteCompleted += (s, e) =>
            {
                asyncSubject.OnNext(e);
                asyncSubject.OnCompleted();
            };

            dataPortal.BeginExecute(command);

            return asyncSubject;
        }

        public static IObservable<DataPortalResult<TBusinessObject>> FetchToObservable<TBusinessObject>(
           this DataPortal<TBusinessObject> dataPortal,
           object criteria)
           where TBusinessObject : Csla.Serialization.Mobile.IMobileObject
        {
            var asyncSubject = new AsyncSubject<DataPortalResult<TBusinessObject>>();

            dataPortal.FetchCompleted += (s, e) =>
            {
                asyncSubject.OnNext(e);
                asyncSubject.OnCompleted();
            };

            dataPortal.BeginFetch(criteria);

            return asyncSubject;
        }

        public static IObservable<SavedEventArgs> SaveToObservable<TBusinessObject>(
           this TBusinessObject businessObject)
           where TBusinessObject : BusinessBase<TBusinessObject>
        {
            var asyncSubject = new AsyncSubject<SavedEventArgs>();

            businessObject.BeginSave((s, e) =>
            {
                asyncSubject.OnNext(e);
                asyncSubject.OnCompleted();
            });

            return asyncSubject;
        }

        public static IObservable<SavedEventArgs> SaveToObservable<TBusinessObject, TChildBusinessObject>(
           this TBusinessObject businessObject)
            where TChildBusinessObject : Csla.Core.IEditableBusinessObject
            where TBusinessObject : BusinessListBase<TBusinessObject, TChildBusinessObject>
        {
            var asyncSubject = new AsyncSubject<SavedEventArgs>();

            businessObject.BeginSave((s, e) =>
            {
                asyncSubject.OnNext(e);
                asyncSubject.OnCompleted();
            });

            return asyncSubject;
        }

        public static bool AssertNoErrors(this IDataPortalResult result)
        {
            if (result.Error != null)
            {
                throw result.Error;
            }

            return true;
        }

        public static bool AssertNoErrors(this IDataPortalResult[] results)
        {
            foreach (var result in results)
            {
                if (result.Error != null)
                {
                    throw result.Error;
                }
            }

            return true;
        }

        public static bool AssertNoErrors(this SavedEventArgs e)
        {
            if (e.Error != null)
            {
                throw e.Error;
            }

            return true;
        }
    }

Third and final, you have to make sure your business methods that invoke the DataPortal make use of the above extensions. What follows are two equivalent methods that fetch a collection. The first is the typical one without RX. The second is the one with RX that makes use of the above extension methods and enables LINQ over DataPortal events.

public static void BeginFetch(int userID, EventHandler<DataPortalResult<BOCollection1>> completed)
        {
            var dp = new DataPortal<BOCollection1>();
            dp.FetchCompleted += completed;
            dp.BeginFetch(new SingleCriteria<BOCollection1, int>(userID));
        }

        public static IObservable<DataPortalResult<BOCollection1>> BeginFetch(int userID)
        {
            return new DataPortal<BOCollection1>().FetchToObservable(new   SingleCriteria<BOCollection1, int>(userID));
        }

That's it.

AaronH replied on Tuesday, April 27, 2010

Wow, thanks for all of this.  I'm planning on utilizing Rx on the current project that I'm leading and you have saved me what looks to have been a considerable amount of time!

Thanks!

Calin replied on Wednesday, April 28, 2010

Hi,

Thank you for this post, very informative indeed.

One question, how will I use this with TDD ? I am struggling a lot with testing the code that uses CSLA as it is.

Regards,

Copyright (c) Marimer LLC