From 57367c834821ac2fa991975d84f6fe5706314aa6 Mon Sep 17 00:00:00 2001
From: Ruobing Han
Date: Mon, 20 Jun 2022 23:57:51 -0400
Subject: [PATCH] fix bug in sync

---
 runtime/threadPool/include/x86/structures.h |  1 +
 runtime/threadPool/src/x86/api.cpp          | 27 ++++++++++++---------
 2 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/runtime/threadPool/include/x86/structures.h b/runtime/threadPool/include/x86/structures.h
index 9a2f8d2..4170e39 100644
--- a/runtime/threadPool/include/x86/structures.h
+++ b/runtime/threadPool/include/x86/structures.h
@@ -16,6 +16,7 @@ typedef struct c_thread {
   unsigned index;
   bool exit;
   bool busy;
+  int completeTask;
 } cu_ptd;
 
 // kernel information
diff --git a/runtime/threadPool/src/x86/api.cpp b/runtime/threadPool/src/x86/api.cpp
index abad868..f0c0e5b 100644
--- a/runtime/threadPool/src/x86/api.cpp
+++ b/runtime/threadPool/src/x86/api.cpp
@@ -82,11 +82,13 @@ __thread int warp_shfl[32] = {
 
 /*
     Enqueue Kernel (k) to the scheduler kernelQueue
 */
+int TaskToExecute;
 int schedulerEnqueueKernel(cu_kernel *k) {
   int totalBlocks = k->totalBlocks; // calculate gpu_block_to_execute_per_cpu_thread
   int gpuBlockToExecutePerCpuThread =
       (totalBlocks + device_max_compute_units - 1) / device_max_compute_units;
+  TaskToExecute = 0;
   for (int startBlockIdx = 0; startBlockIdx < totalBlocks;
        startBlockIdx += gpuBlockToExecutePerCpuThread) {
     cu_kernel *p = new cu_kernel(*k);
@@ -94,6 +96,7 @@ int schedulerEnqueueKernel(cu_kernel *k) {
     p->endBlockId = std::min(startBlockIdx + gpuBlockToExecutePerCpuThread - 1,
                              totalBlocks - 1);
     scheduler->kernelQueue->enqueue(p);
+    TaskToExecute++;
   }
 
   // printf("total: %d execute per cpu: %d\n", totalBlocks,
@@ -110,6 +113,10 @@ int cuLaunchKernel(cu_kernel **k) {
   int status = C_RUN;
   // stream == 0 add to the kernelQueue
   if (ker->stream == 0) {
+    // set complete to false, this variable is used for sync
+    for (int i = 0; i < scheduler->num_worker_threads; i++) {
+      scheduler->thread_pool[i].completeTask = 0;
+    }
     schedulerEnqueueKernel(ker);
   } else {
     printf("MultiStream no implemente\n");
@@ -128,11 +135,9 @@ int get_work(c_thread *th) {
   while (true) {
     // try to get a task from the queue
     cu_kernel *k;
-    th->busy = false;
-    bool getTask = scheduler->kernelQueue->wait_dequeue_timed(
+    th->busy = scheduler->kernelQueue->wait_dequeue_timed(
         k, std::chrono::milliseconds(5));
-    if (getTask) {
-      th->busy = true;
+    if (th->busy) {
       // set runtime configuration
       gridDim = k->gridDim;
       blockDim = k->blockDim;
@@ -157,10 +162,10 @@ int get_work(c_thread *th) {
       block_index_z = tmp;
       k->start_routine(k->args);
     }
+      th->completeTask++;
     }
     // if cannot get tasks, check whether programs stop
-    else if (scheduler->threadpool_shutdown_requested) {
-      th->busy = false;
+    if (scheduler->threadpool_shutdown_requested) {
       return true; // thread exit
     }
   }
@@ -227,14 +232,14 @@ void scheduler_uninit() {
 */
 void cuSynchronizeBarrier() {
   while (1) {
-    // sync is complete, only if queue size == 0 and none of
-    // driver threads are busy
+    // (TODO): currently, we assume each kernel launch will have a
+    // following sync
     if (scheduler->kernelQueue->size_approx() == 0) {
-      bool none_busy = true;
+      int completeBlock = 0;
       for (int i = 0; i < scheduler->num_worker_threads; i++) {
-        none_busy &= (!scheduler->thread_pool[i].busy);
+        completeBlock += (scheduler->thread_pool[i].completeTask);
       }
-      if (none_busy)
+      if (completeBlock == TaskToExecute)
         break;
     }
   }
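
Note: the sketch below is a simplified, standalone illustration of the counter-based barrier this patch switches to; it is not the runtime code itself. The std::queue/mutex scaffolding, the Worker struct, and the main() driver are stand-ins, and only the TaskToExecute / completeTask bookkeeping mirrors the patch: the launch path records how many chunks it enqueued, each worker bumps its own completion counter, and the barrier returns once the queue is drained and the counters sum to the enqueued count, instead of polling the per-thread busy flag, which can read false in the window between dequeuing a task and finishing it.

// sync_sketch.cpp -- illustrative only; TaskToExecute / completeTask mirror
// the patch, everything else is simplified scaffolding.
#include <atomic>
#include <chrono>
#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

struct Task {
  int startBlockId;
  int endBlockId;
};

struct Worker {
  std::atomic<int> completeTask{0}; // per-thread count of finished chunks
};

static std::queue<Task> kernelQueue; // stand-in for scheduler->kernelQueue
static std::mutex queueMutex;
static int TaskToExecute = 0; // chunks enqueued for the current launch
static std::atomic<bool> shutdownRequested{false};

static bool tryDequeue(Task &t) {
  std::lock_guard<std::mutex> lock(queueMutex);
  if (kernelQueue.empty())
    return false;
  t = kernelQueue.front();
  kernelQueue.pop();
  return true;
}

// Worker loop: run a chunk, then bump this thread's completion counter.
static void workerLoop(Worker *w) {
  while (!shutdownRequested) {
    Task t;
    if (tryDequeue(t)) {
      // ... execute blocks t.startBlockId .. t.endBlockId here ...
      w->completeTask++;
    } else {
      std::this_thread::sleep_for(std::chrono::milliseconds(5));
    }
  }
}

// Barrier: done only when the queue is drained AND the workers have finished
// exactly as many chunks as the launch enqueued.
static void synchronizeBarrier(std::vector<Worker> &pool) {
  while (true) {
    bool queueEmpty;
    {
      std::lock_guard<std::mutex> lock(queueMutex);
      queueEmpty = kernelQueue.empty();
    }
    if (queueEmpty) {
      int completed = 0;
      for (auto &w : pool)
        completed += w.completeTask.load();
      if (completed == TaskToExecute)
        break;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}

int main() {
  std::vector<Worker> pool(4);
  std::vector<std::thread> threads;
  for (auto &w : pool)
    threads.emplace_back(workerLoop, &w);

  // "Launch": reset the per-thread counters, split 24 blocks into chunks of 6,
  // enqueue them, and remember how many chunks were enqueued.
  for (auto &w : pool)
    w.completeTask = 0;
  TaskToExecute = 0;
  for (int start = 0; start < 24; start += 6) {
    std::lock_guard<std::mutex> lock(queueMutex);
    kernelQueue.push({start, start + 5});
    TaskToExecute++;
  }

  synchronizeBarrier(pool); // returns only after all four chunks have run
  std::printf("all tasks complete\n");

  shutdownRequested = true;
  for (auto &t : threads)
    t.join();
  return 0;
}

Build with, for example: g++ -std=c++11 -pthread sync_sketch.cpp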