[WIP] use lock-free queue

This commit is contained in:
Ruobing Han 2022-06-20 19:01:28 -04:00
parent 7d29a409f6
commit cbf4cd90d8
6 changed files with 205 additions and 489 deletions

View File

@ -14,5 +14,6 @@ include_directories(./include/)
include_directories(./include/x86) include_directories(./include/x86)
include_directories(./threadPool/include/) include_directories(./threadPool/include/)
include_directories(./threadPool/include/x86) include_directories(./threadPool/include/x86)
include_directories(./threadPool/concurrentqueue)
file(GLOB proj_SOURCES "src/x86/*.cpp") file(GLOB proj_SOURCES "src/x86/*.cpp")
add_library(${LIB_NAME} SHARED ${proj_SOURCES}) add_library(${LIB_NAME} SHARED ${proj_SOURCES})

View File

@ -118,45 +118,24 @@ static int stream_counter = 1;
*/ */
cudaError_t cudaStreamCreate(cudaStream_t *pStream) { cudaError_t cudaStreamCreate(cudaStream_t *pStream) {
cstreamData *s = (cstreamData *)calloc(1, sizeof(cstreamData)); printf("No Implement\n");
if (s == NULL) exit(1);
return cudaErrorMemoryAllocation;
s->ev.status = C_RUN;
s->id = stream_counter;
stream_counter++;
s->stream_priority = DEFAULT;
create_KernelQueue(&(s->kernelQueue));
INIT_LOCK(s->stream_lock);
*pStream = (cudaStream_t)(s);
return cudaSuccess;
} }
cudaError_t cudaStreamDestroy(cudaStream_t stream) { cudaError_t cudaStreamDestroy(cudaStream_t stream) {
cstreamData *s = (cstreamData *)(stream); printf("No Implement\n");
exit(1);
free(s->kernelQueue);
DESTROY_LOCK(s->stream_lock);
free(s);
return cudaSuccess;
} }
cudaError_t cudaStreamSynchronize(cudaStream_t stream) { cudaError_t cudaStreamSynchronize(cudaStream_t stream) {
cstreamData *e = ((cstreamData *)(stream)); printf("No Implement\n");
MUTEX_LOCK(e->stream_lock); exit(1);
e->ev.status = C_SYNCHRONIZE;
e->ev.numKernelsToWait = e->kernelQueue->waiting_count;
MUTEX_UNLOCK(e->stream_lock);
} }
cudaError_t cudaGetDeviceCount(int *count) { cudaError_t cudaGetDeviceCount(int *count) {
// dummy value // dummy value
*count = 1; *count = 1;
return cudaSuccess;
} }
cudaError_t cudaGetDeviceProperties(cudaDeviceProp *deviceProp, int device) { cudaError_t cudaGetDeviceProperties(cudaDeviceProp *deviceProp, int device) {

View File

@ -13,6 +13,8 @@ set(CMAKE_CXX_STANDARD 14)
set(CMAKE_BUILD_TYPE Debug) set(CMAKE_BUILD_TYPE Debug)
include_directories(./include) include_directories(./include)
include_directories(./include/x86) include_directories(./include/x86)
include_directories(./concurrentqueue)
file(GLOB proj_SOURCES "src/x86/*.cpp") file(GLOB proj_SOURCES "src/x86/*.cpp")
add_library(${LIB_NAME} SHARED ${proj_SOURCES}) add_library(${LIB_NAME} SHARED ${proj_SOURCES})
#include "blockingconcurrentqueue.h"

View File

@ -5,15 +5,8 @@
cu_kernel *create_kernel(const void *func, dim3 gridDim, dim3 blockDim, cu_kernel *create_kernel(const void *func, dim3 gridDim, dim3 blockDim,
void **args, size_t sharedMem, cudaStream_t stream); void **args, size_t sharedMem, cudaStream_t stream);
int getWorkItem(struct kernel_queue **qu, cu_kernel *ker,
struct argument *kernel_arg, int **blockId);
int create_KernelQueue(kernel_queue **q); int create_KernelQueue(kernel_queue **q);
int dequeKernelLL(struct kernel_queue **qu);
int dequeKernel(struct kernel_queue **qu, cu_kernel *ker);
int enqueueKernel(struct kernel_queue **qu, cu_kernel **ker);
int scheduler_init(cu_device device); int scheduler_init(cu_device device);
void scheduler_uninit(); void scheduler_uninit();
void cuSynchronizeBarrier(); void cuSynchronizeBarrier();

View File

@ -3,20 +3,57 @@
#include "cuda_runtime.h" #include "cuda_runtime.h"
#include "pthread.h" #include "pthread.h"
#include "blockingconcurrentqueue.h"
typedef struct device { typedef struct device
{
int max_compute_units; int max_compute_units;
int device_id; int device_id;
} cu_device; } cu_device;
typedef struct c_thread { typedef struct c_thread
{
pthread_t thread; pthread_t thread;
unsigned long executed_commands; unsigned long executed_commands;
unsigned index; unsigned index;
bool exit; bool exit;
} cu_ptd; } cu_ptd;
typedef struct scheduler_pool { // kernel information
typedef struct kernel
{
void *(*start_routine)(void *);
void **args;
dim3 gridDim;
dim3 blockDim;
size_t shared_mem;
cudaStream_t stream;
struct event *barrier;
int status;
int totalBlocks;
int blockSize;
int startBlockId;
int endBlockId;
kernel(const kernel &obj) : start_routine(obj.start_routine), args(obj.args),
shared_mem(obj.shared_mem), blockSize(obj.blockSize),
gridDim(obj.gridDim), blockDim(obj.blockDim), totalBlocks(obj.totalBlocks) {}
} cu_kernel;
using kernel_queue = moodycamel::BlockingConcurrentQueue<kernel *>;
typedef struct scheduler_pool
{
struct c_thread *thread_pool; struct c_thread *thread_pool;
@ -35,34 +72,12 @@ typedef struct scheduler_pool {
// lock for scheduler // lock for scheduler
pthread_mutex_t work_queue_lock; pthread_mutex_t work_queue_lock;
// C99 array at the end kernel_queue *kernelQueue;
// user kernel queue for only user called functions
struct kernel_queue *kernelQueue;
} cu_pool; } cu_pool;
struct kernel_queue { typedef struct command
{
struct kernel *head;
struct kernel *tail;
// finish command count
unsigned long finish_count;
// waiting to be run on threads
unsigned long waiting_count;
// running count
unsigned long running_count;
// total count
unsigned long kernel_count;
// current index for task to be run
unsigned long current_index;
};
typedef struct command {
struct kernel *ker; struct kernel *ker;
@ -71,14 +86,16 @@ typedef struct command {
} cu_command; } cu_command;
typedef struct argument { typedef struct argument
{
// size of the argument to allocation // size of the argument to allocation
size_t size; size_t size;
void *value; void *value;
unsigned int index; unsigned int index;
} cu_argument; } cu_argument;
typedef struct input_arg { typedef struct input_arg
{
// real values for the input // real values for the input
char *p; char *p;
struct argument *argus[]; struct argument *argus[];
@ -87,14 +104,16 @@ typedef struct input_arg {
// so that we can parse the arguments p // so that we can parse the arguments p
} cu_input; } cu_input;
enum StreamType { enum StreamType
{
DEFAULT, DEFAULT,
LOW, LOW,
HIGH, HIGH,
EXT, EXT,
}; };
struct cStreamDataInternal { struct cStreamDataInternal
{
/* /*
status of the stream (run , wait) status of the stream (run , wait)
Run: Stream will asynchronously assign the kernel assign with this stream Run: Stream will asynchronously assign the kernel assign with this stream
@ -109,7 +128,8 @@ struct cStreamDataInternal {
unsigned int count; // number of task left in the stream unsigned int count; // number of task left in the stream
}; };
typedef struct streamData { typedef struct streamData
{
// execution status of current event monitor // execution status of current event monitor
struct cStreamDataInternal ev; struct cStreamDataInternal ev;
@ -118,46 +138,12 @@ typedef struct streamData {
unsigned int id; unsigned int id;
unsigned int stream_flags; unsigned int stream_flags;
// queue of the kernels in this stream kernel_queue *kernelQueue;
struct kernel_queue *kernelQueue;
} cstreamData; } cstreamData;
// kernel information
typedef struct kernel {
void *(*start_routine)(void *); typedef struct asyncKernel
{
void **args;
dim3 gridDim;
dim3 blockDim;
struct kernel *next;
struct kernel *prev;
size_t shared_mem;
cudaStream_t stream;
struct event *barrier;
int status;
int totalBlocks;
int N;
int blockSize;
int kernelId;
// current blockId
int blockId;
// execute multiple blocks per fetch
int gpu_block_to_execute_per_cpu_thread;
} cu_kernel;
typedef struct asyncKernel {
unsigned int numBlocks; unsigned int numBlocks;
unsigned int numThreads; unsigned int numThreads;
struct event *evt; struct event *evt;
@ -170,17 +156,20 @@ typedef struct asyncKernel {
// command queue of command nodes // command queue of command nodes
typedef struct kernel_arg_array { typedef struct kernel_arg_array
{
size_t size; size_t size;
unsigned int index; unsigned int index;
} karg_arr; } karg_arr;
typedef struct kernel_image_arg { typedef struct kernel_image_arg
{
size_t size; size_t size;
unsigned int index; unsigned int index;
} k_arg; } k_arg;
typedef struct callParams { typedef struct callParams
{
dim3 gridDim; dim3 gridDim;
dim3 blockDim; dim3 blockDim;
size_t shareMem; size_t shareMem;

View File

@ -6,6 +6,7 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <thread> #include <thread>
#include "blockingconcurrentqueue.h"
/* /*
@ -16,15 +17,16 @@
Initialize the device Initialize the device
*/ */
int device_max_compute_units = 1; int device_max_compute_units = 1;
int init_device() { int init_device()
{
cu_device *device = (cu_device *)calloc(1, sizeof(cu_device)); cu_device *device = (cu_device *)calloc(1, sizeof(cu_device));
if (device == NULL) if (device == NULL)
return C_ERROR_MEMALLOC; return C_ERROR_MEMALLOC;
device->max_compute_units = std::thread::hardware_concurrency(); device->max_compute_units = std::thread::hardware_concurrency();
device->max_compute_units = 4;
std::cout << device->max_compute_units std::cout << device->max_compute_units
<< " concurrent threads are supported.\n"; << " concurrent threads are supported.\n";
// device->max_compute_units = 64;
device_max_compute_units = device->max_compute_units; device_max_compute_units = device->max_compute_units;
// initialize scheduler // initialize scheduler
@ -35,115 +37,30 @@ int init_device() {
return C_SUCCESS; return C_SUCCESS;
} }
/* // Create Kernel
Create Kernel
*/
static int kernelIds = 0; static int kernelIds = 0;
cu_kernel *create_kernel(const void *func, dim3 gridDim, dim3 blockDim, cu_kernel *create_kernel(const void *func, dim3 gridDim, dim3 blockDim,
void **args, size_t sharedMem, cudaStream_t stream) { void **args, size_t sharedMem, cudaStream_t stream)
{
cu_kernel *ker = (cu_kernel *)calloc(1, sizeof(cu_kernel)); cu_kernel *ker = (cu_kernel *)calloc(1, sizeof(cu_kernel));
// set the function pointer // set the function pointer
ker->start_routine = (void *(*)(void *))func; ker->start_routine = (void *(*)(void *))func;
ker->args = args; ker->args = args;
// exit(1);
ker->gridDim = gridDim; ker->gridDim = gridDim;
ker->blockDim = blockDim; ker->blockDim = blockDim;
ker->shared_mem = sharedMem; ker->shared_mem = sharedMem;
// std::cout << "stream is null" << std::endl;
ker->stream = stream; ker->stream = stream;
// std::cout << "stream is null" << std::endl;
ker->blockId = 0;
ker->totalBlocks = gridDim.x * gridDim.y * gridDim.z; ker->totalBlocks = gridDim.x * gridDim.y * gridDim.z;
ker->N = blockDim.x * blockDim.y * blockDim.z;
ker->kernelId = kernelIds;
kernelIds += 1;
ker->blockSize = blockDim.x * blockDim.y * blockDim.z; ker->blockSize = blockDim.x * blockDim.y * blockDim.z;
return ker; return ker;
} }
/*
Create Kernel Queue
*/
int create_KernelQueue(kernel_queue **q) {
*q = (kernel_queue *)calloc(1, sizeof(kernel_queue));
if (*q == NULL) {
return C_ERROR_MEMALLOC;
}
(*q)->kernel_count = 0;
(*q)->running_count = 0;
(*q)->waiting_count = 0;
(*q)->finish_count = 0;
(*q)->current_index = 0;
return C_SUCCESS;
}
int dequeKernelLL(struct kernel_queue **qu) {
struct kernel_queue *q = *qu;
q->finish_count += 1;
// free the pointer
if (q->head == NULL) {
return C_QUEUE_EMPTY;
} else {
//*ker = *(q->head);
q->head = (q->head)->next;
if (q->head != NULL) {
q->head->prev = NULL;
}
}
return C_SUCCESS;
}
int enqueueKernel(struct kernel_queue **qu, cu_kernel **ker) {
struct kernel_queue *q = *qu;
cu_kernel *p = *ker;
// calculate gpu_block_to_execute_per_cpu_thread
p->gpu_block_to_execute_per_cpu_thread =
(p->totalBlocks + device_max_compute_units - 1) /
device_max_compute_units;
printf("total: %d execute per cpu: %d\n", p->totalBlocks,
p->gpu_block_to_execute_per_cpu_thread);
if (q->head == NULL) {
q->head = p;
q->tail = p;
} else {
p->prev = q->tail;
q->tail->next = p;
q->tail = p;
p->next = NULL;
}
q->kernel_count += 1;
q->waiting_count += 1;
// float** t1 = (float**)*(q->head->args + 0);
// printf("enqueueKernelTest Args 1: %p \n ", (void *) &t1);
// printf("enqueueKernel Test Args 1: %p \n ", (void *) *(q->head->args + 0));
// float* t2 = *(t1);
// printf("enqueueKernel G Test Args: %p, val: %f\n ",(void *) &t2, *t2);
// user kernel command
return C_SUCCESS;
}
// scheduler // scheduler
static cu_pool *scheduler; static cu_pool *scheduler;
@ -161,187 +78,109 @@ __thread int block_index_z = 0;
__thread int thread_memory_size = 0; __thread int thread_memory_size = 0;
__thread int *dynamic_shared_memory = NULL; __thread int *dynamic_shared_memory = NULL;
__thread int warp_shfl[32] = { __thread int warp_shfl[32] = {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
}; };
/* /*
Enqueue Kernel (k) to the scheduler kernelQueue Enqueue Kernel (k) to the scheduler kernelQueue
*/ */
int schedulerEnqueueKernel(cu_kernel **k) { int schedulerEnqueueKernel(cu_kernel *k)
cu_kernel *ker = *k; {
MUTEX_LOCK(scheduler->work_queue_lock); int totalBlocks = k->totalBlocks; // calculate gpu_block_to_execute_per_cpu_thread
int gpuBlockToExecutePerCpuThread =
(totalBlocks + device_max_compute_units - 1) /
device_max_compute_units;
for (int startBlockIdx = 0; startBlockIdx < totalBlocks; startBlockIdx += gpuBlockToExecutePerCpuThread)
{
cu_kernel *p = new cu_kernel(*k);
p->startBlockId = startBlockIdx;
p->endBlockId = std::min(startBlockIdx + gpuBlockToExecutePerCpuThread - 1, totalBlocks - 1);
scheduler->kernelQueue->enqueue(p);
}
enqueueKernel(&scheduler->kernelQueue, &ker); printf("total: %d execute per cpu: %d\n", totalBlocks,
// float** t1 = (float**)*(ker->args + 0); gpuBlockToExecutePerCpuThread);
// printf("scheduler enqueue Test Args 1: %p \n ", (void *) &t1); return C_SUCCESS;
// printf("scheduler enqueue Test Args 1: %p \n ", (void *) *(ker->args + 0));
// float* t2 = *(t1);
// printf("scheduler enqueue G Test Args: %p, val: %f\n ",(void *) &t2, *t2);
pthread_cond_broadcast(&(scheduler->wake_pool));
MUTEX_UNLOCK(scheduler->work_queue_lock);
return 0;
} }
/* /*
Kernel Launch with numBlocks and numThreadsPerBlock Kernel Launch with numBlocks and numThreadsPerBlock
*/ */
int cuLaunchKernel(cu_kernel **k) { int cuLaunchKernel(cu_kernel **k)
if (!scheduler) { {
if (!scheduler)
{
init_device(); init_device();
} }
std::cout << "launch\n" << std::flush; std::cout << "launch\n"
<< std::flush;
// Calculate Block Size N/numBlocks // Calculate Block Size N/numBlocks
cu_kernel *ker = *k; cu_kernel *ker = *k;
int status = C_RUN; int status = C_RUN;
MUTEX_LOCK(scheduler->work_queue_lock);
scheduler->num_kernel_queued += 1;
MUTEX_UNLOCK(scheduler->work_queue_lock);
// stream == 0 add to the kernelQueue // stream == 0 add to the kernelQueue
if (ker->stream == 0) { if (ker->stream == 0)
// float** t1 = (float**)*(ker->args + 0); {
// printf("cuLaunchKernel Test Args 1: %p \n ", (void *) &t1); schedulerEnqueueKernel(ker);
// printf("cuLaunchKernel Test Args 1: %p \n ", (void *) *(ker->args + 0)); }
// float* t2 = *(t1); else
// printf("cuLaunchkernel G Test Args: %p, val: %f\n ",(void *) &t2, *t2); {
schedulerEnqueueKernel(k); printf("MultiStream no implemente\n");
} else { exit(1);
// add to it's stream queue
// stream queue can be waiting or running with or without tasks
MUTEX_LOCK(((cstreamData *)(ker->stream))->stream_lock);
status = ((cstreamData *)(ker->stream))->ev.status;
// if stream queue status is run (first kernel) (enqueue to the kernel
// queue)
cstreamData *e = ((cstreamData *)(ker->stream));
// synchronized is called after no job in the queue so stream is stuck on
// synchronize
// printf("this way sync\n");
if (e->ev.status == C_SYNCHRONIZE) {
if ((e->kernelQueue->finish_count) == (e->kernelQueue->kernel_count)) {
e->ev.status = C_RUN;
}
}
if (e->ev.status == C_RUN) {
// change the status to wait
e->ev.status == C_WAIT;
MUTEX_UNLOCK(((cstreamData *)(ker->stream))->stream_lock);
// printf("this way enqueue\n");
schedulerEnqueueKernel(&ker);
} else {
// the status of stream queue is wait so just enqueue to the stream
// printf("this way enqwlijs\n");
enqueueKernel(&((cstreamData *)(ker->stream))->kernelQueue, &ker);
MUTEX_UNLOCK(((cstreamData *)(ker->stream))->stream_lock);
}
} }
return 0; return 0;
} }
/*
Get Work Item: get the kernel from the queue and increment blockId
*/
int getWorkItem(struct kernel_queue **qu, cu_kernel **kern, int blockId) {
struct kernel_queue *q = *qu;
if (q->waiting_count > 0) {
*kern = q->head;
cu_kernel *ker = *kern;
if (blockId + 1 == q->head->totalBlocks) {
// deque the head
dequeKernelLL(qu);
ker->status = C_COMPLETE;
q->waiting_count -= 1;
} else {
q->head->blockId += 1;
}
q->finish_count += 1;
} else {
return C_QUEUE_EMPTY;
}
return C_SUCCESS;
}
/* /*
Thread Gets Work Thread Gets Work
*/ */
int get_work(c_thread *th) { int get_work(c_thread *th)
cu_kernel ker; {
// std::cout << "Before Get Work Mutex Queue" << std::endl;
MUTEX_LOCK(scheduler->work_queue_lock);
// std::cout << "After Get Work Mutex Queue" << std::endl;
RETRY:
int is_exit = 0;
int is_command_not_null = 0;
int block_to_execute = 256;
int blockId;
int localBlockSize;
int status;
int completion_status = 0;
int dynamic_shared_mem_size = 0; int dynamic_shared_mem_size = 0;
dim3 gridDim; dim3 gridDim;
dim3 blockDim; dim3 blockDim;
while (true)
is_exit = scheduler->threadpool_shutdown_requested; {
// try to get a task from the queue
MUTEX_UNLOCK(scheduler->work_queue_lock); cu_kernel *k;
bool getTask = scheduler->kernelQueue->wait_dequeue_timed(k, std::chrono::milliseconds(5));
if (!is_exit) { if (getTask)
{
MUTEX_LOCK(scheduler->work_queue_lock); // set runtime configuration
gridDim = k->gridDim;
// if kernel waiting to be complete is not zero blockDim = k->blockDim;
if (scheduler->kernelQueue->waiting_count > 0) { dynamic_shared_mem_size = k->shared_mem;
// std::cout << "Waiting Count is greater than 0" << std::endl; block_size = k->blockSize;
blockId = scheduler->kernelQueue->head->blockId;
gridDim = scheduler->kernelQueue->head->gridDim;
blockDim = scheduler->kernelQueue->head->blockDim;
dynamic_shared_mem_size = scheduler->kernelQueue->head->shared_mem;
// std::cout << "Block ID: " << blockId << std::endl;
localBlockSize = scheduler->kernelQueue->head->blockSize;
// set status as success fully queue
status = C_SUCCESS;
ker = *(scheduler->kernelQueue->head);
block_to_execute =
scheduler->kernelQueue->head->gpu_block_to_execute_per_cpu_thread;
// if the blockId + 1 is equal to the goal block size ,
// then its the last block
if (blockId + block_to_execute >=
scheduler->kernelQueue->head->totalBlocks) {
block_to_execute = scheduler->kernelQueue->head->totalBlocks - blockId;
// deque the head
dequeKernelLL(&scheduler->kernelQueue);
ker.status = C_COMPLETE;
scheduler->kernelQueue->waiting_count -= 1;
} else {
// increment the blockId
scheduler->kernelQueue->head->blockId =
scheduler->kernelQueue->head->blockId + block_to_execute;
}
// status = getWorkItem(&scheduler->kernelQueue, &ker, blockId);
} else {
status = C_QUEUE_EMPTY;
}
MUTEX_UNLOCK(scheduler->work_queue_lock);
}
if (status != C_QUEUE_EMPTY) {
// set TLS
for (int s = 0; s < block_to_execute; s++) {
block_index = blockId + s;
block_size = localBlockSize;
block_size_x = blockDim.x; block_size_x = blockDim.x;
block_size_y = blockDim.y; block_size_y = blockDim.y;
block_size_z = blockDim.z; block_size_z = blockDim.z;
@ -350,120 +189,70 @@ RETRY:
grid_size_z = gridDim.z; grid_size_z = gridDim.z;
if (dynamic_shared_mem_size > 0) if (dynamic_shared_mem_size > 0)
dynamic_shared_memory = (int *)malloc(dynamic_shared_mem_size); dynamic_shared_memory = (int *)malloc(dynamic_shared_mem_size);
int tmp = block_index; // execute GPU blocks
block_index_x = tmp / (grid_size_y * grid_size_z); printf("exec: from: %d to : %d\n",k->startBlockId, k->endBlockId);
tmp = tmp % (grid_size_y * grid_size_z); for (block_index = k->startBlockId; block_index < k->endBlockId + 1; block_index++)
block_index_y = tmp / (grid_size_z); {
tmp = tmp % (grid_size_z); int tmp = block_index;
block_index_z = tmp; block_index_x = tmp / (grid_size_y * grid_size_z);
ker.start_routine(ker.args); tmp = tmp % (grid_size_y * grid_size_z);
} block_index_y = tmp / (grid_size_z);
tmp = tmp % (grid_size_z);
is_command_not_null = 1; block_index_z = tmp;
if (ker.status == C_COMPLETE) { k->start_routine(k->args);
// check if this kernel's stream has more jobs to run (enqueue the next
// job)
if (ker.stream != NULL) {
bool synchronize = false;
MUTEX_LOCK(((cstreamData *)(ker.stream))->stream_lock);
if (((cstreamData *)(ker.stream))->ev.status == C_SYNCHRONIZE) {
// synchronize stream
if (((cstreamData *)(ker.stream))->ev.numKernelsToWait > 0) {
((cstreamData *)(ker.stream))->ev.numKernelsToWait -= 1;
}
if (((cstreamData *)(ker.stream))->ev.status == C_SYNCHRONIZE) {
// synchronize stream
if (((cstreamData *)(ker.stream))->ev.numKernelsToWait > 0) {
((cstreamData *)(ker.stream))->ev.numKernelsToWait -= 1;
}
if (((cstreamData *)(ker.stream))->ev.numKernelsToWait == 0) {
synchronize = false;
} else {
synchronize = true;
}
}
if (synchronize == false) {
if (((cstreamData *)(ker.stream))->kernelQueue->waiting_count > 0) {
((cstreamData *)(ker.stream))->ev.status = C_WAIT;
cu_kernel *kern =
((cstreamData *)(ker.stream))->kernelQueue->head;
schedulerEnqueueKernel(&kern);
dequeKernelLL(&((cstreamData *)(ker.stream))->kernelQueue);
} else {
// switch the stream to run to allow for the next execution
((cstreamData *)(ker.stream))->ev.status = C_RUN;
}
}
}
MUTEX_UNLOCK(((cstreamData *)(ker.stream))->stream_lock);
} }
MUTEX_LOCK(scheduler->work_queue_lock); printf("done: from: %d to : %d\n",k->startBlockId, k->endBlockId);
scheduler->num_kernel_finished += 1; }
MUTEX_UNLOCK(scheduler->work_queue_lock); // if cannot get tasks, check whether programs stop
else if (scheduler->threadpool_shutdown_requested)
{
return true; // thread exit
} }
} }
return 0;
MUTEX_LOCK(scheduler->work_queue_lock);
if ((is_exit == 0 && is_command_not_null == 0)) {
// all threads in condition wait
scheduler->idle_threads += 1;
if (scheduler->idle_threads == scheduler->num_worker_threads) {
pthread_cond_broadcast(&(scheduler->wake_host));
}
pthread_cond_wait(&(scheduler->wake_pool), &(scheduler->work_queue_lock));
scheduler->idle_threads -= 1;
goto RETRY;
}
MUTEX_UNLOCK(scheduler->work_queue_lock);
return is_exit;
} }
void *driver_thread(void *p) { void *driver_thread(void *p)
{
struct c_thread *td = (struct c_thread *)p; struct c_thread *td = (struct c_thread *)p;
int is_exit = 0; int is_exit = 0;
td->exit = false; td->exit = false;
while (1) { // get work
// get work printf("before getwork\n");
is_exit = get_work(td); is_exit = get_work(td);
printf("after getwork\n");
// exit the routine // exit the routine
if (is_exit) { if (is_exit)
td->exit = true; {
// pthread_exit td->exit = true;
pthread_exit(NULL); // pthread_exit
} printf("pthread exit\n");
pthread_exit(NULL);
}
else
{
printf("driver thread stop incorrectly\n");
exit(1);
} }
} }
/* /*
Initialize the scheduler Initialize the scheduler
*/ */
int scheduler_init(cu_device device) { int scheduler_init(cu_device device)
{
scheduler = (cu_pool *)calloc(1, sizeof(cu_pool)); scheduler = (cu_pool *)calloc(1, sizeof(cu_pool));
scheduler->num_worker_threads = device.max_compute_units; scheduler->num_worker_threads = device.max_compute_units;
scheduler->num_kernel_queued = 0; scheduler->num_kernel_queued = 0;
scheduler->thread_pool = (struct c_thread *)calloc( scheduler->thread_pool = (struct c_thread *)calloc(
scheduler->num_worker_threads, sizeof(c_thread)); scheduler->num_worker_threads, sizeof(c_thread));
kernel_queue *asq; scheduler->kernelQueue = new kernel_queue;
create_KernelQueue(&asq);
scheduler->kernelQueue = asq;
INIT_LOCK(scheduler->work_queue_lock);
pthread_cond_init(&scheduler->wake_pool, NULL);
pthread_cond_init(&scheduler->wake_host, NULL);
scheduler->idle_threads = 0; scheduler->idle_threads = 0;
for (int i = 0; i < scheduler->num_worker_threads; i++) { for (int i = 0; i < scheduler->num_worker_threads; i++)
{
scheduler->thread_pool[i].index = i; scheduler->thread_pool[i].index = i;
pthread_create(&scheduler->thread_pool[i].thread, NULL, driver_thread, pthread_create(&scheduler->thread_pool[i].thread, NULL, driver_thread,
(void *)&scheduler->thread_pool[i]); (void *)&scheduler->thread_pool[i]);
@ -472,42 +261,10 @@ int scheduler_init(cu_device device) {
return C_SUCCESS; return C_SUCCESS;
} }
void scheduler_uninit() { void scheduler_uninit()
unsigned i; {
printf("No Implemente\n");
int r = pthread_mutex_lock(&scheduler->work_queue_lock); exit(1);
assert(r == 0);
scheduler->threadpool_shutdown_requested = 1;
pthread_cond_broadcast(&scheduler->wake_pool);
int r1 = pthread_mutex_unlock(&scheduler->work_queue_lock);
assert(r1 == 0);
for (i = 0; i < scheduler->num_worker_threads; i++) {
pthread_join(scheduler->thread_pool[i].thread, NULL);
}
free(scheduler->thread_pool);
free(scheduler->kernelQueue);
pthread_mutex_destroy(&scheduler->work_queue_lock);
pthread_cond_destroy(&scheduler->wake_pool);
pthread_cond_destroy(&scheduler->wake_host);
scheduler->threadpool_shutdown_requested = 0;
}
int cuWait(cstreamData *evt) {
AGAIN:
int r = pthread_mutex_lock(&evt->stream_lock);
assert(r == 0);
if (evt->ev.status != C_COMPLETE) {
int r1 = pthread_mutex_unlock(&evt->stream_lock);
assert(r1 == 0);
goto AGAIN;
}
return C_SUCCESS;
} }
/* /*
@ -522,14 +279,9 @@ AGAIN:
Sense Like Barrier Sense Like Barrier
Counting Barrier basically Counting Barrier basically
*/ */
void cuSynchronizeBarrier() { void cuSynchronizeBarrier()
// std::cout << "cuSynchronizeBarrier" << std::endl; {
MUTEX_LOCK(scheduler->work_queue_lock); while (scheduler->kernelQueue->size_approx() > 0)
;
if (scheduler->num_kernel_finished != scheduler->num_kernel_queued || printf("size: %d\n",scheduler->kernelQueue->size_approx());
scheduler->idle_threads != scheduler->num_worker_threads) {
// scheduler->idle_threads, scheduler->num_worker_threads);
pthread_cond_wait(&(scheduler->wake_host), &(scheduler->work_queue_lock));
}
MUTEX_UNLOCK(scheduler->work_queue_lock);
} }