Morgan Cheng Morgan Cheng - 3 months ago 27
C# Question

How to write Asynchronous LINQ query?

After I read a bunch of LINQ related stuff, I suddenly realized that no articles introduce how to write asynchronous LINQ query.

Suppose we use LINQ to SQL, below statement is clear. However, if the SQL database responds slowly, then the thread using this block of code would be hindered.

var result = from item in Products where item.Price > 3 select item.Name;
foreach (var name in result)
{
Console.WriteLine(name);
}


Seems that current LINQ query spec doesn't provide support to this.

Is there any way to do asynchronous programming LINQ? It works like there is a callback
notification when results are ready to use without any blocking delay on I/O.

Answer

While LINQ doesn't really have this per se, the framework itself does... You can easily roll your own asynchronous query executor in 30 lines or so... In fact, I just threw this together for you :)

EDIT: Through writing this, I've discovered why they didn't implement it. It cannot handle anonymous types since they are scoped local. Thus, you have no way of defining your callback function. This is a pretty major thing since a lot of linq to sql stuff creates them in the select clause. Any of the below suggestions suffer the same fate, so I still think this one is the easiest to use!

EDIT: The only solution is to not use anonymous types. You can declare the callback as just taking IEnumerable (no type args), and use reflection to access the fields (ICK!!). Another way would be to declare the callback as "dynamic"... oh... wait... That's not out yet. :) This is another decent example of how dynamic could be used. Some may call it abuse.

Throw this in your utilities library:

public static class AsynchronousQueryExecutor
{
    public static void Call<T>(IEnumerable<T> query, Action<IEnumerable<T>> callback, Action<Exception> errorCallback)
    {
        Func<IEnumerable<T>, IEnumerable<T>> func =
            new Func<IEnumerable<T>, IEnumerable<T>>(InnerEnumerate<T>);
        IEnumerable<T> result = null;
        IAsyncResult ar = func.BeginInvoke(
                            query,
                            new AsyncCallback(delegate(IAsyncResult arr)
                            {
                                try
                                {
                                    result = ((Func<IEnumerable<T>, IEnumerable<T>>)((AsyncResult)arr).AsyncDelegate).EndInvoke(arr);
                                }
                                catch (Exception ex)
                                {
                                    if (errorCallback != null)
                                    {
                                        errorCallback(ex);
                                    }
                                    return;
                                }
                                //errors from inside here are the callbacks problem
                                //I think it would be confusing to report them
                                callback(result);
                            }),
                            null);
    }
    private static IEnumerable<T> InnerEnumerate<T>(IEnumerable<T> query)
    {
        foreach (var item in query) //the method hangs here while the query executes
        {
            yield return item;
        }
    }
}

And you could use it like this:

class Program
{

    public static void Main(string[] args)
    {
        //this could be your linq query
        var qry = TestSlowLoadingEnumerable();

        //We begin the call and give it our callback delegate
        //and a delegate to an error handler
        AsynchronousQueryExecutor.Call(qry, HandleResults, HandleError);

        Console.WriteLine("Call began on seperate thread, execution continued");
        Console.ReadLine();
    }

    public static void HandleResults(IEnumerable<int> results)
    {
        //the results are available in here
        foreach (var item in results)
        {
            Console.WriteLine(item);
        }
    }

    public static void HandleError(Exception ex)
    {
        Console.WriteLine("error");
    }

    //just a sample lazy loading enumerable
    public static IEnumerable<int> TestSlowLoadingEnumerable()
    {
        Thread.Sleep(5000);
        foreach (var i in new int[] { 1, 2, 3, 4, 5, 6 })
        {
            yield return i;
        }
    }

}

Going to go put this up on my blog now, pretty handy.

Comments