fix bug in sync

This commit is contained in:
Ruobing Han 2022-06-20 23:57:51 -04:00
parent c1045d8140
commit 57367c8348
2 changed files with 17 additions and 11 deletions

View File

@ -16,6 +16,7 @@ typedef struct c_thread {
unsigned index; unsigned index;
bool exit; bool exit;
bool busy; bool busy;
int completeTask;
} cu_ptd; } cu_ptd;
// kernel information // kernel information

View File

@ -82,11 +82,13 @@ __thread int warp_shfl[32] = {
/* /*
Enqueue Kernel (k) to the scheduler kernelQueue Enqueue Kernel (k) to the scheduler kernelQueue
*/ */
int TaskToExecute;
int schedulerEnqueueKernel(cu_kernel *k) { int schedulerEnqueueKernel(cu_kernel *k) {
int totalBlocks = int totalBlocks =
k->totalBlocks; // calculate gpu_block_to_execute_per_cpu_thread k->totalBlocks; // calculate gpu_block_to_execute_per_cpu_thread
int gpuBlockToExecutePerCpuThread = int gpuBlockToExecutePerCpuThread =
(totalBlocks + device_max_compute_units - 1) / device_max_compute_units; (totalBlocks + device_max_compute_units - 1) / device_max_compute_units;
TaskToExecute = 0;
for (int startBlockIdx = 0; startBlockIdx < totalBlocks; for (int startBlockIdx = 0; startBlockIdx < totalBlocks;
startBlockIdx += gpuBlockToExecutePerCpuThread) { startBlockIdx += gpuBlockToExecutePerCpuThread) {
cu_kernel *p = new cu_kernel(*k); cu_kernel *p = new cu_kernel(*k);
@ -94,6 +96,7 @@ int schedulerEnqueueKernel(cu_kernel *k) {
p->endBlockId = std::min(startBlockIdx + gpuBlockToExecutePerCpuThread - 1, p->endBlockId = std::min(startBlockIdx + gpuBlockToExecutePerCpuThread - 1,
totalBlocks - 1); totalBlocks - 1);
scheduler->kernelQueue->enqueue(p); scheduler->kernelQueue->enqueue(p);
TaskToExecute++;
} }
// printf("total: %d execute per cpu: %d\n", totalBlocks, // printf("total: %d execute per cpu: %d\n", totalBlocks,
@ -110,6 +113,10 @@ int cuLaunchKernel(cu_kernel **k) {
int status = C_RUN; int status = C_RUN;
// stream == 0 add to the kernelQueue // stream == 0 add to the kernelQueue
if (ker->stream == 0) { if (ker->stream == 0) {
// set complete to false, this variable is used for sync
for (int i = 0; i < scheduler->num_worker_threads; i++) {
scheduler->thread_pool[i].completeTask = 0;
}
schedulerEnqueueKernel(ker); schedulerEnqueueKernel(ker);
} else { } else {
printf("MultiStream no implemente\n"); printf("MultiStream no implemente\n");
@ -128,11 +135,9 @@ int get_work(c_thread *th) {
while (true) { while (true) {
// try to get a task from the queue // try to get a task from the queue
cu_kernel *k; cu_kernel *k;
th->busy = false; th->busy = scheduler->kernelQueue->wait_dequeue_timed(
bool getTask = scheduler->kernelQueue->wait_dequeue_timed(
k, std::chrono::milliseconds(5)); k, std::chrono::milliseconds(5));
if (getTask) { if (th->busy) {
th->busy = true;
// set runtime configuration // set runtime configuration
gridDim = k->gridDim; gridDim = k->gridDim;
blockDim = k->blockDim; blockDim = k->blockDim;
@ -157,10 +162,10 @@ int get_work(c_thread *th) {
block_index_z = tmp; block_index_z = tmp;
k->start_routine(k->args); k->start_routine(k->args);
} }
th->completeTask++;
} }
// if cannot get tasks, check whether programs stop // if cannot get tasks, check whether programs stop
else if (scheduler->threadpool_shutdown_requested) { if (scheduler->threadpool_shutdown_requested) {
th->busy = false;
return true; // thread exit return true; // thread exit
} }
} }
@ -227,14 +232,14 @@ void scheduler_uninit() {
*/ */
void cuSynchronizeBarrier() { void cuSynchronizeBarrier() {
while (1) { while (1) {
// sync is complete, only if queue size == 0 and none of // (TODO): currently, we assume each kernel launch will have a
// driver threads are busy // following sync
if (scheduler->kernelQueue->size_approx() == 0) { if (scheduler->kernelQueue->size_approx() == 0) {
bool none_busy = true; int completeBlock = 0;
for (int i = 0; i < scheduler->num_worker_threads; i++) { for (int i = 0; i < scheduler->num_worker_threads; i++) {
none_busy &= (!scheduler->thread_pool[i].busy); completeBlock += (scheduler->thread_pool[i].completeTask);
} }
if (none_busy) if (completeBlock == TaskToExecute)
break; break;
} }
} }