Parallel.ForEach Async in C#
As mentioned in my previous post, to get a ‘proper’ parallel foreach that is async is a bit of a pain
So the solution is to write a true async function
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
public static async Task ForEach<T>(ICollection<T> source, Func<T, Task> body, CancellationToken token ) { // create the list of tasks we will be running var tasks = new List<Task>(source.Count); try { // and add them all at once. tasks.AddRange(source.Select(s => Task.Run(() => body(s), token))); // execute it all with a delay to throw. for (; ; ) { // very short delay var delay = Task.Delay(1, token ); // and all our tasks await Task.WhenAny( Task.WhenAll(tasks), delay).ConfigureAwait(false); if (tasks.All(t => t.IsCompleted)) { break; } // // ... use a spinner or something } await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false); // throw if we are done here. token.ThrowIfCancellationRequested(); } finally { // find the error(s) that might have happened. var errors = tasks.Where(tt => tt.IsFaulted).Select(tu => tu.Exception).ToList(); // we are back in our own thread if (errors.Count > 0) { throw new AggregateException(errors); } } } |
And you can call it …
1 2 3 4 5 6 7 |
await ParallelAsync.ForEach(number, async (numbers) => { // blah ... // blah .... await DoSomethingAmazing( number ).ConfigureAwait(false); }, CancellationToken.None).ConfigureAwait( false ); |
Of course, you can refine it by adding check for tokens that cannot be cancelled as well as empty sources
First prize you must make sure that the body
of the ForEach
takes in the token and cancels cleanly otherwise this will jump out with thread left up in the air… but at least it will get out.
Edit: As someone pointed out to me on StackOverflow there are a couple of subtle ways I can improve my implementation … so I added them here
Recent Comments