diff --git a/runtime/CMakeLists.txt b/runtime/CMakeLists.txt index 94248b3..3400169 100644 --- a/runtime/CMakeLists.txt +++ b/runtime/CMakeLists.txt @@ -14,5 +14,6 @@ include_directories(./include/) include_directories(./include/x86) include_directories(./threadPool/include/) include_directories(./threadPool/include/x86) +include_directories(./threadPool/concurrentqueue) file(GLOB proj_SOURCES "src/x86/*.cpp") add_library(${LIB_NAME} SHARED ${proj_SOURCES}) diff --git a/runtime/src/x86/cudaRuntimeImpl.cpp b/runtime/src/x86/cudaRuntimeImpl.cpp index 3406285..fd7c3d9 100644 --- a/runtime/src/x86/cudaRuntimeImpl.cpp +++ b/runtime/src/x86/cudaRuntimeImpl.cpp @@ -118,45 +118,24 @@ static int stream_counter = 1; */ cudaError_t cudaStreamCreate(cudaStream_t *pStream) { - cstreamData *s = (cstreamData *)calloc(1, sizeof(cstreamData)); - if (s == NULL) - return cudaErrorMemoryAllocation; - s->ev.status = C_RUN; - s->id = stream_counter; - stream_counter++; - s->stream_priority = DEFAULT; - create_KernelQueue(&(s->kernelQueue)); - - INIT_LOCK(s->stream_lock); - *pStream = (cudaStream_t)(s); - - return cudaSuccess; + printf("No Implement\n"); + exit(1); } cudaError_t cudaStreamDestroy(cudaStream_t stream) { - cstreamData *s = (cstreamData *)(stream); - - free(s->kernelQueue); - - DESTROY_LOCK(s->stream_lock); - - free(s); - - return cudaSuccess; + printf("No Implement\n"); + exit(1); } cudaError_t cudaStreamSynchronize(cudaStream_t stream) { - cstreamData *e = ((cstreamData *)(stream)); - MUTEX_LOCK(e->stream_lock); - - e->ev.status = C_SYNCHRONIZE; - e->ev.numKernelsToWait = e->kernelQueue->waiting_count; - MUTEX_UNLOCK(e->stream_lock); + printf("No Implement\n"); + exit(1); } cudaError_t cudaGetDeviceCount(int *count) { // dummy value *count = 1; + return cudaSuccess; } cudaError_t cudaGetDeviceProperties(cudaDeviceProp *deviceProp, int device) { diff --git a/runtime/threadPool/CMakeLists.txt b/runtime/threadPool/CMakeLists.txt index d807752..b35445a 100644 --- a/runtime/threadPool/CMakeLists.txt +++ b/runtime/threadPool/CMakeLists.txt @@ -13,6 +13,8 @@ set(CMAKE_CXX_STANDARD 14) set(CMAKE_BUILD_TYPE Debug) include_directories(./include) include_directories(./include/x86) +include_directories(./concurrentqueue) file(GLOB proj_SOURCES "src/x86/*.cpp") add_library(${LIB_NAME} SHARED ${proj_SOURCES}) + #include "blockingconcurrentqueue.h" \ No newline at end of file diff --git a/runtime/threadPool/include/x86/api.h b/runtime/threadPool/include/x86/api.h index 2c2ac92..3250038 100644 --- a/runtime/threadPool/include/x86/api.h +++ b/runtime/threadPool/include/x86/api.h @@ -5,15 +5,8 @@ cu_kernel *create_kernel(const void *func, dim3 gridDim, dim3 blockDim, void **args, size_t sharedMem, cudaStream_t stream); -int getWorkItem(struct kernel_queue **qu, cu_kernel *ker, - struct argument *kernel_arg, int **blockId); int create_KernelQueue(kernel_queue **q); -int dequeKernelLL(struct kernel_queue **qu); - -int dequeKernel(struct kernel_queue **qu, cu_kernel *ker); -int enqueueKernel(struct kernel_queue **qu, cu_kernel **ker); - int scheduler_init(cu_device device); void scheduler_uninit(); void cuSynchronizeBarrier(); diff --git a/runtime/threadPool/include/x86/structures.h b/runtime/threadPool/include/x86/structures.h index b274d63..9362eef 100644 --- a/runtime/threadPool/include/x86/structures.h +++ b/runtime/threadPool/include/x86/structures.h @@ -3,20 +3,57 @@ #include "cuda_runtime.h" #include "pthread.h" +#include "blockingconcurrentqueue.h" -typedef struct device { +typedef struct device +{ int max_compute_units; int device_id; } cu_device; -typedef struct c_thread { +typedef struct c_thread +{ pthread_t thread; unsigned long executed_commands; unsigned index; bool exit; } cu_ptd; -typedef struct scheduler_pool { +// kernel information +typedef struct kernel +{ + + void *(*start_routine)(void *); + + void **args; + + dim3 gridDim; + dim3 blockDim; + + size_t shared_mem; + + cudaStream_t stream; + + struct event *barrier; + + int status; + + int totalBlocks; + + int blockSize; + + int startBlockId; + int endBlockId; + + kernel(const kernel &obj) : start_routine(obj.start_routine), args(obj.args), + shared_mem(obj.shared_mem), blockSize(obj.blockSize), + gridDim(obj.gridDim), blockDim(obj.blockDim), totalBlocks(obj.totalBlocks) {} +} cu_kernel; + +using kernel_queue = moodycamel::BlockingConcurrentQueue; + +typedef struct scheduler_pool +{ struct c_thread *thread_pool; @@ -35,34 +72,12 @@ typedef struct scheduler_pool { // lock for scheduler pthread_mutex_t work_queue_lock; - // C99 array at the end - // user kernel queue for only user called functions - struct kernel_queue *kernelQueue; + kernel_queue *kernelQueue; } cu_pool; -struct kernel_queue { - - struct kernel *head; - struct kernel *tail; - - // finish command count - unsigned long finish_count; - - // waiting to be run on threads - unsigned long waiting_count; - - // running count - unsigned long running_count; - - // total count - unsigned long kernel_count; - - // current index for task to be run - unsigned long current_index; -}; - -typedef struct command { +typedef struct command +{ struct kernel *ker; @@ -71,14 +86,16 @@ typedef struct command { } cu_command; -typedef struct argument { +typedef struct argument +{ // size of the argument to allocation size_t size; void *value; unsigned int index; } cu_argument; -typedef struct input_arg { +typedef struct input_arg +{ // real values for the input char *p; struct argument *argus[]; @@ -87,14 +104,16 @@ typedef struct input_arg { // so that we can parse the arguments p } cu_input; -enum StreamType { +enum StreamType +{ DEFAULT, LOW, HIGH, EXT, }; -struct cStreamDataInternal { +struct cStreamDataInternal +{ /* status of the stream (run , wait) Run: Stream will asynchronously assign the kernel assign with this stream @@ -109,7 +128,8 @@ struct cStreamDataInternal { unsigned int count; // number of task left in the stream }; -typedef struct streamData { +typedef struct streamData +{ // execution status of current event monitor struct cStreamDataInternal ev; @@ -118,46 +138,12 @@ typedef struct streamData { unsigned int id; unsigned int stream_flags; - // queue of the kernels in this stream - struct kernel_queue *kernelQueue; + kernel_queue *kernelQueue; } cstreamData; -// kernel information -typedef struct kernel { - void *(*start_routine)(void *); - - void **args; - - dim3 gridDim; - dim3 blockDim; - - struct kernel *next; - struct kernel *prev; - - size_t shared_mem; - - cudaStream_t stream; - - struct event *barrier; - - int status; - - int totalBlocks; - int N; - - int blockSize; - int kernelId; - - // current blockId - int blockId; - - // execute multiple blocks per fetch - int gpu_block_to_execute_per_cpu_thread; - -} cu_kernel; - -typedef struct asyncKernel { +typedef struct asyncKernel +{ unsigned int numBlocks; unsigned int numThreads; struct event *evt; @@ -170,17 +156,20 @@ typedef struct asyncKernel { // command queue of command nodes -typedef struct kernel_arg_array { +typedef struct kernel_arg_array +{ size_t size; unsigned int index; } karg_arr; -typedef struct kernel_image_arg { +typedef struct kernel_image_arg +{ size_t size; unsigned int index; } k_arg; -typedef struct callParams { +typedef struct callParams +{ dim3 gridDim; dim3 blockDim; size_t shareMem; diff --git a/runtime/threadPool/src/x86/api.cpp b/runtime/threadPool/src/x86/api.cpp index b952dba..be1ab8a 100644 --- a/runtime/threadPool/src/x86/api.cpp +++ b/runtime/threadPool/src/x86/api.cpp @@ -6,6 +6,7 @@ #include #include #include +#include "blockingconcurrentqueue.h" /* @@ -16,15 +17,16 @@ Initialize the device */ int device_max_compute_units = 1; -int init_device() { +int init_device() +{ cu_device *device = (cu_device *)calloc(1, sizeof(cu_device)); if (device == NULL) return C_ERROR_MEMALLOC; device->max_compute_units = std::thread::hardware_concurrency(); + device->max_compute_units = 4; std::cout << device->max_compute_units << " concurrent threads are supported.\n"; - // device->max_compute_units = 64; device_max_compute_units = device->max_compute_units; // initialize scheduler @@ -35,115 +37,30 @@ int init_device() { return C_SUCCESS; } -/* - Create Kernel - -*/ +// Create Kernel static int kernelIds = 0; cu_kernel *create_kernel(const void *func, dim3 gridDim, dim3 blockDim, - void **args, size_t sharedMem, cudaStream_t stream) { + void **args, size_t sharedMem, cudaStream_t stream) +{ cu_kernel *ker = (cu_kernel *)calloc(1, sizeof(cu_kernel)); // set the function pointer ker->start_routine = (void *(*)(void *))func; - ker->args = args; - // exit(1); ker->gridDim = gridDim; ker->blockDim = blockDim; ker->shared_mem = sharedMem; - // std::cout << "stream is null" << std::endl; ker->stream = stream; - // std::cout << "stream is null" << std::endl; - - ker->blockId = 0; ker->totalBlocks = gridDim.x * gridDim.y * gridDim.z; - ker->N = blockDim.x * blockDim.y * blockDim.z; - - ker->kernelId = kernelIds; - kernelIds += 1; - ker->blockSize = blockDim.x * blockDim.y * blockDim.z; - return ker; } -/* - Create Kernel Queue -*/ -int create_KernelQueue(kernel_queue **q) { - *q = (kernel_queue *)calloc(1, sizeof(kernel_queue)); - - if (*q == NULL) { - return C_ERROR_MEMALLOC; - } - - (*q)->kernel_count = 0; - (*q)->running_count = 0; - (*q)->waiting_count = 0; - (*q)->finish_count = 0; - (*q)->current_index = 0; - - return C_SUCCESS; -} - -int dequeKernelLL(struct kernel_queue **qu) { - - struct kernel_queue *q = *qu; - q->finish_count += 1; - - // free the pointer - if (q->head == NULL) { - return C_QUEUE_EMPTY; - } else { - //*ker = *(q->head); - q->head = (q->head)->next; - if (q->head != NULL) { - q->head->prev = NULL; - } - } - - return C_SUCCESS; -} - -int enqueueKernel(struct kernel_queue **qu, cu_kernel **ker) { - struct kernel_queue *q = *qu; - cu_kernel *p = *ker; - // calculate gpu_block_to_execute_per_cpu_thread - p->gpu_block_to_execute_per_cpu_thread = - (p->totalBlocks + device_max_compute_units - 1) / - device_max_compute_units; - printf("total: %d execute per cpu: %d\n", p->totalBlocks, - p->gpu_block_to_execute_per_cpu_thread); - - if (q->head == NULL) { - q->head = p; - q->tail = p; - } else { - p->prev = q->tail; - q->tail->next = p; - q->tail = p; - p->next = NULL; - } - q->kernel_count += 1; - q->waiting_count += 1; - - // float** t1 = (float**)*(q->head->args + 0); - // printf("enqueueKernelTest Args 1: %p \n ", (void *) &t1); - // printf("enqueueKernel Test Args 1: %p \n ", (void *) *(q->head->args + 0)); - // float* t2 = *(t1); - // printf("enqueueKernel G Test Args: %p, val: %f\n ",(void *) &t2, *t2); - - // user kernel command - - return C_SUCCESS; -} - // scheduler static cu_pool *scheduler; @@ -161,187 +78,109 @@ __thread int block_index_z = 0; __thread int thread_memory_size = 0; __thread int *dynamic_shared_memory = NULL; __thread int warp_shfl[32] = { - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, }; /* Enqueue Kernel (k) to the scheduler kernelQueue */ -int schedulerEnqueueKernel(cu_kernel **k) { - cu_kernel *ker = *k; - MUTEX_LOCK(scheduler->work_queue_lock); +int schedulerEnqueueKernel(cu_kernel *k) +{ + int totalBlocks = k->totalBlocks; // calculate gpu_block_to_execute_per_cpu_thread + int gpuBlockToExecutePerCpuThread = + (totalBlocks + device_max_compute_units - 1) / + device_max_compute_units; + for (int startBlockIdx = 0; startBlockIdx < totalBlocks; startBlockIdx += gpuBlockToExecutePerCpuThread) + { + cu_kernel *p = new cu_kernel(*k); + p->startBlockId = startBlockIdx; + p->endBlockId = std::min(startBlockIdx + gpuBlockToExecutePerCpuThread - 1, totalBlocks - 1); + scheduler->kernelQueue->enqueue(p); + } - enqueueKernel(&scheduler->kernelQueue, &ker); - // float** t1 = (float**)*(ker->args + 0); - // printf("scheduler enqueue Test Args 1: %p \n ", (void *) &t1); - // printf("scheduler enqueue Test Args 1: %p \n ", (void *) *(ker->args + 0)); - // float* t2 = *(t1); - // printf("scheduler enqueue G Test Args: %p, val: %f\n ",(void *) &t2, *t2); - - pthread_cond_broadcast(&(scheduler->wake_pool)); - MUTEX_UNLOCK(scheduler->work_queue_lock); - return 0; + printf("total: %d execute per cpu: %d\n", totalBlocks, + gpuBlockToExecutePerCpuThread); + return C_SUCCESS; } /* Kernel Launch with numBlocks and numThreadsPerBlock */ -int cuLaunchKernel(cu_kernel **k) { - if (!scheduler) { +int cuLaunchKernel(cu_kernel **k) +{ + if (!scheduler) + { init_device(); } - std::cout << "launch\n" << std::flush; + std::cout << "launch\n" + << std::flush; // Calculate Block Size N/numBlocks - cu_kernel *ker = *k; int status = C_RUN; - - MUTEX_LOCK(scheduler->work_queue_lock); - scheduler->num_kernel_queued += 1; - MUTEX_UNLOCK(scheduler->work_queue_lock); // stream == 0 add to the kernelQueue - if (ker->stream == 0) { - // float** t1 = (float**)*(ker->args + 0); - // printf("cuLaunchKernel Test Args 1: %p \n ", (void *) &t1); - // printf("cuLaunchKernel Test Args 1: %p \n ", (void *) *(ker->args + 0)); - // float* t2 = *(t1); - // printf("cuLaunchkernel G Test Args: %p, val: %f\n ",(void *) &t2, *t2); - schedulerEnqueueKernel(k); - } else { - // add to it's stream queue - // stream queue can be waiting or running with or without tasks - MUTEX_LOCK(((cstreamData *)(ker->stream))->stream_lock); - status = ((cstreamData *)(ker->stream))->ev.status; - - // if stream queue status is run (first kernel) (enqueue to the kernel - // queue) - cstreamData *e = ((cstreamData *)(ker->stream)); - // synchronized is called after no job in the queue so stream is stuck on - // synchronize - // printf("this way sync\n"); - if (e->ev.status == C_SYNCHRONIZE) { - if ((e->kernelQueue->finish_count) == (e->kernelQueue->kernel_count)) { - e->ev.status = C_RUN; - } - } - - if (e->ev.status == C_RUN) { - // change the status to wait - e->ev.status == C_WAIT; - MUTEX_UNLOCK(((cstreamData *)(ker->stream))->stream_lock); - // printf("this way enqueue\n"); - schedulerEnqueueKernel(&ker); - } else { - // the status of stream queue is wait so just enqueue to the stream - // printf("this way enqwlijs\n"); - enqueueKernel(&((cstreamData *)(ker->stream))->kernelQueue, &ker); - MUTEX_UNLOCK(((cstreamData *)(ker->stream))->stream_lock); - } + if (ker->stream == 0) + { + schedulerEnqueueKernel(ker); + } + else + { + printf("MultiStream no implemente\n"); + exit(1); } return 0; } -/* - Get Work Item: get the kernel from the queue and increment blockId -*/ -int getWorkItem(struct kernel_queue **qu, cu_kernel **kern, int blockId) { - struct kernel_queue *q = *qu; - if (q->waiting_count > 0) { - *kern = q->head; - cu_kernel *ker = *kern; - if (blockId + 1 == q->head->totalBlocks) { - // deque the head - dequeKernelLL(qu); - ker->status = C_COMPLETE; - q->waiting_count -= 1; - } else { - q->head->blockId += 1; - } - q->finish_count += 1; - } else { - return C_QUEUE_EMPTY; - } - return C_SUCCESS; -} - /* Thread Gets Work */ -int get_work(c_thread *th) { - cu_kernel ker; - - // std::cout << "Before Get Work Mutex Queue" << std::endl; - MUTEX_LOCK(scheduler->work_queue_lock); - // std::cout << "After Get Work Mutex Queue" << std::endl; - -RETRY: - - int is_exit = 0; - int is_command_not_null = 0; - - int block_to_execute = 256; - int blockId; - int localBlockSize; - int status; - int completion_status = 0; +int get_work(c_thread *th) +{ int dynamic_shared_mem_size = 0; dim3 gridDim; dim3 blockDim; - - is_exit = scheduler->threadpool_shutdown_requested; - - MUTEX_UNLOCK(scheduler->work_queue_lock); - - if (!is_exit) { - - MUTEX_LOCK(scheduler->work_queue_lock); - - // if kernel waiting to be complete is not zero - if (scheduler->kernelQueue->waiting_count > 0) { - // std::cout << "Waiting Count is greater than 0" << std::endl; - - blockId = scheduler->kernelQueue->head->blockId; - - gridDim = scheduler->kernelQueue->head->gridDim; - blockDim = scheduler->kernelQueue->head->blockDim; - dynamic_shared_mem_size = scheduler->kernelQueue->head->shared_mem; - - // std::cout << "Block ID: " << blockId << std::endl; - localBlockSize = scheduler->kernelQueue->head->blockSize; - // set status as success fully queue - status = C_SUCCESS; - ker = *(scheduler->kernelQueue->head); - - block_to_execute = - scheduler->kernelQueue->head->gpu_block_to_execute_per_cpu_thread; - // if the blockId + 1 is equal to the goal block size , - // then its the last block - if (blockId + block_to_execute >= - scheduler->kernelQueue->head->totalBlocks) { - block_to_execute = scheduler->kernelQueue->head->totalBlocks - blockId; - // deque the head - dequeKernelLL(&scheduler->kernelQueue); - - ker.status = C_COMPLETE; - scheduler->kernelQueue->waiting_count -= 1; - } else { - // increment the blockId - scheduler->kernelQueue->head->blockId = - scheduler->kernelQueue->head->blockId + block_to_execute; - } - // status = getWorkItem(&scheduler->kernelQueue, &ker, blockId); - } else { - status = C_QUEUE_EMPTY; - } - MUTEX_UNLOCK(scheduler->work_queue_lock); - } - - if (status != C_QUEUE_EMPTY) { - // set TLS - for (int s = 0; s < block_to_execute; s++) { - block_index = blockId + s; - block_size = localBlockSize; + while (true) + { + // try to get a task from the queue + cu_kernel *k; + bool getTask = scheduler->kernelQueue->wait_dequeue_timed(k, std::chrono::milliseconds(5)); + if (getTask) + { + // set runtime configuration + gridDim = k->gridDim; + blockDim = k->blockDim; + dynamic_shared_mem_size = k->shared_mem; + block_size = k->blockSize; block_size_x = blockDim.x; block_size_y = blockDim.y; block_size_z = blockDim.z; @@ -350,120 +189,70 @@ RETRY: grid_size_z = gridDim.z; if (dynamic_shared_mem_size > 0) dynamic_shared_memory = (int *)malloc(dynamic_shared_mem_size); - int tmp = block_index; - block_index_x = tmp / (grid_size_y * grid_size_z); - tmp = tmp % (grid_size_y * grid_size_z); - block_index_y = tmp / (grid_size_z); - tmp = tmp % (grid_size_z); - block_index_z = tmp; - ker.start_routine(ker.args); - } - - is_command_not_null = 1; - if (ker.status == C_COMPLETE) { - - // check if this kernel's stream has more jobs to run (enqueue the next - // job) - if (ker.stream != NULL) { - bool synchronize = false; - - MUTEX_LOCK(((cstreamData *)(ker.stream))->stream_lock); - - if (((cstreamData *)(ker.stream))->ev.status == C_SYNCHRONIZE) { - // synchronize stream - if (((cstreamData *)(ker.stream))->ev.numKernelsToWait > 0) { - ((cstreamData *)(ker.stream))->ev.numKernelsToWait -= 1; - } - - if (((cstreamData *)(ker.stream))->ev.status == C_SYNCHRONIZE) { - // synchronize stream - if (((cstreamData *)(ker.stream))->ev.numKernelsToWait > 0) { - ((cstreamData *)(ker.stream))->ev.numKernelsToWait -= 1; - } - - if (((cstreamData *)(ker.stream))->ev.numKernelsToWait == 0) { - synchronize = false; - } else { - synchronize = true; - } - } - if (synchronize == false) { - if (((cstreamData *)(ker.stream))->kernelQueue->waiting_count > 0) { - ((cstreamData *)(ker.stream))->ev.status = C_WAIT; - - cu_kernel *kern = - ((cstreamData *)(ker.stream))->kernelQueue->head; - schedulerEnqueueKernel(&kern); - dequeKernelLL(&((cstreamData *)(ker.stream))->kernelQueue); - - } else { - - // switch the stream to run to allow for the next execution - ((cstreamData *)(ker.stream))->ev.status = C_RUN; - } - } - } - MUTEX_UNLOCK(((cstreamData *)(ker.stream))->stream_lock); + // execute GPU blocks + printf("exec: from: %d to : %d\n",k->startBlockId, k->endBlockId); + for (block_index = k->startBlockId; block_index < k->endBlockId + 1; block_index++) + { + int tmp = block_index; + block_index_x = tmp / (grid_size_y * grid_size_z); + tmp = tmp % (grid_size_y * grid_size_z); + block_index_y = tmp / (grid_size_z); + tmp = tmp % (grid_size_z); + block_index_z = tmp; + k->start_routine(k->args); } - MUTEX_LOCK(scheduler->work_queue_lock); - scheduler->num_kernel_finished += 1; - MUTEX_UNLOCK(scheduler->work_queue_lock); + printf("done: from: %d to : %d\n",k->startBlockId, k->endBlockId); + } + // if cannot get tasks, check whether programs stop + else if (scheduler->threadpool_shutdown_requested) + { + return true; // thread exit } } - - MUTEX_LOCK(scheduler->work_queue_lock); - - if ((is_exit == 0 && is_command_not_null == 0)) { - // all threads in condition wait - scheduler->idle_threads += 1; - if (scheduler->idle_threads == scheduler->num_worker_threads) { - pthread_cond_broadcast(&(scheduler->wake_host)); - } - pthread_cond_wait(&(scheduler->wake_pool), &(scheduler->work_queue_lock)); - scheduler->idle_threads -= 1; - goto RETRY; - } - MUTEX_UNLOCK(scheduler->work_queue_lock); - - return is_exit; + return 0; } -void *driver_thread(void *p) { +void *driver_thread(void *p) +{ struct c_thread *td = (struct c_thread *)p; int is_exit = 0; td->exit = false; - while (1) { - // get work - is_exit = get_work(td); + // get work + printf("before getwork\n"); + is_exit = get_work(td); + printf("after getwork\n"); - // exit the routine - if (is_exit) { - td->exit = true; - // pthread_exit - pthread_exit(NULL); - } + // exit the routine + if (is_exit) + { + td->exit = true; + // pthread_exit + printf("pthread exit\n"); + pthread_exit(NULL); + } + else + { + printf("driver thread stop incorrectly\n"); + exit(1); } } /* Initialize the scheduler */ -int scheduler_init(cu_device device) { +int scheduler_init(cu_device device) +{ scheduler = (cu_pool *)calloc(1, sizeof(cu_pool)); scheduler->num_worker_threads = device.max_compute_units; scheduler->num_kernel_queued = 0; scheduler->thread_pool = (struct c_thread *)calloc( scheduler->num_worker_threads, sizeof(c_thread)); - kernel_queue *asq; - create_KernelQueue(&asq); - scheduler->kernelQueue = asq; + scheduler->kernelQueue = new kernel_queue; - INIT_LOCK(scheduler->work_queue_lock); - pthread_cond_init(&scheduler->wake_pool, NULL); - pthread_cond_init(&scheduler->wake_host, NULL); scheduler->idle_threads = 0; - for (int i = 0; i < scheduler->num_worker_threads; i++) { + for (int i = 0; i < scheduler->num_worker_threads; i++) + { scheduler->thread_pool[i].index = i; pthread_create(&scheduler->thread_pool[i].thread, NULL, driver_thread, (void *)&scheduler->thread_pool[i]); @@ -472,42 +261,10 @@ int scheduler_init(cu_device device) { return C_SUCCESS; } -void scheduler_uninit() { - unsigned i; - - int r = pthread_mutex_lock(&scheduler->work_queue_lock); - assert(r == 0); - scheduler->threadpool_shutdown_requested = 1; - pthread_cond_broadcast(&scheduler->wake_pool); - - int r1 = pthread_mutex_unlock(&scheduler->work_queue_lock); - assert(r1 == 0); - - for (i = 0; i < scheduler->num_worker_threads; i++) { - - pthread_join(scheduler->thread_pool[i].thread, NULL); - } - free(scheduler->thread_pool); - free(scheduler->kernelQueue); - - pthread_mutex_destroy(&scheduler->work_queue_lock); - pthread_cond_destroy(&scheduler->wake_pool); - pthread_cond_destroy(&scheduler->wake_host); - - scheduler->threadpool_shutdown_requested = 0; -} - -int cuWait(cstreamData *evt) { - -AGAIN: - int r = pthread_mutex_lock(&evt->stream_lock); - assert(r == 0); - if (evt->ev.status != C_COMPLETE) { - int r1 = pthread_mutex_unlock(&evt->stream_lock); - assert(r1 == 0); - goto AGAIN; - } - return C_SUCCESS; +void scheduler_uninit() +{ + printf("No Implemente\n"); + exit(1); } /* @@ -522,14 +279,9 @@ AGAIN: Sense Like Barrier Counting Barrier basically */ -void cuSynchronizeBarrier() { - // std::cout << "cuSynchronizeBarrier" << std::endl; - MUTEX_LOCK(scheduler->work_queue_lock); - - if (scheduler->num_kernel_finished != scheduler->num_kernel_queued || - scheduler->idle_threads != scheduler->num_worker_threads) { - // scheduler->idle_threads, scheduler->num_worker_threads); - pthread_cond_wait(&(scheduler->wake_host), &(scheduler->work_queue_lock)); - } - MUTEX_UNLOCK(scheduler->work_queue_lock); +void cuSynchronizeBarrier() +{ + while (scheduler->kernelQueue->size_approx() > 0) + ; + printf("size: %d\n",scheduler->kernelQueue->size_approx()); }