-1

When using Parallel.ForEach(), is there a way to forcefully execute Thread.Abort on a specific thread?

I know that Thread.Abort() is not recommended.

I'm running a Parallel.ForEach() on a collection of a hundreds of thousands of entities.

The loop processes data going back 30 years in some cases. We've had a few issues where a thread hangs. While we are trying to get a grasp on that, was hoping to call implement a fail safe. If the thread runs for more than x amount of time, forcefully kill the thread.

I do not want to use a cancellation token.

It would be ugly, but haven't come to another solution. Would it be possible to:

  • Have each thread open a timer. Pass in reference of Thread.CurrentThread to timer
  • If the timer elapses, and processing hasn’t completed, call Thread.Abort on that timer
  • If needed, signal event wait handle to allow next patient to process
private void ProcessEntity(ProcessParams param,
    ConcurrentDictionary<long, string> entities)
{
    var options = new ParallelOptions
    {
        MaxDegreeOfParallelism = 2
    };
    Parallel.ForEach(person, options, p =>
    {
        ProcessPerson(param, p);
    });
}

internal void ProcessPerson(ProcessParams param, KeyValuePair<long, string> p)
{
    try
    {
        //...
    }
    catch (Exception ex)
    {

    }
    param.eventWaitHandle?.WaitOne();
}

Theodor Zoulias
  • 24,585
  • 5
  • 40
  • 69
Hoppe
  • 6,152
  • 14
  • 55
  • 106
  • Why not to use [ParallelLoopState](https://docs.microsoft.com/ru-ru/dotnet/api/system.threading.tasks.parallelloopstate)? It is designed for your case – JL0PD Feb 05 '21 at 15:56
  • 1
    Can I ask why you don't want to use a cancellation token? It sounds like it's what cancellation tokens were made to do. https://stackoverflow.com/questions/22647242/timeout-for-action-in-parallel-foreach-iteration – Steve Norwood Feb 05 '21 at 15:57
  • 1
    Cancelling threads without cooperation from said thread is not recommended. You could possibly use a timeout and skip the result from the hanged thread, but this might leave a bunch of hanged threads around. I would very much recommend investigating *why* some operations are hanging? Is there some IO-operation, if so it might have a built in timeout you can use? Is it a deadlock? if so, fix it! – JonasH Feb 05 '21 at 16:00
  • @SteveNorwood if a thread gets into a deadlock state, or simply stops responding, I don't believe a cancellation token would work, correct? For a cancellation token, I believe I need to be in a working state where I can check the cancellation token and decide whether to proceed or cancel – Hoppe Feb 05 '21 at 16:22
  • @JL0PD ParallelLoopState doesn't seem to be much different than a cancellation token IMO. I still have to be in a working state where I have the ability to call Break() – Hoppe Feb 05 '21 at 16:31
  • @JonasH Yes I am investigating the root cause. This app cannot be deployed multiple times a day. I would like to have a fail safe in the meantime – Hoppe Feb 05 '21 at 16:33
  • Be aware that the [`Thread.Abort`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.thread.abort) method is not supported on .NET 5 and .NET Core. The API is still here, but it throws a `PlatformNotSupportedException`. This limits your options. Aborting threads is out. Killing processes is in. – Theodor Zoulias Feb 05 '21 at 16:33
  • Great to know. I just added a tag. I'm on .net framework 4.6.1 for this app @TheodorZoulias – Hoppe Feb 05 '21 at 16:36
  • The only valid way to interrupt a task's thread, however it's created, is cooperatively. See duplicate. – Peter Duniho Feb 05 '21 at 21:53
  • 1
    Btw I have voted for the question to be reopened, because this question is about aborting threads during a `Parallel.ForEach` operation. The [suggested as duplicate](https://stackoverflow.com/questions/4783865/how-do-i-abort-cancel-tpl-tasks) is about aborting threads of delegate-based `Task`s. Which is a quite different case, and it doesn't cover the intricacies of all TPL constructs (especially the higher level ones) included in the Task Parallel Library. – Theodor Zoulias Feb 10 '21 at 04:05

1 Answers1

-1

It seems that the Parallel.ForEach method is not resilient in the face of its worker threads being aborted, and behaves inconsistently. Other times propagates an AggregateException that contains the ThreadAbortException, and other times it throws an ThreadAbortException directly, with an ugly stack trace revealing its internals.

Below is a custom ForEachTimeoutAbort method that offers the basic functionality of the Parallel.ForEach, without advanced features like cancellation, loop state, custom partitioners etc. Its special feature is the TimeSpan timeout parameter, that aborts the worker thread of any item that takes too long to complete.

public static void ForEachTimeoutAbort<TSource>(
    this IEnumerable<TSource> source,
    Action<TSource> action,
    int maxDegreeOfParallelism,
    TimeSpan timeout)
{
    // Arguments validation omitted
    var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    var exceptions = new ConcurrentQueue<Exception>();
    try
    {
        foreach (var item in source)
        {
            semaphore.Wait();
            if (!exceptions.IsEmpty) { semaphore.Release(); break; }

            ThreadPool.QueueUserWorkItem(_ =>
            {
                var timer = new Timer(state => ((Thread)state).Abort(),
                    Thread.CurrentThread, Timeout.Infinite, Timeout.Infinite);
                try
                {
                    timer.Change(timeout, Timeout.InfiniteTimeSpan);
                    action(item);
                }
                catch (Exception ex) { exceptions.Enqueue(ex); }
                finally
                {
                    using (var waitHandle = new ManualResetEvent(false))
                    {
                        timer.Dispose(waitHandle);
                        // Wait the timer's callback (if it's queued) to complete.
                        waitHandle.WaitOne();
                    }
                    semaphore.Release();
                }
            });
        }
    }
    catch (Exception ex) { exceptions.Enqueue(ex); }

    // Wait for all pending operations to complete
    for (int i = 0; i < maxDegreeOfParallelism; i++) semaphore.Wait();
    if (!exceptions.IsEmpty) throw new AggregateException(exceptions);
}

A peculiarity of the ThreadAbortException is that it cannot be caught. So in order to prevent the premature completion of the parallel loop, the Thread.ResetAbort method must be called from the catch block.

Usage example:

ForEachTimeoutAbort(persons, p =>
{
    try
    {
        ProcessPerson(param, p);
    }
    catch (ThreadAbortException)
    {
        Thread.ResetAbort();
    }
}, maxDegreeOfParallelism: 2, timeout: TimeSpan.FromSeconds(30));

The .NET Framework is the only platform where the ForEachTimeoutAbort method could be used. For .NET Core and .NET 5 one could try converting it to ForEachTimeoutInterrupt, and replace the call to Abort with a call to Interrupt. Interrupting a thread is not as effective as aborting it, because it only has effect when the thread is in a waiting/sleeping mode. But it may be sufficient in some scenarios.

Theodor Zoulias
  • 24,585
  • 5
  • 40
  • 69