summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJonathan Bradley <jcb@pikum.xyz>2025-05-20 21:03:57 -0400
committerJonathan Bradley <jcb@pikum.xyz>2025-05-20 21:03:57 -0400
commitead9e484db969a880470d625b1884aced296e722 (patch)
treed36ce4ac2d0676dc22a152e8a3b26513ea8234dc /src
parent8318fca632b8ff82c5966876c192fae891501771 (diff)
pke: thread-pool swap DynArray with pke_arr
Diffstat (limited to 'src')
-rw-r--r--src/thread-pool.cpp69
-rw-r--r--src/thread-pool.hpp2
2 files changed, 35 insertions, 36 deletions
diff --git a/src/thread-pool.cpp b/src/thread-pool.cpp
index b6ff97e..15c5551 100644
--- a/src/thread-pool.cpp
+++ b/src/thread-pool.cpp
@@ -2,7 +2,7 @@
#include "thread-pool.hpp"
#include "bucketed-array.hpp"
-#include "dynamic-array.hpp"
+#include "pk.h"
#include <functional>
#include <future>
@@ -14,8 +14,8 @@ struct ThreadPool {
std::mutex mutex;
std::atomic<uint64_t> completedCount;
std::condition_variable condition;
- DynArray<std::packaged_task<void()> *> *jobQueue;
- DynArray<std::thread> *threads;
+ pk_arr_t<std::packaged_task<void()> *> jobQueue;
+ pk_arr_t<std::thread*> threads;
struct pk_membucket *bkt = nullptr;
};
@@ -31,18 +31,16 @@ void ThreadRun(ThreadPool *tp) {
tp->condition.wait(lck, [tp] {
if (!tp->isRunning) return true;
if (tp->isPaused) return true;
- if (tp->jobQueue == nullptr) return true;
- if (tp->jobQueue == CAFE_BABE(DynArray<std::packaged_task<void()> *>)) return true;
- return tp->jobQueue->Count() != 0;
+ return tp->jobQueue.next != 0;
});
- if (!tp->isRunning || tp->isPaused || tp->jobQueue == nullptr || tp->jobQueue == CAFE_BABE(DynArray<std::packaged_task<void()> *>)) {
+ if (!tp->isRunning || tp->isPaused) {
return;
}
- if (tp->jobQueue->Count() == 0) {
+ if (tp->jobQueue.next == 0) {
continue;
}
- j = (*tp->jobQueue)[0];
- tp->jobQueue->Remove(0, 1);
+ j = tp->jobQueue[0];
+ pk_arr_remove_at(&tp->jobQueue, 0);
}
assert(j != nullptr);
(*j)();
@@ -53,9 +51,9 @@ void ThreadRun(ThreadPool *tp) {
void inline PkeThreads_JoinAll_Inner(ThreadPool &tp) {
tp.condition.notify_all();
- long count = tp.threads->Count();
- for (long l = 0; l < count; ++l) {
- auto &t = (*tp.threads)[l];
+ uint32_t count = tp.threads.next;
+ for (uint32_t l = 0; l < count; ++l) {
+ auto &t = *tp.threads[l];
if (t.joinable()) {
t.join();
}
@@ -63,9 +61,9 @@ void inline PkeThreads_JoinAll_Inner(ThreadPool &tp) {
}
void inline PkeThreads_DetatchAll_Inner(ThreadPool &tp) {
- long count = tp.threads->Count();
- for (long i = 0; i < count; ++i) {
- auto &t = (*tp.threads)[i];
+ uint32_t count = tp.threads.next;
+ for (uint32_t i = 0; i < count; ++i) {
+ auto &t = *tp.threads[i];
t.detach();
}
tp.condition.notify_all();
@@ -75,16 +73,16 @@ void inline PkeThreads_Reset_Inner(ThreadPool &tp) {
tp.mutex.lock();
tp.maxJobQueueCount = 0;
tp.completedCount = 0;
- tp.jobQueue->Resize(0);
- tp.threads->Resize(0);
+ pk_arr_clear(&tp.jobQueue);
+ pk_arr_clear(&tp.threads);
tp.mutex.unlock();
}
bool inline PkeThreads_Enqueue_Inner(ThreadPool &tp, std::packaged_task<void()> *job) {
tp.mutex.lock();
if (tp.isRunning == true) {
- if (tp.jobQueue->Count() < tp.maxJobQueueCount) {
- tp.jobQueue->Push(job);
+ if (tp.jobQueue.next < tp.maxJobQueueCount) {
+ pk_arr_append_t(&tp.jobQueue, job);
tp.condition.notify_one();
tp.mutex.unlock();
return true;
@@ -107,9 +105,9 @@ void inline PkeThreads_Pause_Inner(ThreadPool &tp) {
void inline PkeThreads_Resume_Inner(ThreadPool &tp) {
tp.mutex.lock();
tp.isPaused = false;
- int64_t count = tp.threads->Count();
- for (int64_t i = 0; i < count; i++) {
- (*tp.threads)[i] = std::thread(std::bind(ThreadRun, &tp));
+ uint32_t count = tp.threads.next;
+ for (uint32_t i = 0; i < count; i++) {
+ new (tp.threads[i]) std::thread{std::bind(ThreadRun, &tp)};
}
tp.mutex.unlock();
}
@@ -121,7 +119,7 @@ void inline PkeThreads_Shutdown_Inner(ThreadPool &tp) {
}
tp.isRunning = false;
tp.isPaused = false;
- tp.jobQueue->Resize(0);
+ pk_arr_clear(&tp.jobQueue);
tp.mutex.unlock();
}
@@ -140,12 +138,13 @@ ThreadPoolHandle PkeThreads_Init(uint8_t threadCount, uint8_t maxQueueCount, str
tp->isPaused = false;
tp->maxJobQueueCount = maxQueueCount;
tp->completedCount = 0;
- tp->jobQueue = pk_new<DynArray<std::packaged_task<void()> *>>(bkt);
- tp->threads = pk_new<DynArray<std::thread>>(bkt);
+ tp->jobQueue.bkt = bkt;
+ tp->threads.bkt = bkt;
- tp->threads->Resize(threadCount);
- for (long l = 0; l < threadCount; ++l) {
- (*tp->threads)[l] = std::thread(std::bind(ThreadRun, tp));
+ pk_arr_resize(&tp->threads, threadCount);
+ for (uint8_t l = 0; l < threadCount; ++l) {
+ tp->threads[l] = pk_new<std::thread>(bkt);
+ new (tp->threads[l]) std::thread(std::bind(ThreadRun, tp));
}
return newHandle;
@@ -172,9 +171,9 @@ bool PkeThreads_Enqueue(ThreadPoolHandle handle, std::packaged_task<void()> *job
return PkeThreads_Enqueue_Inner(*tp, job);
}
-int64_t PkeThreads_GetQueueCount (ThreadPoolHandle handle) {
+uint32_t PkeThreads_GetQueueCount (ThreadPoolHandle handle) {
auto &threadPool = ThreadPool_BucketContainer.buckets[handle.bucketIndex][handle.itemIndex];
- return threadPool.jobQueue->Count();
+ return threadPool.jobQueue.next;
}
void PkeThreads_Pause(ThreadPoolHandle handle) {
@@ -206,10 +205,10 @@ void PkeThreads_Teardown(ThreadPoolHandle handle) {
PkeThreads_Shutdown_Inner(*tp);
PkeThreads_JoinAll_Inner(*tp);
PkeThreads_Reset_Inner(*tp);
- pk_delete<DynArray<std::packaged_task<void()> *>>(tp->jobQueue, tp->bkt);
- pk_delete<DynArray<std::thread>>(tp->threads, tp->bkt);
- tp->jobQueue = CAFE_BABE(DynArray<std::packaged_task<void()> *>);
- tp->threads = CAFE_BABE(DynArray<std::thread>);
+ pk_arr_reset(&tp->jobQueue);
+ pk_arr_reset(&tp->threads);
+ tp->jobQueue.bkt = CAFE_BABE(struct pk_membucket);
+ tp->threads.bkt = CAFE_BABE(struct pk_membucket);
tp->bkt = CAFE_BABE(struct pk_membucket);
}
diff --git a/src/thread-pool.hpp b/src/thread-pool.hpp
index aabb9ee..3afb99b 100644
--- a/src/thread-pool.hpp
+++ b/src/thread-pool.hpp
@@ -14,7 +14,7 @@ void PkeThreads_Init();
ThreadPoolHandle PkeThreads_Init (uint8_t threadCount, uint8_t maxQueueCount, struct pk_membucket *bkt = nullptr);
void PkeThreads_Reset (ThreadPoolHandle handle);
bool PkeThreads_Enqueue (ThreadPoolHandle handle, std::packaged_task<void()> *job);
-int64_t PkeThreads_GetQueueCount (ThreadPoolHandle handle);
+uint32_t PkeThreads_GetQueueCount (ThreadPoolHandle handle);
void PkeThreads_Pause (ThreadPoolHandle handle);
void PkeThreads_Resume (ThreadPoolHandle handle);
void PkeThreads_Shutdown (ThreadPoolHandle handle);