diff options
| author | Jonathan Bradley <jcb@pikum.xyz> | 2025-05-20 21:03:57 -0400 |
|---|---|---|
| committer | Jonathan Bradley <jcb@pikum.xyz> | 2025-05-20 21:03:57 -0400 |
| commit | ead9e484db969a880470d625b1884aced296e722 (patch) | |
| tree | d36ce4ac2d0676dc22a152e8a3b26513ea8234dc /src | |
| parent | 8318fca632b8ff82c5966876c192fae891501771 (diff) | |
pke: thread-pool swap DynArray with pke_arr
Diffstat (limited to 'src')
| -rw-r--r-- | src/thread-pool.cpp | 69 | ||||
| -rw-r--r-- | src/thread-pool.hpp | 2 |
2 files changed, 35 insertions, 36 deletions
diff --git a/src/thread-pool.cpp b/src/thread-pool.cpp index b6ff97e..15c5551 100644 --- a/src/thread-pool.cpp +++ b/src/thread-pool.cpp @@ -2,7 +2,7 @@ #include "thread-pool.hpp" #include "bucketed-array.hpp" -#include "dynamic-array.hpp" +#include "pk.h" #include <functional> #include <future> @@ -14,8 +14,8 @@ struct ThreadPool { std::mutex mutex; std::atomic<uint64_t> completedCount; std::condition_variable condition; - DynArray<std::packaged_task<void()> *> *jobQueue; - DynArray<std::thread> *threads; + pk_arr_t<std::packaged_task<void()> *> jobQueue; + pk_arr_t<std::thread*> threads; struct pk_membucket *bkt = nullptr; }; @@ -31,18 +31,16 @@ void ThreadRun(ThreadPool *tp) { tp->condition.wait(lck, [tp] { if (!tp->isRunning) return true; if (tp->isPaused) return true; - if (tp->jobQueue == nullptr) return true; - if (tp->jobQueue == CAFE_BABE(DynArray<std::packaged_task<void()> *>)) return true; - return tp->jobQueue->Count() != 0; + return tp->jobQueue.next != 0; }); - if (!tp->isRunning || tp->isPaused || tp->jobQueue == nullptr || tp->jobQueue == CAFE_BABE(DynArray<std::packaged_task<void()> *>)) { + if (!tp->isRunning || tp->isPaused) { return; } - if (tp->jobQueue->Count() == 0) { + if (tp->jobQueue.next == 0) { continue; } - j = (*tp->jobQueue)[0]; - tp->jobQueue->Remove(0, 1); + j = tp->jobQueue[0]; + pk_arr_remove_at(&tp->jobQueue, 0); } assert(j != nullptr); (*j)(); @@ -53,9 +51,9 @@ void ThreadRun(ThreadPool *tp) { void inline PkeThreads_JoinAll_Inner(ThreadPool &tp) { tp.condition.notify_all(); - long count = tp.threads->Count(); - for (long l = 0; l < count; ++l) { - auto &t = (*tp.threads)[l]; + uint32_t count = tp.threads.next; + for (uint32_t l = 0; l < count; ++l) { + auto &t = *tp.threads[l]; if (t.joinable()) { t.join(); } @@ -63,9 +61,9 @@ void inline PkeThreads_JoinAll_Inner(ThreadPool &tp) { } void inline PkeThreads_DetatchAll_Inner(ThreadPool &tp) { - long count = tp.threads->Count(); - for (long i = 0; i < count; ++i) { - auto &t = (*tp.threads)[i]; + uint32_t count = tp.threads.next; + for (uint32_t i = 0; i < count; ++i) { + auto &t = *tp.threads[i]; t.detach(); } tp.condition.notify_all(); @@ -75,16 +73,16 @@ void inline PkeThreads_Reset_Inner(ThreadPool &tp) { tp.mutex.lock(); tp.maxJobQueueCount = 0; tp.completedCount = 0; - tp.jobQueue->Resize(0); - tp.threads->Resize(0); + pk_arr_clear(&tp.jobQueue); + pk_arr_clear(&tp.threads); tp.mutex.unlock(); } bool inline PkeThreads_Enqueue_Inner(ThreadPool &tp, std::packaged_task<void()> *job) { tp.mutex.lock(); if (tp.isRunning == true) { - if (tp.jobQueue->Count() < tp.maxJobQueueCount) { - tp.jobQueue->Push(job); + if (tp.jobQueue.next < tp.maxJobQueueCount) { + pk_arr_append_t(&tp.jobQueue, job); tp.condition.notify_one(); tp.mutex.unlock(); return true; @@ -107,9 +105,9 @@ void inline PkeThreads_Pause_Inner(ThreadPool &tp) { void inline PkeThreads_Resume_Inner(ThreadPool &tp) { tp.mutex.lock(); tp.isPaused = false; - int64_t count = tp.threads->Count(); - for (int64_t i = 0; i < count; i++) { - (*tp.threads)[i] = std::thread(std::bind(ThreadRun, &tp)); + uint32_t count = tp.threads.next; + for (uint32_t i = 0; i < count; i++) { + new (tp.threads[i]) std::thread{std::bind(ThreadRun, &tp)}; } tp.mutex.unlock(); } @@ -121,7 +119,7 @@ void inline PkeThreads_Shutdown_Inner(ThreadPool &tp) { } tp.isRunning = false; tp.isPaused = false; - tp.jobQueue->Resize(0); + pk_arr_clear(&tp.jobQueue); tp.mutex.unlock(); } @@ -140,12 +138,13 @@ ThreadPoolHandle PkeThreads_Init(uint8_t threadCount, uint8_t maxQueueCount, str tp->isPaused = false; tp->maxJobQueueCount = maxQueueCount; tp->completedCount = 0; - tp->jobQueue = pk_new<DynArray<std::packaged_task<void()> *>>(bkt); - tp->threads = pk_new<DynArray<std::thread>>(bkt); + tp->jobQueue.bkt = bkt; + tp->threads.bkt = bkt; - tp->threads->Resize(threadCount); - for (long l = 0; l < threadCount; ++l) { - (*tp->threads)[l] = std::thread(std::bind(ThreadRun, tp)); + pk_arr_resize(&tp->threads, threadCount); + for (uint8_t l = 0; l < threadCount; ++l) { + tp->threads[l] = pk_new<std::thread>(bkt); + new (tp->threads[l]) std::thread(std::bind(ThreadRun, tp)); } return newHandle; @@ -172,9 +171,9 @@ bool PkeThreads_Enqueue(ThreadPoolHandle handle, std::packaged_task<void()> *job return PkeThreads_Enqueue_Inner(*tp, job); } -int64_t PkeThreads_GetQueueCount (ThreadPoolHandle handle) { +uint32_t PkeThreads_GetQueueCount (ThreadPoolHandle handle) { auto &threadPool = ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex]; - return threadPool.jobQueue->Count(); + return threadPool.jobQueue.next; } void PkeThreads_Pause(ThreadPoolHandle handle) { @@ -206,10 +205,10 @@ void PkeThreads_Teardown(ThreadPoolHandle handle) { PkeThreads_Shutdown_Inner(*tp); PkeThreads_JoinAll_Inner(*tp); PkeThreads_Reset_Inner(*tp); - pk_delete<DynArray<std::packaged_task<void()> *>>(tp->jobQueue, tp->bkt); - pk_delete<DynArray<std::thread>>(tp->threads, tp->bkt); - tp->jobQueue = CAFE_BABE(DynArray<std::packaged_task<void()> *>); - tp->threads = CAFE_BABE(DynArray<std::thread>); + pk_arr_reset(&tp->jobQueue); + pk_arr_reset(&tp->threads); + tp->jobQueue.bkt = CAFE_BABE(struct pk_membucket); + tp->threads.bkt = CAFE_BABE(struct pk_membucket); tp->bkt = CAFE_BABE(struct pk_membucket); } diff --git a/src/thread-pool.hpp b/src/thread-pool.hpp index aabb9ee..3afb99b 100644 --- a/src/thread-pool.hpp +++ b/src/thread-pool.hpp @@ -14,7 +14,7 @@ void PkeThreads_Init(); ThreadPoolHandle PkeThreads_Init (uint8_t threadCount, uint8_t maxQueueCount, struct pk_membucket *bkt = nullptr); void PkeThreads_Reset (ThreadPoolHandle handle); bool PkeThreads_Enqueue (ThreadPoolHandle handle, std::packaged_task<void()> *job); -int64_t PkeThreads_GetQueueCount (ThreadPoolHandle handle); +uint32_t PkeThreads_GetQueueCount (ThreadPoolHandle handle); void PkeThreads_Pause (ThreadPoolHandle handle); void PkeThreads_Resume (ThreadPoolHandle handle); void PkeThreads_Shutdown (ThreadPoolHandle handle); |
