Ori Refael - 3 months ago
C# Question

C# async within an action

I would like to write a method that accepts several parameters, including an action and a retry count, and invokes the action.

So I have this code:

public static IEnumerable<Task> RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method)
{
    object lockObj = new object();
    int index = 0;

    return new Action(async () =>
    {
        while (true)
        {
            T item;
            lock (lockObj)
            {
                if (index < source.Count)
                {
                    item = source[index];
                    index++;
                }
                else
                    break;
            }

            int retry = retries;
            while (retry > 0)
            {
                try
                {
                    bool res = await action(item);
                    if (res)
                        retry = -1;
                    else
                        //sleep if not success..
                        Thread.Sleep(200);

                }
                catch (Exception e)
                {
                    LoggerAgent.LogException(e, method);
                }
                finally
                {
                    retry--;
                }
            }
        }
    }).RunParallel(threads);
}


RunParallel is an extension method for Action. It looks like this:

public static IEnumerable<Task> RunParallel(this Action action, int amount)
{
    List<Task> tasks = new List<Task>();
    for (int i = 0; i < amount; i++)
    {
        Task task = Task.Factory.StartNew(action);
        tasks.Add(task);
    }
    return tasks;
}


Now, the issue: the thread just disappears or collapses without waiting for the action to finish.

I wrote this example code:

private static async Task ex()
{
    List<int> ints = new List<int>();
    for (int i = 0; i < 1000; i++)
    {
        ints.Add(i);
    }

    var tasks = RetryComponent.RunWithRetries(ints, 100, async (num) =>
    {
        try
        {
            List<string> test = await fetchSmthFromDb();
            Console.WriteLine("#" + num + " " + test[0]);
            return test[0] == "test";
        }
        catch (Exception e)
        {
            Console.WriteLine(e.StackTrace);
            return false;
        }

    }, 5, "test");

    await Task.WhenAll(tasks);
}


fetchSmthFromDb is a simple method returning Task<List<string>> that fetches something from the db and works perfectly fine when invoked outside of this example.

Whenever the line List<string> test = await fetchSmthFromDb(); is reached, the thread seems to close and the following Console.WriteLine("#" + num + " " + test[0]); is never executed. When debugging, the breakpoint on that line is never hit either.

The Final Working Code

private static async Task DoWithRetries(Func<Task> action, int retryCount, string method)
{
    while (true)
    {
        try
        {
            await action();
            break;
        }
        catch (Exception e)
        {
            LoggerAgent.LogException(e, method);
        }

        if (retryCount <= 0)
            break;

        retryCount--;
        await Task.Delay(200);
    }
}

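public static async Task RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method)
{
    Func<T, Task> newAction = async (item) =>
    {
        await DoWithRetries(async () =>
        {
            // Note: the bool result is discarded here; only exceptions trigger a retry.
            await action(item);
        }, retries, method);
    };
    // Pass newAction (the retrying wrapper), not the raw action, so that retries actually happen.
    await source.ParallelForEachAsync(newAction, threads);
}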

Answer

The problem is in this line:

return new Action(async () => ...

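You start an async operation with the async lambda, but you don't return a task to await. Because the lambda is converted to an Action, it compiles as async void, so the task created by Task.Factory.StartNew completes as soon as the lambda reaches its first await. In other words, the work runs on worker threads, but you never find out when it is done, and your program terminates before the async operation completes. That's why you don't see any output.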

It needs to be:

return new Func<Task>(async () => ...
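To make the difference concrete, here is a minimal sketch. The class name AsyncVoidDemo, the Func<Task> overload of RunParallel, the helper CountFinishedAsync, and the 100 ms delay are all illustrative assumptions, not part of your code. It compares tasks started from an Action wrapping an async lambda with tasks started from a Func<Task> via Task.Run, which unwraps the inner task.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Hypothetical variant of RunParallel: it takes Func<Task>, so every
    // returned task represents the whole async operation.
    public static IEnumerable<Task> RunParallel(this Func<Task> action, int amount)
    {
        var tasks = new List<Task>();
        for (int i = 0; i < amount; i++)
            tasks.Add(Task.Run(action)); // Task.Run(Func<Task>) unwraps the inner task
        return tasks;
    }

    // Returns how many workers had finished by the time the tasks were awaited.
    public static async Task<int> CountFinishedAsync(bool useFuncOfTask)
    {
        int[] finished = new int[1];
        Func<Task> work = async () =>
        {
            await Task.Delay(100);                  // stands in for the db call
            Interlocked.Increment(ref finished[0]);
        };

        var tasks = new List<Task>();
        if (useFuncOfTask)
        {
            tasks.AddRange(work.RunParallel(4));
        }
        else
        {
            // Same shape as in the question: the async lambda becomes async void.
            Action asyncVoid = async () => await work();
            for (int i = 0; i < 4; i++)
                tasks.Add(Task.Factory.StartNew(asyncVoid));
        }

        await Task.WhenAll(tasks);
        return Volatile.Read(ref finished[0]);
    }
}
```

With useFuncOfTask set to true the method returns 4. With false it typically returns 0, because the StartNew tasks complete as soon as the lambda hits its first await, long before the delay has elapsed.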

UPDATE

First, you need to split responsibilities of methods, so you don't mix retry policy (which should not be hardcoded to a check of a boolean result) with running tasks in parallel.

Then, as previously mentioned, you run your while (true) loop 100 times instead of doing things in parallel.

As @MachineLearning pointed out, use Task.Delay instead of Thread.Sleep.
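The reason is that Thread.Sleep holds a thread for the whole pause, while await Task.Delay gives the thread back until the timer fires. A minimal sketch of that idea follows; DelayDemo and WaitManyAsync are hypothetical names used only for illustration.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class DelayDemo
{
    // Starts `count` logical waits without dedicating a thread to each of them.
    public static async Task<int> WaitManyAsync(int count, int millisecondsDelay)
    {
        Task<int>[] waits = Enumerable.Range(0, count)
            .Select(async i =>
            {
                // Thread.Sleep(millisecondsDelay) here would block the calling
                // thread for the whole pause, once per item.
                await Task.Delay(millisecondsDelay);
                return i;
            })
            .ToArray();

        int[] results = await Task.WhenAll(waits);
        return results.Length;
    }
}
```

Calling DelayDemo.WaitManyAsync(1000, 50) starts a thousand pauses that all overlap, which is why the retry loop should await Task.Delay rather than sleep.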

Overall, your solution looks like this:

using System.Collections.Async;

static async Task DoWithRetries(Func<Task> action, int retryCount, string method)
{
    while (true)
    {
        try
        {
            await action();
            break;
        }
        catch (Exception e)
        {
            LoggerAgent.LogException(e, method);
        }

        if (retryCount <= 0)
            break;

        retryCount--;
        await Task.Delay(millisecondsDelay: 200);
    };
}

static async Task Example()
{
    List<int> ints = new List<int>();
    for (int i = 0; i < 1000; i++)
        ints.Add(i);

    Func<int, Task> actionOnItem =
        async item =>
        {
            await DoWithRetries(async () =>
            {
                List<string> test = await fetchSmthFromDb();
                Console.WriteLine("#" + item + "  " + test[0]);
                if (test[0] != "test")
                    throw new InvalidOperationException("unexpected result"); // will be re-tried
            },
            retryCount: 5,
            method: "test");
        };

    await ints.ParallelForEachAsync(actionOnItem, maxDegreeOfParalellism: 100);
}

You need to use the AsyncEnumerator NuGet Package in order to use the ParallelForEachAsync extension method from the System.Collections.Async namespace.
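If you would rather not add the package, a similar bounded-concurrency loop can be sketched with SemaphoreSlim. This is a different technique and not how the AsyncEnumerator library is implemented; ThrottledForEach, ForEachThrottledAsync, and RunOneAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledForEach
{
    // Hypothetical helper (not the AsyncEnumerator API): runs `body` for every
    // item while keeping at most `maxConcurrency` invocations in flight.
    public static async Task ForEachThrottledAsync<T>(
        this IEnumerable<T> source, Func<T, Task> body, int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var running = new List<Task>();
            foreach (T item in source)
            {
                await gate.WaitAsync(); // wait until a slot is free
                running.Add(RunOneAsync(item, body, gate));
            }

            // Completes when every item is done; rethrows a failure, if any.
            await Task.WhenAll(running);
        }
    }

    private static async Task RunOneAsync<T>(T item, Func<T, Task> body, SemaphoreSlim gate)
    {
        try
        {
            await body(item);
        }
        finally
        {
            gate.Release(); // free the slot even if the body threw
        }
    }
}
```

Under these assumptions, the last line of Example would become await ints.ForEachThrottledAsync(actionOnItem, 100);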