暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

三、任务池是如何实现的

重塑之路 2020-05-22
556

本节是系列课程《利用JNI实现线程池》的第三节,本节将分享任务池的实现原理


完整课程获取方式:关注启明南公众号后回复:jni线程池


文章中贴出的是核心代码,完整代码获取方式:关注本公众号后回复:jni线程池源码


学习本章节需要什么基础


具备第一、第二章节所述的基础即可


线程池理论知识


一个完整的线程池是由线程池、任务池两部分组成。上一章节已经实现线程池的基础功能,本章节接着完成任务池的基础功能。


任务池完整功能:

1、任务池初始化

2、向任务池中添加任务

3、工作线程从任务池中取出任务执行

4、向任务池中加入任务后通知阻塞的工作线程来抢任务执行

5、任务池满后如何处理


本章节完成任务池的前四个功能,第5个功能将在线程池基础功能完成后开发



Java端代码


1、线程池类代码

package com.qimingnan.jni.threadpool.mythreadpool;


public class MyThreadPool {


public native void init(int minSize, int maxSize, int queueMaxSize);


public native void destroy();


public native void addTask(String taskClassName);
}


2、任务类代码

package com.qimingnan.jni.threadpool.mythread;


public class MyThread {


public native void run0();


public void run() {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " say: " + i);
}
}
}


3、main函数所在类代码

package com.qimingnan.jni.threadpool.mythreadpool;


import com.qimingnan.jni.threadpool.mythread.MyThread;


public class ThreadPoolApplication {


public static MyThreadPool threadPool;


public static void main(String[] args) {
System.loadLibrary("hello");


threadPool = new MyThreadPool();


threadPool.init(1, 5, 4);


/**
* 因为JNI的FindClass接收的参数格式是java/lang/String,JNI转换较麻烦,在Java中转换
*/
threadPool.addTask(MyThread.class.getName().replace('.', '/'));


threadPool.destroy();
}
}


JNI端核心代码


1、任务池数据结构

typedef struct task_pool_t {
/* 互斥锁,用于在多线程环境下操作本结构体保证同步性 */
pthread_mutex_t lock;


/**
* 条件变量,用于
* 1、线程池中的工作线程执行完任务基于这个条件变量阻塞
* 2、向任务池中增加任务后基于这个条件变量唤醒所有工作线程
*/
pthread_cond_t cond_empty;


/* 用于存放任务类字节码的循环队列 */
jclass* task_cqueue;


/* 任务池能存储的最大任务数 */
jint queue_max_size;


/* 任务池中的当前任务数 */
jint queue_size;


/* 当前执行到的任务index */
jint queue_current;
}task_pool, *ptask_pool;


2、任务池初始化代码

jint task_pool_init(JNIEnv *jEnv, jint maxSize) {
g_ptask_pool = calloc(1, sizeof(task_pool));
if (NULL == g_ptask_pool) {
throwRuntimeException(jEnv, "为任务池分配内存失败");


goto return1;
}


g_ptask_pool->task_cqueue = calloc(maxSize, sizeof(task_pool));
if (NULL == g_ptask_pool->task_cqueue) {
throwRuntimeException(jEnv, "为任务队列分配内存失败");


goto return2;
}


if (0 != pthread_mutex_init(&g_ptask_pool->lock, NULL)) {
throwRuntimeException(jEnv, "初始化任务池的锁失败");


goto return3;
}


if (0 != pthread_cond_init(&g_ptask_pool->cond_empty, NULL)) {
throwRuntimeException(jEnv, "初始化任务池的条件变量失败");


goto return4;
}


g_ptask_pool->queue_max_size = maxSize;
g_ptask_pool->queue_size = 0;
g_ptask_pool->queue_current = 0;


return 0;


return4:
pthread_mutex_destroy(&g_ptask_pool->lock);
return3:
free(g_ptask_pool->task_cqueue);
g_ptask_pool->task_cqueue = NULL;
return2:
free(g_ptask_pool);
g_ptask_pool = NULL;
return1:
return -1;
}


3、向任务池中插入任务的函数代码

jint task_pool_add(JNIEnv  *jEnv, jstring taskClassName) {
assert(g_ptask_pool != NULL);
assert(taskClassName != NULL);


JNIEnv env = *jEnv;


jclass taskClass = env->FindClass(jEnv,
env->GetStringUTFChars(jEnv, taskClassName, JNI_FALSE));
if (NULL == taskClass) {
throwRuntimeException(jEnv, "获取任务类字节码失败");


return -1;
}


/* 加锁 */
pthread_mutex_lock(&g_ptask_pool->lock);


if (g_ptask_pool->queue_size == g_ptask_pool->queue_max_size) {
task_pool_print("任务队列已满,无法插入数据,请稍后重试");


goto return1;
}


// queue_current的值只能由消费时的逻辑去操作,增加时不得操作,否则会出现数据混乱
jint index = g_ptask_pool->queue_current % g_ptask_pool->queue_max_size;


g_ptask_pool->task_cqueue[index] = taskClass;


g_ptask_pool->queue_size++;


/* 将加入的任务声明为全局引用,否则线程函数访问不到 */
env->NewGlobalRef(jEnv, g_ptask_pool->task_cqueue[index]);


task_pool_print("成功将任务加入任务池");


/* 唤醒所有等待任务的线程 */
pthread_cond_broadcast(&g_ptask_pool->cond_empty);


/* 释放锁 */
pthread_mutex_unlock(&g_ptask_pool->lock);


return 0;


return1:
pthread_mutex_unlock(&g_ptask_pool->lock);


return -1;
}


4、线程运行函数代码

void* worker_thread_fn(void *arg) {
print_nobuffer("我是工作线程\n");


JNIEnv *jEnv = NULL;


/* 将当前线程attach到jvm中,传出JNIEnv */
(*g_jvm)->AttachCurrentThread(g_jvm, (void **)&jEnv, NULL);


JNIEnv env = *jEnv;


/* 取出任务 */
pthread_mutex_lock(&g_ptask_pool->lock);


while (0 == g_ptask_pool->queue_size) {
task_pool_print("当前无可执行任务,阻塞,等待调度");


pthread_cond_wait(&g_ptask_pool->cond_empty, &g_ptask_pool->lock);
}


task_pool_print("该线程抢到了任务,开始执行");


jclass taskClass = g_ptask_pool->task_cqueue[g_ptask_pool->queue_current];
if (NULL == taskClass) {
throwRuntimeException(jEnv, "当前任务的类字节码不存在");


goto return1;
}


g_ptask_pool->task_cqueue[g_ptask_pool->queue_current] = NULL;


g_ptask_pool->queue_size--;


/* queue_current指向下一个任务类字节码, 求余queue_max_size是为了防止queue_current越界*/
g_ptask_pool->queue_current++;
g_ptask_pool->queue_current %= g_ptask_pool->queue_max_size;


pthread_mutex_unlock(&g_ptask_pool->lock);


/* 运行任务类函数 */
task_run(jEnv, taskClass);


/* 解除attach,否则会出现线程无法退出或不正常退出、jvm会挂等奇葩情况 */
(*g_jvm)->DetachCurrentThread(g_jvm);


return (void *)0;


return1:
/* 接触attach,否则会出现线程无法退出或不正常退出、jvm会挂等奇葩情况 */
(*g_jvm)->DetachCurrentThread(g_jvm);


return (void *)-1;
}


5、执行任务类函数代码

void* task_run(JNIEnv *jEnv, jclass taskClass) {
assert(taskClass != NULL);


JNIEnv env = *jEnv;


/* 获取任务的run方法 */
jmethodID runMethod = (*jEnv)->GetMethodID(jEnv, taskClass, "run", "()V");
if (NULL == runMethod) {
throwRuntimeException(jEnv, "任务类缺少run方法");


goto return1;
}


jobject taskObj = env->AllocObject(jEnv, taskClass);
if (NULL == taskObj) {
throwRuntimeException(jEnv, "实例化任务对象失败");


goto return1;
}


/* 执行运行函数 */
env->CallVoidMethod(jEnv, taskObj, runMethod);


/* 释放任务对象资源 */
env->DeleteLocalRef(jEnv, taskObj);
env->DeleteGlobalRef(jEnv, taskClass);


return1:
return NULL;
}


编译运行


1、编译(MAC环境)

gcc -dynamiclib -I /Library/Java/JavaVirtualMachines/openjdk-12.0.2.jdk/Contents/Home/include/ 
common.c thread_pool.c com_qimingnan_jni_threadpool_mythreadpool_MyThreadPool.c
-o libhello.jnilib


2、运行结果

我是工作线程
256614400: 当前无可执行任务,阻塞,等待调度: current=0, size=0, max_size=4
234848256: 成功将任务加入任务池: current=0, size=1, max_size=4
256614400: 该线程抢到了任务,开始执行: current=0, size=1, max_size=4
Thread-0 say: 0
Thread-0 say: 1
Thread-0 say: 2
Thread-0 say: 3
Thread-0 say: 4
Thread-0 say: 5
Thread-0 say: 6
Thread-0 say: 7
Thread-0 say: 8
Thread-0 say: 9
【线程池销毁程序】任务执行结束,开始释放资源
【销毁任务池】开始释放任务池资源
【销毁线程池】开始释放线程池资源


各位童鞋在阅读源码或实践的过程中有任何疑惑可留言,我会抽时间一一回复

文章转载自重塑之路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论