fix bug in CI

Ruobing Han 2022-06-20 23:03:01 -04:00
parent 2618bd21a7
commit c1045d8140
4 changed files with 59 additions and 117 deletions


@@ -37,7 +37,7 @@ jobs:
       run: |
         cd ${{ github.workspace }}
         cd runtime/threadPool
-        git clone git@github.com:cameron314/concurrentqueue.git
+        git clone https://github.com/cameron314/concurrentqueue.git
     - name: Build project
       run: |
         mkdir build


@@ -17,4 +17,4 @@ include_directories(./concurrentqueue)
 file(GLOB proj_SOURCES "src/x86/*.cpp")
 add_library(${LIB_NAME} SHARED ${proj_SOURCES})
-#include "blockingconcurrentqueue.h"
+# include "blockingconcurrentqueue.h"


@@ -1,18 +1,16 @@
 #ifndef C_STRUCTURES_H
 #define C_STRUCTURES_H

+#include "blockingconcurrentqueue.h"
 #include "cuda_runtime.h"
 #include "pthread.h"
-#include "blockingconcurrentqueue.h"

-typedef struct device
-{
+typedef struct device {
   int max_compute_units;
   int device_id;
 } cu_device;

-typedef struct c_thread
-{
+typedef struct c_thread {
   pthread_t thread;
   unsigned long executed_commands;
   unsigned index;
@@ -21,8 +19,7 @@ typedef struct c_thread
 } cu_ptd;

 // kernel information
-typedef struct kernel
-{
+typedef struct kernel {

   void *(*start_routine)(void *);
@@ -46,15 +43,16 @@ typedef struct kernel
   int startBlockId;
   int endBlockId;

-  kernel(const kernel &obj) : start_routine(obj.start_routine), args(obj.args),
-      shared_mem(obj.shared_mem), blockSize(obj.blockSize),
-      gridDim(obj.gridDim), blockDim(obj.blockDim), totalBlocks(obj.totalBlocks) {}
+  kernel(const kernel &obj)
+      : start_routine(obj.start_routine), args(obj.args),
+        shared_mem(obj.shared_mem), blockSize(obj.blockSize),
+        gridDim(obj.gridDim), blockDim(obj.blockDim),
+        totalBlocks(obj.totalBlocks) {}

 } cu_kernel;

 using kernel_queue = moodycamel::BlockingConcurrentQueue<kernel *>;

-typedef struct scheduler_pool
-{
+typedef struct scheduler_pool {

   struct c_thread *thread_pool;
@@ -77,8 +75,7 @@ typedef struct scheduler_pool
 } cu_pool;

-typedef struct command
-{
+typedef struct command {

   struct kernel *ker;
@@ -87,16 +84,14 @@ typedef struct command
 } cu_command;

-typedef struct argument
-{
+typedef struct argument {
   // size of the argument to allocation
   size_t size;
   void *value;
   unsigned int index;
 } cu_argument;

-typedef struct input_arg
-{
+typedef struct input_arg {
   // real values for the input
   char *p;
   struct argument *argus[];
@@ -105,16 +100,14 @@ typedef struct input_arg
   // so that we can parse the arguments p
 } cu_input;

-enum StreamType
-{
+enum StreamType {
   DEFAULT,
   LOW,
   HIGH,
   EXT,
 };

-struct cStreamDataInternal
-{
+struct cStreamDataInternal {
   /*
     status of the stream (run , wait)
     Run: Stream will asynchronously assign the kernel assign with this stream
@@ -129,8 +122,7 @@ struct cStreamDataInternal
   unsigned int count; // number of task left in the stream
 };

-typedef struct streamData
-{
+typedef struct streamData {

   // execution status of current event monitor
   struct cStreamDataInternal ev;
@@ -143,8 +135,7 @@ typedef struct streamData
 } cstreamData;

-typedef struct asyncKernel
-{
+typedef struct asyncKernel {
   unsigned int numBlocks;
   unsigned int numThreads;
   struct event *evt;
@@ -157,20 +148,17 @@ typedef struct asyncKernel
 // command queue of command nodes

-typedef struct kernel_arg_array
-{
+typedef struct kernel_arg_array {
   size_t size;
   unsigned int index;
 } karg_arr;

-typedef struct kernel_image_arg
-{
+typedef struct kernel_image_arg {
   size_t size;
   unsigned int index;
 } k_arg;

-typedef struct callParams
-{
+typedef struct callParams {
   dim3 gridDim;
   dim3 blockDim;
   size_t shareMem;
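The kernel_queue alias above is built on moodycamel's BlockingConcurrentQueue from the cameron314/concurrentqueue repository that the CI change now clones over HTTPS. As a reference, here is a minimal standalone sketch of the producer/consumer calls the runtime relies on (enqueue and the 5 ms timed dequeue both appear later in this commit); it assumes blockingconcurrentqueue.h is on the include path and uses a plain int payload instead of kernel *.

#include "blockingconcurrentqueue.h"
#include <chrono>

int main() {
  moodycamel::BlockingConcurrentQueue<int> q;

  // producer side: mirrors scheduler->kernelQueue->enqueue(p)
  q.enqueue(42);

  // consumer side: mirrors the worker threads' 5 ms timed dequeue
  int item;
  bool got = q.wait_dequeue_timed(item, std::chrono::milliseconds(5));

  return got ? 0 : 1;
}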


@@ -1,4 +1,5 @@
 #include "api.h"
+#include "blockingconcurrentqueue.h"
 #include "def.h"
 #include "macros.h"
 #include "structures.h"
@@ -6,7 +7,6 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <thread>
-#include "blockingconcurrentqueue.h"

 /*
@@ -17,8 +17,7 @@
   Initialize the device
 */
 int device_max_compute_units = 1;
-int init_device()
-{
+int init_device() {
   cu_device *device = (cu_device *)calloc(1, sizeof(cu_device));
   if (device == NULL)
     return C_ERROR_MEMALLOC;
@@ -39,8 +38,7 @@ int init_device()
 // Create Kernel
 static int kernelIds = 0;
 cu_kernel *create_kernel(const void *func, dim3 gridDim, dim3 blockDim,
-                         void **args, size_t sharedMem, cudaStream_t stream)
-{
+                         void **args, size_t sharedMem, cudaStream_t stream) {

   cu_kernel *ker = (cu_kernel *)calloc(1, sizeof(cu_kernel));
   // set the function pointer
@@ -77,54 +75,24 @@ __thread int block_index_z = 0;
 __thread int thread_memory_size = 0;
 __thread int *dynamic_shared_memory = NULL;
 __thread int warp_shfl[32] = {
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
-    0,
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
 };

 /*
   Enqueue Kernel (k) to the scheduler kernelQueue
 */
-int schedulerEnqueueKernel(cu_kernel *k)
-{
-  int totalBlocks = k->totalBlocks; // calculate gpu_block_to_execute_per_cpu_thread
+int schedulerEnqueueKernel(cu_kernel *k) {
+  int totalBlocks =
+      k->totalBlocks; // calculate gpu_block_to_execute_per_cpu_thread
   int gpuBlockToExecutePerCpuThread =
-      (totalBlocks + device_max_compute_units - 1) /
-      device_max_compute_units;
-  for (int startBlockIdx = 0; startBlockIdx < totalBlocks; startBlockIdx += gpuBlockToExecutePerCpuThread)
-  {
+      (totalBlocks + device_max_compute_units - 1) / device_max_compute_units;
+  for (int startBlockIdx = 0; startBlockIdx < totalBlocks;
+       startBlockIdx += gpuBlockToExecutePerCpuThread) {
     cu_kernel *p = new cu_kernel(*k);
     p->startBlockId = startBlockIdx;
-    p->endBlockId = std::min(startBlockIdx + gpuBlockToExecutePerCpuThread - 1, totalBlocks - 1);
+    p->endBlockId = std::min(startBlockIdx + gpuBlockToExecutePerCpuThread - 1,
+                             totalBlocks - 1);
     scheduler->kernelQueue->enqueue(p);
   }
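For reference, the chunking arithmetic in schedulerEnqueueKernel splits the grid into roughly equal ranges of block ids, one enqueued chunk per worker thread. A small standalone sketch of the same calculation with hypothetical numbers (10 total blocks, a device reporting 4 compute units):

#include <algorithm>
#include <cstdio>

int main() {
  int totalBlocks = 10;             // k->totalBlocks (hypothetical)
  int device_max_compute_units = 4; // number of worker threads (hypothetical)
  // ceiling division: blocks each CPU thread should execute
  int perThread =
      (totalBlocks + device_max_compute_units - 1) / device_max_compute_units; // = 3
  for (int start = 0; start < totalBlocks; start += perThread) {
    int end = std::min(start + perThread - 1, totalBlocks - 1);
    printf("enqueue blocks %d..%d\n", start, end); // 0..2, 3..5, 6..8, 9..9
  }
  return 0;
}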
@@ -136,18 +104,14 @@ int schedulerEnqueueKernel(cu_kernel *k)
 /*
   Kernel Launch with numBlocks and numThreadsPerBlock
 */
-int cuLaunchKernel(cu_kernel **k)
-{
+int cuLaunchKernel(cu_kernel **k) {
   // Calculate Block Size N/numBlocks
   cu_kernel *ker = *k;
   int status = C_RUN;
   // stream == 0 add to the kernelQueue
-  if (ker->stream == 0)
-  {
+  if (ker->stream == 0) {
     schedulerEnqueueKernel(ker);
-  }
-  else
-  {
+  } else {
     printf("MultiStream no implemente\n");
     exit(1);
   }
@@ -157,19 +121,17 @@ int cuLaunchKernel(cu_kernel **k)
 /*
   Thread Gets Work
 */
-int get_work(c_thread *th)
-{
+int get_work(c_thread *th) {
   int dynamic_shared_mem_size = 0;
   dim3 gridDim;
   dim3 blockDim;
-  while (true)
-  {
+  while (true) {
     // try to get a task from the queue
     cu_kernel *k;
     th->busy = false;
-    bool getTask = scheduler->kernelQueue->wait_dequeue_timed(k, std::chrono::milliseconds(5));
-    if (getTask)
-    {
+    bool getTask = scheduler->kernelQueue->wait_dequeue_timed(
+        k, std::chrono::milliseconds(5));
+    if (getTask) {
       th->busy = true;
       // set runtime configuration
       gridDim = k->gridDim;
@@ -185,8 +147,8 @@ int get_work(c_thread *th)
      if (dynamic_shared_mem_size > 0)
        dynamic_shared_memory = (int *)malloc(dynamic_shared_mem_size);
      // execute GPU blocks
-      for (block_index = k->startBlockId; block_index < k->endBlockId + 1; block_index++)
-      {
+      for (block_index = k->startBlockId; block_index < k->endBlockId + 1;
+           block_index++) {
        int tmp = block_index;
        block_index_x = tmp / (grid_size_y * grid_size_z);
        tmp = tmp % (grid_size_y * grid_size_z);
@@ -197,8 +159,7 @@ int get_work(c_thread *th)
      }
    }
    // if cannot get tasks, check whether programs stop
-    else if (scheduler->threadpool_shutdown_requested)
-    {
+    else if (scheduler->threadpool_shutdown_requested) {
      th->busy = false;
      return true; // thread exit
    }
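The block_index decomposition in get_work converts a linear block id back into 3-D grid coordinates. The hunk above shows only the x component before it is cut off, so the y and z steps below are an assumption that the remaining code follows the same divide/modulo pattern; grid sizes and the block id are hypothetical.

#include <cstdio>

int main() {
  int grid_size_y = 3, grid_size_z = 4; // gridDim.y, gridDim.z (hypothetical)
  int block_index = 17;                 // linear block id (hypothetical)

  int tmp = block_index;
  int x = tmp / (grid_size_y * grid_size_z); // 17 / 12 = 1
  tmp = tmp % (grid_size_y * grid_size_z);   // 17 % 12 = 5
  int y = tmp / grid_size_z;                 // 5 / 4 = 1  (assumed step)
  int z = tmp % grid_size_z;                 // 5 % 4 = 1  (assumed step)

  printf("block %d -> (%d, %d, %d)\n", block_index, x, y, z);
  return 0;
}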
@@ -206,8 +167,7 @@
   return 0;
 }

-void *driver_thread(void *p)
-{
+void *driver_thread(void *p) {
   struct c_thread *td = (struct c_thread *)p;
   int is_exit = 0;
   td->exit = false;
@@ -216,14 +176,11 @@ void *driver_thread(void *p)
     is_exit = get_work(td);

     // exit the routine
-    if (is_exit)
-    {
+    if (is_exit) {
       td->exit = true;
       // pthread_exit
       pthread_exit(NULL);
-    }
-    else
-    {
+    } else {
       printf("driver thread stop incorrectly\n");
       exit(1);
     }
@@ -232,8 +189,7 @@ void *driver_thread(void *p)
 /*
   Initialize the scheduler
 */
-int scheduler_init(cu_device device)
-{
+int scheduler_init(cu_device device) {
   scheduler = (cu_pool *)calloc(1, sizeof(cu_pool));
   scheduler->num_worker_threads = device.max_compute_units;
   scheduler->num_kernel_queued = 0;
@@ -243,8 +199,7 @@ int scheduler_init(cu_device device)
   scheduler->kernelQueue = new kernel_queue;

   scheduler->idle_threads = 0;
-  for (int i = 0; i < scheduler->num_worker_threads; i++)
-  {
+  for (int i = 0; i < scheduler->num_worker_threads; i++) {
     scheduler->thread_pool[i].index = i;
     pthread_create(&scheduler->thread_pool[i].thread, NULL, driver_thread,
                    (void *)&scheduler->thread_pool[i]);
@@ -253,8 +208,7 @@ int scheduler_init(cu_device device)
   return C_SUCCESS;
 }

-void scheduler_uninit()
-{
+void scheduler_uninit() {
   printf("Scheduler Unitit no Implemente\n");
   exit(1);
 }
@@ -271,17 +225,17 @@ void scheduler_uninit()
   Sense Like Barrier
   Counting Barrier basically
 */
-void cuSynchronizeBarrier()
-{
-  while(1) {
-    // sync is complete, only if queue size == 0 and none of
+void cuSynchronizeBarrier() {
+  while (1) {
+    // sync is complete, only if queue size == 0 and none of
     // driver threads are busy
-    if(scheduler->kernelQueue->size_approx() == 0) {
+    if (scheduler->kernelQueue->size_approx() == 0) {
      bool none_busy = true;
      for (int i = 0; i < scheduler->num_worker_threads; i++) {
-        none_busy&=(!scheduler->thread_pool[i].busy);
+        none_busy &= (!scheduler->thread_pool[i].busy);
      }
-      if(none_busy) break;
+      if (none_busy)
+        break;
    }
  }
 }
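Taken together, the functions touched in this file suggest the host-side sequence sketched below: build a kernel descriptor, enqueue it on the default stream, then spin in cuSynchronizeBarrier until the queue drains and every worker reports idle. This is only a sketch against the signatures visible in this diff; the wrapper name launch_and_wait and the assumption that api.h declares these entry points are mine, not part of the commit.

#include "api.h"        // assumed to declare the entry points used below
#include "structures.h" // cu_kernel, dim3

// hypothetical helper: launch one kernel on the default stream and wait
void launch_and_wait(const void *func, dim3 gridDim, dim3 blockDim,
                     void **args, size_t sharedMem) {
  cu_kernel *ker =
      create_kernel(func, gridDim, blockDim, args, sharedMem, /*stream=*/0);
  cuLaunchKernel(&ker);   // stream == 0, so the kernel goes to kernelQueue
  cuSynchronizeBarrier(); // busy-wait until queue empty and no thread busy
}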