Posts Tagged ‘Rx’
Making Async calls in Silverlight using Reactive Extension (Rx)
In this blog I am going to explain briefly about how to make async calls using Reactive Extension (Rx). Every one knows that Silverlight is one of the best platform to create RIA’s. One of the architecture constrain of Silverlight is, it wont directly interact with backend DB. So we need to have a webservice to perform the db operations. Silverlight can consume WCF or asmx service, but their is a catch silverlight can only communicate to webservices asynchronously. We all know that async calls have several advantages. One key feature of async call is, it wont block the UI while performing the call. But one downside in async programming is the coding pattern involved in placing async calls. When I place an async calls, I use an anonymous delegate approach or use lambda expression. But some how I am not satisfied with any of these approaches, I feel my code is not clean. So we always look for a better approach and our long waiting is over with the introduction of Reactive Extension (Rx) from MS labs.
You can visit Rx web site to get more details. I am not a person to explain the architecture or indepth details of Rx instead I am going to show how it benefit me in my async programing. I am going to rewrite one of my application I wrote to upload images using Silverlight and WCF, you can get more details of that app from my blog. I will rewrite only the part where I make async calls to server to get images from WCF service. Before get into refactoring, We need to download and install the Rx libraries from Rx web site.
Let’s refactor the code to make our async calls using Rx. We need to add couple of assembly references to add Rx to Silverlight, below are those assemblies.
- System.Observables
- System.CoreEx
- System.Reactive
I demonstrated two ways of interacting with WCF service in the source code I uploaded. A proxy and proxy less approach. We all know the proxy approach, the communication to the service using the class generated by VS. In one of my post I provided some insight of Proxy less approach, you can check it out here.
If you have my source code in hand, have a look at the ImageListViewModel.cs in Silverlight app. You can see how I am making the async call.
In proxy less approach I use lambda expression to make the call.
IImageServiceAsync imgService = ServiceChannelProvider.CreateChannel<IImageServiceAsync>(); imgService.BeginGetAllImage(save => { try { imgs = imgService.EndGetAllImage(save); this.AddToUserLogoList(imgs); } catch (Exception ex) { throw; } }, imgService);
In proxy approach I used a event driven approach as shown below.
ImageServiceClient imgClient = new ImageServiceClient(); imgClient.GetAllImageCompleted += new EventHandler<GetAllImageCompletedEventArgs>(imgClient_GetAllImageCompleted); imgClient.GetAllImageAsync();
void imgClient_GetAllImageCompleted(object sender, GetAllImageCompletedEventArgs e) { for (int i = 0; i < e.Result.Length; i++) { this.UserLogos.Add(e.Result[i]); } NotifyOfPropertyChange(() => UserLogos); }
I am not going to explain the downside of these approaches mentioned above, Just like me you all might have experienced it. Lets rewrite the code using Reactive Extension (Rx).
Proxy less approach
private void ReadAllImages() { IImageServiceAsync imgService = ServiceChannelProvider.CreateChannel<IImageServiceAsync>(); var func = Observable.FromAsyncPattern<IList<UserImage>>(imgService.BeginGetAllImage, imgService.EndGetAllImage) .Invoke() .SubscribeOnDispatcher() .Subscribe(s => this.UserLogos = s.ToList<UserImage>()); }
Rx have one method called FromAsyncPattern, there we can provide our Beginxxx and Endxxx functions. I provided BeginGetAllImages and EndGetAllImages to FromAsyncPattern function. I also provided the return type of EndGetAllImages() to FromAsyncPattern function. Return type of EndGetAllImages is IList<UserImage>, so I called FromAsyncPattern as FromAsyncPattern<IList<UserImage>>. Rx uses Observer pattern to publish the result. So here I added UserLogos properties as my observer, once the execution is done the result will be pushed to the observer. Here the observer is a property in my view model. Below is the UserLogos property
List<UserImage> _logos = new List<UserImage>(); public List<UserImage> UserLogos { get { return _logos; } private set { lock (this) { _logos = value; NotifyOfPropertyChange(() => UserLogos); } } }
Let’s see how we can make an async call in proxy generated approach.
private void ReadAllImages() { ImageServiceClient imgClient = new ImageServiceClient(); var o = Observable.FromEvent<GetAllImageCompletedEventArgs>(imgClient, "GetAllImageCompleted") .ObserveOn(Scheduler.ThreadPool) .Select(result => result.EventArgs.Result) .Subscribe(s => this.UserLogos = s.ToList<UserImage>()); imgClient.GetAllImageAsync(); }
Here I used FromEvent function provided by Rx instead of FromAsyncPattern. FromEvent accepts the event type, here it is GetAllImageCompletedEventArgs. It also accepts the service client object and which event it should handle. I passed the GetAllImageCompleted to the FromEvent function. Then we need to attach UserLogos as observer. After that we called GetAllImageAsync of service proxy.
You can see our code is pretty clean, we don’t have any messy code that we normally write to handle the async calls. Once you start using Rx I think you will never go back to the old approach of placing async calls.
Twitter Public Timeline reader for WP7 using Caliburn Micro
This is my first app for Windows Phone 7. This application doesn’t have much functionality, it just make a call to Public Timeline rest api to get the tweets. I choose PublicTimeline as it doesn’t requires any Twitter authentication. John Papa have a blog about communicating with Twitter, I used his blog to gather the details to communicate with Twitter. As usual I used my favorite MVVM frame work Caliburn Micro here as well. To kick start with WP7 development using caliburn micro I suggest to download the template from shazaml.com. The template has the CM’s WP7 bootstrapping technique mentioned by Rob Eisenberg in his blog.
Screen shot of my app.
In this app I used Microsoft Reactive Extension (Rx) libraries to make asynchronous call to Twitter. I combined both the Rx and Caliburn micro’s EventAggregator to do Async call and publish the data. Rx works in publisher/subscriber model, Initially I wrote an observer by my self. After I thought why should I reinvent the wheel, as Caliburn Micro has a powerful publisher/subscriber module. So I removed my observer class with EventAggregator shipped with CM. I used Rx here to avoid all the messy code that we need to write to make an async call. Jerome Laban have a blog about Rx library and is a pretty good article to start off with Rx library.
Another functionality I wanted to add to the app was caching facility, that means the app should be able to cache older tweets. Instead of fetching from Twitter we can get it from cache, if user wants to see older tweets. I was thinking of serializing my Entities to IsolatedStorage and do all the hard work by myself. Before implementing the serialization functionality I did a google search and come accross this codeplex project called winPhone7db. It did a decent job of serializing my entities to Isolated storage. As the name states its gives a feel like we are dealing with db. It encapsulates all the hurdles of communicating with Isolated storage and serialization.
Let’s jump into the details of the app. Below code explains my MainPage view model, it’s not very complicated just like any other basic view model
public class MainPageViewModel:Screen,IHandle<TimelineMessage> { readonly INavigationService navigationService; public MainPageViewModel(INavigationService navigationService) { this.navigationService = navigationService; EventAggregatorHelper.EventAggregator.Subscribe(this); this.Tweets = new ObservableCollection<TweetViewModel>(); } public ObservableCollection<TweetViewModel> Tweets { get; private set; } protected override void OnViewLoaded(object view) { base.OnViewLoaded(view); TwitterCommunicator twitterCommunicator = new TwitterCommunicator(); twitterCommunicator.GetPublicTimelines("sonyarouje");
} public void LoadMore() { TwitterCommunicator twitterCommunicator = new TwitterCommunicator(); this.CreateUI(twitterCommunicator.LoadFromCache()); } private void CreateUI(List<Tweet> tweets) { foreach (Tweet tweet in tweets) { TweetViewModel tweetViewModel = new TweetViewModel(tweet); this.Tweets.Add(tweetViewModel); } } public void Handle(TimelineMessage message) { if (message.Tweets != null) { this.CreateUI(message.Tweets); TwitterCommunicator twitterCommunicator = new TwitterCommunicator(); twitterCommunicator.SaveToCache(message.Tweets); } } }
I inherited the view model from Screen so that I can make use of OnViewLoaded method. The async call to Twitter orginates from onViewLoaded method. Also you can see that this viewmodel is subscribed to TimelineMessage. This is to get the notification once the Reactive Extension completes it’s async call to Twitter using webclient.
TweetViewModel is another view model to display individual tweets. All the individual TweetViewModel get added to an observable collection called Tweets and caliburn micro internally bind the respective view to an ItemsControl in my MainPageView.
Now let’s go through the Twitter communicator
public class TwitterCommunicator { private readonly string FriendTimeLine = "http://twitter.com/statuses/friends_timeline/{0}.xml?count=50"; private readonly string PublicTimeLine = "http://api.twitter.com/1/statuses/public_timeline.xml?screen_name={0}"; public void GetPublicTimelines(string userName) { string uriString = string.Format(PublicTimeLine, userName); Uri uri = new Uri(uriString); WebClient wc = new System.Net.WebClient(); var o = Observable.FromEvent<DownloadStringCompletedEventArgs>(wc, "DownloadStringCompleted") .ObserveOn(Scheduler.ThreadPool) .Select(newString => newString.EventArgs.Result); o.ObserveOn(Scheduler.Dispatcher).Subscribe(s => EventAggregatorHelper.EventAggregator.Publish<TimelineMessage>(new TimelineMessage(LoadPublicTimeLines(s.ToString())))); wc.DownloadStringAsync(uri); } public List<Tweet> LoadFromCache() { Repository repository = new Repository(); return repository.GetFromCache<Tweet>(); } public void SaveToCache(List<Tweet> tweets) { Repository repository = new Repository(); repository.Add<Tweet>(tweets); repository.SaveChanges(); } private List<Tweet> LoadPublicTimeLines(string statuses) { try { XElement xmlElement = XElement.Parse(statuses); ; XNamespace ns = ""; var twitterQuery = from msg in xmlElement.Descendants(ns + "status") let sender = msg.Element(ns + "user") select new Tweet { TwitterId = (msg.Element(ns + "id").Value).ToLong(), CreatedAt = (msg.Element(ns + "created_at").Value).ToDateTime(), Text = msg.Element(ns + "text").Value, User = new User { UserId = (sender.Element(ns + "id").Value).ToLong(), Name = sender.Element(ns + "name").Value, ScreenName = sender.Element(ns + "screen_name").Value, Description = sender.Element(ns + "description").Value, Location = sender.Element(ns + "location").Value, ProfileImageUrl = sender.Element(ns + "profile_image_url").Value, Url = sender.Element(ns + "url").Value, Protected = (sender.Element(ns + "protected").Value).ToBool(), FollowersCount = (sender.Element(ns + "followers_count").Value).ToLong() } }; List<Tweet> statusList = twitterQuery.ToList<Tweet>(); return statusList; } catch (Exception ex) { return null; } }
You need to add Microsoft.Phone.Reactive assembly to avail the functionality of Rx. Also I installed the Rx setup for Silverlight 4 and added System.Observable reference shipped with the setup.
The function GetPublicTimeLines place the async call to Twitter. As I said before I use Rx to handle the async call and notify the subscriber once it’s done. I use the WebClient class to communicate with Twitter. I choose WebClient as it is very easy to use shipped with WP7, also am not doing any complex calls to twitter that requires authentication. The below code will get executed once the async call is completed.
o.ObserveOn(Scheduler.Dispatcher).Subscribe(s => EventAggregatorHelper.EventAggregator.Publish<TimelineMessage>(new TimelineMessage(LoadPublicTimeLines(s.ToString()))));
The above code does publication of the result. Before publishing I processed the result we got from twitter and converted the xml message to Tweet entity. Conversion is done by LoadPublicTimeLine method. Once the processing is done I published the message using EventAggregator.
Now Let’s see the caching part. I created a repository class to communicate with SilverlightPhoneDatabase. Below is the class I created for it.
using System; using System.Linq; using System.Collections.Generic; using SilverlightPhoneDatabase; namespace TwitterClientAPI { public class Repository { private const string DATABASENAME = "TweetsCache"; Database _db; private Database TweetDataBase { get { if (_db == null) { if (Database.DoesDatabaseExists(DATABASENAME) == false) { _db = Database.CreateDatabase(DATABASENAME); _db.Save(); } else { _db = Database.OpenDatabase(DATABASENAME); } } return _db; } } private Table<T> GetTable<T>() where T : class { Database db = this.TweetDataBase; if (db.Table<T>() == null) { db.CreateTable<T>(); } return db.Table<T>(); } public void Add<T>(List<T> entities) where T : class { Table<T> table = this.GetTable<T>(); foreach (T entity in entities) { table.Add(entity); } } public void Add<T>(T entity) where T : class { Table<T> table = this.GetTable<T>(); table.Add(entity); } public void SaveChanges() { this.TweetDataBase.BeginSave((s)=> { if (s.Error == null) { //save unsuccessful, take necessary action } }); } public List<T> GetFromCache<T>() where T:class { try { var query = (from tCache in this.TweetDataBase.Table<T>() select tCache); List<T> cachedData = query.ToList<T>(); return cachedData; } catch (Exception ex) { return null; } } } }
I used the same generic repository pattern explained in one of my previous post. When I wrote this app I spend most of the time in investigating about different methods of implementing Async call and caching of tweets. Developing an app for WP7 is pretty simple I spent less than 2 hrs to finish the app. Apart from the development, I spent some time to learn Rx. Through this app I got some insight of WP7 development, Reactive Extension, SilverlightPhoneDatabase and more attached to Caliburn Micro :).
If you really want to do some decent job using Twitter API then I suggest Hammock. It’s a REST library to consume REST Services. The library also supports OAuth authentication and can be used to connect to Twitter. Sudheer has blog series explaining about authentication with Twitter using Hammock. Also Scott Gu has a very good blog about the overview of WP7 development.
To run the source you need to have WP7 Development tools. You can download the tools from here.
Download Source code.