diff options
| author | Jonathan Bradley <jcb@pikum.xyz> | 2025-01-09 14:44:31 -0500 |
|---|---|---|
| committer | Jonathan Bradley <jcb@pikum.xyz> | 2025-01-09 14:44:31 -0500 |
| commit | b76e309166f714b0a66fb4802f02e92a82d09082 (patch) | |
| tree | 44244fc0e6f873e1ccf6e1e95e2fec62fcec394a /src/thread_pool.cpp | |
| parent | b04fefe8ee0086bc1404c06b8351ecb4e942f151 (diff) | |
flatten file structure + rename
Diffstat (limited to 'src/thread_pool.cpp')
| -rw-r--r-- | src/thread_pool.cpp | 217 |
1 files changed, 0 insertions, 217 deletions
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp deleted file mode 100644 index e2ef3cd..0000000 --- a/src/thread_pool.cpp +++ /dev/null @@ -1,217 +0,0 @@ - -#include "thread_pool.hpp" - -#include "bucketed-array.hpp" - -#include <functional> -#include <future> - -struct ThreadPool { - bool isRunning; - bool isPaused; - uint8_t maxJobQueueCount; - std::mutex mutex; - std::atomic<uint64_t> completedCount; - std::condition_variable condition; - DynArray<std::packaged_task<void()> *> *jobQueue; - DynArray<std::thread> *threads; - struct pk_membucket *bkt = nullptr; -}; - -const pk_handle_item_index_T MAX_THREADS_PER_BUCKET = 8; - -BucketContainer<ThreadPool, ThreadPoolHandle> ThreadPool_BucketContainer{}; - -void ThreadRun(ThreadPool *tp) { - std::packaged_task<void()> *j = nullptr; - while (tp->isRunning && !tp->isPaused) { - { - std::unique_lock<std::mutex> lck(tp->mutex); - tp->condition.wait(lck, [tp] { - if (!tp->isRunning) return true; - if (tp->isPaused) return true; - if (tp->jobQueue == nullptr) return true; - if (tp->jobQueue == CAFE_BABE(DynArray<std::packaged_task<void()> *>)) return true; - return tp->jobQueue->Count() != 0; - }); - if (!tp->isRunning || tp->isPaused || tp->jobQueue == nullptr || tp->jobQueue == CAFE_BABE(DynArray<std::packaged_task<void()> *>)) { - return; - } - if (tp->jobQueue->Count() == 0) { - continue; - } - j = (*tp->jobQueue)[0]; - tp->jobQueue->Remove(0, 1); - } - assert(j != nullptr); - (*j)(); - pk_delete<std::packaged_task<void()>>(j, tp->bkt); - tp->completedCount = tp->completedCount + 1; - } -} - -void inline PkeThreads_JoinAll_Inner(ThreadPool &tp) { - tp.condition.notify_all(); - long count = tp.threads->Count(); - for (long l = 0; l < count; ++l) { - auto &t = (*tp.threads)[l]; - if (t.joinable()) { - t.join(); - } - } -} - -void inline PkeThreads_DetatchAll_Inner(ThreadPool &tp) { - long count = tp.threads->Count(); - for (long i = 0; i < count; ++i) { - auto &t = (*tp.threads)[i]; - t.detach(); - } - tp.condition.notify_all(); -} - -void inline PkeThreads_Reset_Inner(ThreadPool &tp) { - tp.mutex.lock(); - tp.maxJobQueueCount = 0; - tp.completedCount = 0; - tp.jobQueue->Resize(0); - tp.threads->Resize(0); - tp.mutex.unlock(); -} - -bool inline PkeThreads_Enqueue_Inner(ThreadPool &tp, std::packaged_task<void()> *job) { - tp.mutex.lock(); - if (tp.isRunning == true) { - if (tp.jobQueue->Count() < tp.maxJobQueueCount) { - tp.jobQueue->Push(job); - tp.condition.notify_one(); - tp.mutex.unlock(); - return true; - } - } - tp.mutex.unlock(); - return false; -} - -void inline PkeThreads_Pause_Inner(ThreadPool &tp) { - tp.mutex.lock(); - if (tp.isPaused == true) { - return; // called more than once - } - tp.isPaused = true; - tp.mutex.unlock(); - PkeThreads_JoinAll_Inner(tp); -} - -void inline PkeThreads_Resume_Inner(ThreadPool &tp) { - tp.mutex.lock(); - tp.isPaused = false; - long count = tp.threads->Count(); - for (size_t i = 0; i < count; i++) { - (*tp.threads)[i] = std::thread(std::bind(ThreadRun, &tp)); - } - tp.mutex.unlock(); -} - -void inline PkeThreads_Shutdown_Inner(ThreadPool &tp) { - tp.mutex.lock(); - if (tp.isRunning == false) { - return; - } - tp.isRunning = false; - tp.isPaused = false; - tp.jobQueue->Resize(0); - tp.mutex.unlock(); -} - -void PkeThreads_Init() { - Buckets_Init(ThreadPool_BucketContainer, MAX_THREADS_PER_BUCKET); -} - -ThreadPoolHandle PkeThreads_Init(uint8_t threadCount, uint8_t maxQueueCount, struct pk_membucket *bkt) { - assert(threadCount > 0); - ThreadPoolHandle newHandle{Buckets_NewHandle(ThreadPool_BucketContainer)}; - - auto *tp = &ThreadPool_BucketContainer.buckets[newHandle.bucketIndex][newHandle.itemIndex]; - - tp->bkt = bkt; - tp->isRunning = true; - tp->isPaused = false; - tp->maxJobQueueCount = maxQueueCount; - tp->completedCount = 0; - tp->jobQueue = pk_new<DynArray<std::packaged_task<void()> *>>(bkt); - tp->threads = pk_new<DynArray<std::thread>>(bkt); - - tp->threads->Resize(threadCount); - for (long l = 0; l < threadCount; ++l) { - (*tp->threads)[l] = std::thread(std::bind(ThreadRun, tp)); - } - - return newHandle; -} - -void PkeThreads_Reset(ThreadPoolHandle handle) { - assert(handle != ThreadPoolHandle_MAX); - auto *tp = &ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; - PkeThreads_Reset_Inner(*tp); -} - -bool PkeThreads_Enqueue(ThreadPoolHandle handle, std::packaged_task<void()> *job) { - assert(handle != ThreadPoolHandle_MAX); - auto *tp = &ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; - if (tp->bkt != nullptr) { - /* 2023-12-22 JCB - * Note that if this becomes an issue we can change it. - * Technically speaking, if we call the right pk_delete - * we don't even need to worry about passing the struct pk_membucket - */ - assert(pk_memory_is_in_bucket(job, tp->bkt) == true && "cannot enqueue packaged task from a non-matching struct pk_membucket"); - } - - return PkeThreads_Enqueue_Inner(*tp, job); -} - -int64_t PkeThreads_GetQueueCount (ThreadPoolHandle handle) { - auto &threadPool = ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; - return threadPool.jobQueue->Count(); -} - -void PkeThreads_Pause(ThreadPoolHandle handle) { - assert(handle != ThreadPoolHandle_MAX); - auto *tp = &ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; - - PkeThreads_Pause_Inner(*tp); -} - -void PkeThreads_Resume(ThreadPoolHandle handle) { - assert(handle != ThreadPoolHandle_MAX); - auto *tp = &ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; - - PkeThreads_Resume_Inner(*tp); -} - -void PkeThreads_Shutdown(ThreadPoolHandle handle) { - assert(handle != ThreadPoolHandle_MAX); - auto *tp = &ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; - - PkeThreads_Shutdown_Inner(*tp); - PkeThreads_JoinAll_Inner(*tp); -} - -void PkeThreads_Teardown(ThreadPoolHandle handle) { - assert(handle != ThreadPoolHandle_MAX); - auto *tp = &ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; - - PkeThreads_Shutdown_Inner(*tp); - PkeThreads_JoinAll_Inner(*tp); - PkeThreads_Reset_Inner(*tp); - pk_delete<DynArray<std::packaged_task<void()> *>>(tp->jobQueue, tp->bkt); - pk_delete<DynArray<std::thread>>(tp->threads, tp->bkt); - tp->jobQueue = CAFE_BABE(DynArray<std::packaged_task<void()> *>); - tp->threads = CAFE_BABE(DynArray<std::thread>); - tp->bkt = CAFE_BABE(struct pk_membucket); -} - -void PkeThreads_Teardown() { - Buckets_Destroy(ThreadPool_BucketContainer); -} |
