Guerrilla - 2 months ago
C# Question

How to handle threads that hang when using SemaphoreSlim

I have some code that runs thousands of URLs through a third-party library. Occasionally a method in the library hangs, which ties up a thread. After a while, all threads are taken up by calls that are doing nothing, and the whole thing grinds to a halt.

I am using a SemaphoreSlim to control adding new threads so I can have an optimal number of tasks running. I need a way to identify tasks that have been running too long, kill them, and also release a slot in the SemaphoreSlim so that a new task can be created.

I am struggling with the approach here, so I made some test code that imitates what I am doing. It creates tasks that have a 10% chance of hanging, so very quickly all threads have hung.

How should I be checking for these and killing them off?

Here is the code:

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    public static SemaphoreSlim semaphore;
    public static List<Task> taskList;

    static void Main(string[] args)
    {
        List<string> urlList = new List<string>();
        Console.WriteLine("Generating list");
        for (int i = 0; i < 1000; i++)
        {
            //adding random strings to simulate a large list of URLs to process
            urlList.Add(Path.GetRandomFileName());
        }
        Console.WriteLine("Queueing tasks");

        semaphore = new SemaphoreSlim(10, 10);

        Task.Run(() => QueueTasks(urlList));

        Console.ReadLine();
    }

    static void QueueTasks(List<string> urlList)
    {
        taskList = new List<Task>();

        foreach (var url in urlList)
        {
            Console.WriteLine("{0} tasks can enter the semaphore.",
                semaphore.CurrentCount);
            semaphore.Wait();

            taskList.Add(DoTheThing(url));
        }
    }

    static async Task DoTheThing(string url)
    {
        Random rand = new Random();

        // simulate the IO process
        await Task.Delay(rand.Next(2000, 10000));

        // add a 10% chance that the call will hang, simulating what occasionally happens with an HTTP request
        // (the upper bound of Next is exclusive, so this yields 1..100)
        int chance = rand.Next(1, 101);
        if (chance <= 10)
        {
            while (true)
            {
                await Task.Delay(1000000);
            }
        }

        // a hung call never gets here, so its semaphore slot is never released
        semaphore.Release();
        Console.WriteLine(url);
    }
}

Answer

As people have already pointed out, aborting threads is generally a bad idea, and there is no guaranteed way of doing it in C#. Running the work in a separate process and killing that process is a slightly better idea than attempting Thread.Abort, but it is still not the best way to go. Ideally, you want co-operative threads or processes that are told (via IPC, or a cancellation token within one process) when to bail out, and then stop by themselves. This way the cleanup is done properly.
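For illustration, here is a minimal sketch of the co-operative approach, assuming the work can be written to observe a CancellationToken (the third-party library in the question may not allow this). CooperativeDemo, DoWorkAsync and ProcessUrlAsync are hypothetical names. A CancellationTokenSource created with a timeout cancels its token automatically, the work stops itself when it sees the cancellation, and the semaphore slot is released in a finally block whatever the outcome.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class CooperativeDemo {
    // same limit as in the question: at most 10 URLs in flight
    static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(10, 10);

    public static int FreeSlots => Semaphore.CurrentCount;

    // Hypothetical unit of work. It passes the token to every wait, so once
    // cancellation is requested it stops by itself with an OperationCanceledException.
    // 'hang' simulates the call that never finishes.
    static async Task DoWorkAsync(string url, bool hang, CancellationToken token) {
        do {
            await Task.Delay(100, token);
        } while (hang);
    }

    // Returns true if the work finished, false if it was cancelled by the timeout.
    public static async Task<bool> ProcessUrlAsync(string url, bool hang, TimeSpan timeout) {
        await Semaphore.WaitAsync();
        try {
            // the token is cancelled automatically once 'timeout' has elapsed
            using (var cts = new CancellationTokenSource(timeout)) {
                await DoWorkAsync(url, hang, cts.Token);
                return true;
            }
        } catch (OperationCanceledException) {
            return false;
        } finally {
            // runs on success, timeout and error alike, so the slot is never lost
            Semaphore.Release();
        }
    }
}
```

The important design point is that the slot is returned in finally rather than at the end of the happy path, which is exactly what the question's DoTheThing misses.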

With all that said, you can use code like the one below to do what you intend. I have written it assuming each task runs on its own thread. With slight changes, you can use the same logic to run each task in a separate process.

The code is by no means bullet-proof and is meant to be illustrative. The concurrent code has not been tested thoroughly. Locks are held for longer than needed, and in some places (like the Log function) I am not locking at all.

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Note: Thread.Abort is only supported on .NET Framework.
// On .NET Core and .NET 5+ it throws PlatformNotSupportedException.

class TaskInfo {
    public Thread Task;
    public DateTime StartTime;

    public TaskInfo(ParameterizedThreadStart startInfo, object startArg) {
        Task = new Thread(startInfo);
        // record the start time before starting, so the elapsed time is never underestimated
        StartTime = DateTime.UtcNow;
        Task.Start(startArg);
    }
}

class Program {

    const int MAX_THREADS = 1;
    const int TASK_TIMEOUT = 6; // in seconds
    const int CLEANUP_INTERVAL = TASK_TIMEOUT; // in seconds

    public static SemaphoreSlim semaphore;

    // initialised here so CleanupTasks never sees a null list,
    // even if the timer fires before QueueTasks has run
    public static List<TaskInfo> TaskList = new List<TaskInfo>();
    public static object TaskListLock = new object();

    // set once every URL has been handed to a thread
    public static volatile bool AllTasksQueued;

    public static Timer CleanupTimer;

    static void Main(string[] args) {
        List<string> urlList = new List<string>();
        Log("Generating list");
        for (int i = 0; i < 2; i++) {
            //adding random strings to simulate a large list of URLs to process
            urlList.Add(Path.GetRandomFileName());
        }
        Log("Queueing tasks");

        semaphore = new SemaphoreSlim(MAX_THREADS, MAX_THREADS);

        Task.Run(() => QueueTasks(urlList));

        CleanupTimer = new Timer(CleanupTasks, null, CLEANUP_INTERVAL * 1000, CLEANUP_INTERVAL * 1000);

        Console.ReadLine();
    }

    // TODO: Guard against re-entrancy (the lock below only serialises overlapping timer callbacks)
    static void CleanupTasks(object state) {
        Log("CleanupTasks started");

        lock (TaskListLock) {
            var now = DateTime.UtcNow;
            // iterate backwards so RemoveAt does not skip elements
            for (int i = TaskList.Count - 1; i >= 0; --i) {
                var task = TaskList[i];
                Log($"Checking task with ID {task.Task.ManagedThreadId}");

                // abort threads running for longer than anticipated; the resulting
                // ThreadAbortException runs the finally block in QueueTasks,
                // which releases the semaphore slot
                if (task.Task.IsAlive && (now - task.StartTime).TotalSeconds >= TASK_TIMEOUT) {
                    Log("Cleaning up hung task");
                    task.Task.Abort();
                }

                // Abort does not wait for the thread to finish, so an aborted thread
                // may still be alive here and is then removed on a later pass
                if (!task.Task.IsAlive) {
                    Log("Removing dead task from list");
                    TaskList.RemoveAt(i);
                }
            }

            // only stop the timer once no more tasks can be added to the list
            if (AllTasksQueued && TaskList.Count == 0) {
                Log("Disposing cleanup timer");
                CleanupTimer.Dispose();
            }
        }

        Log("CleanupTasks done");
    }

    static void QueueTasks(List<string> urlList) {
        foreach (var url in urlList) {
            Log($"Trying to schedule url = {url}");
            semaphore.Wait();
            Log("Semaphore acquired");

            ParameterizedThreadStart taskRoutine = obj => {
                try {
                    DoTheThing((string)obj);
                } finally {
                    // runs on normal completion, on exceptions and on Thread.Abort,
                    // so the slot is always handed back
                    Log("Releasing semaphore");
                    semaphore.Release();
                }
            };

            var task = new TaskInfo(taskRoutine, url);
            lock (TaskListLock)
                TaskList.Add(task);
        }

        AllTasksQueued = true;
        Log("All tasks queued");
    }

    // simulate a library call that always hangs
    static void DoTheThing(string url) {
        while (true)
            Thread.Sleep(5000);
    }

    static void Log(string msg) {
        Console.WriteLine("{0:HH:mm:ss.fff} Thread {1,2} {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId.ToString(), msg);
    }
}
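Worth keeping in mind: Thread.Abort is only supported on .NET Framework; on .NET Core and .NET 5+ it throws PlatformNotSupportedException. When the library call cannot observe a cancellation token, a minimal sketch of an alternative is to stop waiting for the call rather than kill it: race it against Task.Delay with Task.WhenAny, and release the semaphore slot either way. TimeoutDemo and RunWithTimeoutAsync are hypothetical names, and the 10-slot limit is taken from the question.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class TimeoutDemo {
    static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(10, 10);

    public static int FreeSlots => Semaphore.CurrentCount;

    // Returns true if 'work' finished within 'timeout', false if we gave up on it.
    // On timeout the work is abandoned, not killed: it keeps running in the
    // background, but its semaphore slot is handed back so a new URL can start.
    public static async Task<bool> RunWithTimeoutAsync(Func<Task> work, TimeSpan timeout) {
        await Semaphore.WaitAsync();
        try {
            Task workTask = work();
            using (var cts = new CancellationTokenSource()) {
                Task first = await Task.WhenAny(workTask, Task.Delay(timeout, cts.Token));
                if (first == workTask) {
                    cts.Cancel();    // stop the pending delay
                    await workTask;  // surface any exception thrown by the work
                    return true;
                }
            }
            return false;
        } finally {
            // released on completion, timeout and error alike
            Semaphore.Release();
        }
    }
}
```

With a blocking library method, wrap it in Task.Run, for example RunWithTimeoutAsync(() => Task.Run(() => library.Fetch(url)), TimeSpan.FromSeconds(30)), where library.Fetch stands in for the real call. The abandoned call still occupies its thread-pool thread until it returns, so this frees the semaphore slot but not the thread; if hung calls pile up, running the work in a separate process that can be killed remains the more robust option.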