#include "thread-pool.hpp" #include "bucketed-array.hpp" #include #include struct ThreadPool { bool isRunning; bool isPaused; uint8_t maxJobQueueCount; std::mutex mutex; std::atomic completedCount; std::condition_variable condition; DynArray *> *jobQueue; DynArray *threads; struct pk_membucket *bkt = nullptr; }; const pk_handle_item_index_T MAX_THREADS_PER_BUCKET = 8; BucketContainer ThreadPool_BucketContainer{}; void ThreadRun(ThreadPool *tp) { std::packaged_task *j = nullptr; while (tp->isRunning && !tp->isPaused) { { std::unique_lock lck(tp->mutex); tp->condition.wait(lck, [tp] { if (!tp->isRunning) return true; if (tp->isPaused) return true; if (tp->jobQueue == nullptr) return true; if (tp->jobQueue == CAFE_BABE(DynArray *>)) return true; return tp->jobQueue->Count() != 0; }); if (!tp->isRunning || tp->isPaused || tp->jobQueue == nullptr || tp->jobQueue == CAFE_BABE(DynArray *>)) { return; } if (tp->jobQueue->Count() == 0) { continue; } j = (*tp->jobQueue)[0]; tp->jobQueue->Remove(0, 1); } assert(j != nullptr); (*j)(); pk_delete>(j, tp->bkt); tp->completedCount = tp->completedCount + 1; } } void inline PkeThreads_JoinAll_Inner(ThreadPool &tp) { tp.condition.notify_all(); long count = tp.threads->Count(); for (long l = 0; l < count; ++l) { auto &t = (*tp.threads)[l]; if (t.joinable()) { t.join(); } } } void inline PkeThreads_DetatchAll_Inner(ThreadPool &tp) { long count = tp.threads->Count(); for (long i = 0; i < count; ++i) { auto &t = (*tp.threads)[i]; t.detach(); } tp.condition.notify_all(); } void inline PkeThreads_Reset_Inner(ThreadPool &tp) { tp.mutex.lock(); tp.maxJobQueueCount = 0; tp.completedCount = 0; tp.jobQueue->Resize(0); tp.threads->Resize(0); tp.mutex.unlock(); } bool inline PkeThreads_Enqueue_Inner(ThreadPool &tp, std::packaged_task *job) { tp.mutex.lock(); if (tp.isRunning == true) { if (tp.jobQueue->Count() < tp.maxJobQueueCount) { tp.jobQueue->Push(job); tp.condition.notify_one(); tp.mutex.unlock(); return true; } } tp.mutex.unlock(); return false; } void inline PkeThreads_Pause_Inner(ThreadPool &tp) { tp.mutex.lock(); if (tp.isPaused == true) { return; // called more than once } tp.isPaused = true; tp.mutex.unlock(); PkeThreads_JoinAll_Inner(tp); } void inline PkeThreads_Resume_Inner(ThreadPool &tp) { tp.mutex.lock(); tp.isPaused = false; long count = tp.threads->Count(); for (size_t i = 0; i < count; i++) { (*tp.threads)[i] = std::thread(std::bind(ThreadRun, &tp)); } tp.mutex.unlock(); } void inline PkeThreads_Shutdown_Inner(ThreadPool &tp) { tp.mutex.lock(); if (tp.isRunning == false) { return; } tp.isRunning = false; tp.isPaused = false; tp.jobQueue->Resize(0); tp.mutex.unlock(); } void PkeThreads_Init() { Buckets_Init(ThreadPool_BucketContainer, MAX_THREADS_PER_BUCKET); } ThreadPoolHandle PkeThreads_Init(uint8_t threadCount, uint8_t maxQueueCount, struct pk_membucket *bkt) { assert(threadCount > 0); ThreadPoolHandle newHandle{Buckets_NewHandle(ThreadPool_BucketContainer)}; auto *tp = &ThreadPool_BucketContainer.buckets[newHandle.bucketIndex][newHandle.itemIndex]; tp->bkt = bkt; tp->isRunning = true; tp->isPaused = false; tp->maxJobQueueCount = maxQueueCount; tp->completedCount = 0; tp->jobQueue = pk_new *>>(bkt); tp->threads = pk_new>(bkt); tp->threads->Resize(threadCount); for (long l = 0; l < threadCount; ++l) { (*tp->threads)[l] = std::thread(std::bind(ThreadRun, tp)); } return newHandle; } void PkeThreads_Reset(ThreadPoolHandle handle) { assert(handle != ThreadPoolHandle_MAX); auto *tp = &ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; PkeThreads_Reset_Inner(*tp); } bool PkeThreads_Enqueue(ThreadPoolHandle handle, std::packaged_task *job) { assert(handle != ThreadPoolHandle_MAX); auto *tp = &ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; if (tp->bkt != nullptr) { /* 2023-12-22 JCB * Note that if this becomes an issue we can change it. * Technically speaking, if we call the right pk_delete * we don't even need to worry about passing the struct pk_membucket */ assert(pk_memory_is_in_bucket(job, tp->bkt) == true && "cannot enqueue packaged task from a non-matching struct pk_membucket"); } return PkeThreads_Enqueue_Inner(*tp, job); } int64_t PkeThreads_GetQueueCount (ThreadPoolHandle handle) { auto &threadPool = ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; return threadPool.jobQueue->Count(); } void PkeThreads_Pause(ThreadPoolHandle handle) { assert(handle != ThreadPoolHandle_MAX); auto *tp = &ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; PkeThreads_Pause_Inner(*tp); } void PkeThreads_Resume(ThreadPoolHandle handle) { assert(handle != ThreadPoolHandle_MAX); auto *tp = &ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; PkeThreads_Resume_Inner(*tp); } void PkeThreads_Shutdown(ThreadPoolHandle handle) { assert(handle != ThreadPoolHandle_MAX); auto *tp = &ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; PkeThreads_Shutdown_Inner(*tp); PkeThreads_JoinAll_Inner(*tp); } void PkeThreads_Teardown(ThreadPoolHandle handle) { assert(handle != ThreadPoolHandle_MAX); auto *tp = &ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; PkeThreads_Shutdown_Inner(*tp); PkeThreads_JoinAll_Inner(*tp); PkeThreads_Reset_Inner(*tp); pk_delete *>>(tp->jobQueue, tp->bkt); pk_delete>(tp->threads, tp->bkt); tp->jobQueue = CAFE_BABE(DynArray *>); tp->threads = CAFE_BABE(DynArray); tp->bkt = CAFE_BABE(struct pk_membucket); } void PkeThreads_Teardown() { Buckets_Destroy(ThreadPool_BucketContainer); }