本节是系列课程《利用JNI实现线程池》的第三节,本节将分享任务池的实现原理
完整课程获取方式:关注启明南公众号后回复:jni线程池
文章中贴出的是核心代码,完整代码获取方式:关注本公众号后回复:jni线程池源码
学习本章节需要什么基础
具备第一、第二章节所述的基础即可
线程池理论知识
一个完整的线程池是由线程池、任务池两部分组成。上一章节已经实现线程池的基础功能,本章节接着完成任务池的基础功能。
任务池完整功能:
1、任务池初始化
2、向任务池中添加任务
3、工作线程从任务池中取出任务执行
4、向任务池中加入任务后通知阻塞的工作线程来抢任务执行
5、任务池满后如何处理
本章节完成任务池的前四个功能,第5个功能将在线程池基础功能完成后开发

Java端代码
1、线程池类代码
package com.qimingnan.jni.threadpool.mythreadpool;public class MyThreadPool {public native void init(int minSize, int maxSize, int queueMaxSize);public native void destroy();public native void addTask(String taskClassName);}
2、任务类代码
package com.qimingnan.jni.threadpool.mythread;public class MyThread {public native void run0();public void run() {for (int i = 0; i < 10; i++) {System.out.println(Thread.currentThread().getName() + " say: " + i);}}}
3、main函数所在类代码
package com.qimingnan.jni.threadpool.mythreadpool;import com.qimingnan.jni.threadpool.mythread.MyThread;public class ThreadPoolApplication {public static MyThreadPool threadPool;public static void main(String[] args) {System.loadLibrary("hello");threadPool = new MyThreadPool();threadPool.init(1, 5, 4);/*** 因为JNI的FindClass接收的参数格式是java/lang/String,JNI转换较麻烦,在Java中转换*/threadPool.addTask(MyThread.class.getName().replace('.', '/'));threadPool.destroy();}}
JNI端核心代码
1、任务池数据结构
typedef struct task_pool_t {/* 互斥锁,用于在多线程环境下操作本结构体保证同步性 */pthread_mutex_t lock;/*** 条件变量,用于* 1、线程池中的工作线程执行完任务基于这个条件变量阻塞* 2、向任务池中增加任务后基于这个条件变量唤醒所有工作线程*/pthread_cond_t cond_empty;/* 用于存放任务类字节码的循环队列 */jclass* task_cqueue;/* 任务池能存储的最大任务数 */jint queue_max_size;/* 任务池中的当前任务数 */jint queue_size;/* 当前执行到的任务index */jint queue_current;}task_pool, *ptask_pool;
2、任务池初始化代码
jint task_pool_init(JNIEnv *jEnv, jint maxSize) {g_ptask_pool = calloc(1, sizeof(task_pool));if (NULL == g_ptask_pool) {throwRuntimeException(jEnv, "为任务池分配内存失败");goto return1;}g_ptask_pool->task_cqueue = calloc(maxSize, sizeof(task_pool));if (NULL == g_ptask_pool->task_cqueue) {throwRuntimeException(jEnv, "为任务队列分配内存失败");goto return2;}if (0 != pthread_mutex_init(&g_ptask_pool->lock, NULL)) {throwRuntimeException(jEnv, "初始化任务池的锁失败");goto return3;}if (0 != pthread_cond_init(&g_ptask_pool->cond_empty, NULL)) {throwRuntimeException(jEnv, "初始化任务池的条件变量失败");goto return4;}g_ptask_pool->queue_max_size = maxSize;g_ptask_pool->queue_size = 0;g_ptask_pool->queue_current = 0;return 0;return4:pthread_mutex_destroy(&g_ptask_pool->lock);return3:free(g_ptask_pool->task_cqueue);g_ptask_pool->task_cqueue = NULL;return2:free(g_ptask_pool);g_ptask_pool = NULL;return1:return -1;}
3、向任务池中插入任务的函数代码
jint task_pool_add(JNIEnv *jEnv, jstring taskClassName) {assert(g_ptask_pool != NULL);assert(taskClassName != NULL);JNIEnv env = *jEnv;jclass taskClass = env->FindClass(jEnv,env->GetStringUTFChars(jEnv, taskClassName, JNI_FALSE));if (NULL == taskClass) {throwRuntimeException(jEnv, "获取任务类字节码失败");return -1;}/* 加锁 */pthread_mutex_lock(&g_ptask_pool->lock);if (g_ptask_pool->queue_size == g_ptask_pool->queue_max_size) {task_pool_print("任务队列已满,无法插入数据,请稍后重试");goto return1;}// queue_current的值只能由消费时的逻辑去操作,增加时不得操作,否则会出现数据混乱jint index = g_ptask_pool->queue_current % g_ptask_pool->queue_max_size;g_ptask_pool->task_cqueue[index] = taskClass;g_ptask_pool->queue_size++;/* 将加入的任务声明为全局引用,否则线程函数访问不到 */env->NewGlobalRef(jEnv, g_ptask_pool->task_cqueue[index]);task_pool_print("成功将任务加入任务池");/* 唤醒所有等待任务的线程 */pthread_cond_broadcast(&g_ptask_pool->cond_empty);/* 释放锁 */pthread_mutex_unlock(&g_ptask_pool->lock);return 0;return1:pthread_mutex_unlock(&g_ptask_pool->lock);return -1;}
4、线程运行函数代码
void* worker_thread_fn(void *arg) {print_nobuffer("我是工作线程\n");JNIEnv *jEnv = NULL;/* 将当前线程attach到jvm中,传出JNIEnv */(*g_jvm)->AttachCurrentThread(g_jvm, (void **)&jEnv, NULL);JNIEnv env = *jEnv;/* 取出任务 */pthread_mutex_lock(&g_ptask_pool->lock);while (0 == g_ptask_pool->queue_size) {task_pool_print("当前无可执行任务,阻塞,等待调度");pthread_cond_wait(&g_ptask_pool->cond_empty, &g_ptask_pool->lock);}task_pool_print("该线程抢到了任务,开始执行");jclass taskClass = g_ptask_pool->task_cqueue[g_ptask_pool->queue_current];if (NULL == taskClass) {throwRuntimeException(jEnv, "当前任务的类字节码不存在");goto return1;}g_ptask_pool->task_cqueue[g_ptask_pool->queue_current] = NULL;g_ptask_pool->queue_size--;/* queue_current指向下一个任务类字节码, 求余queue_max_size是为了防止queue_current越界*/g_ptask_pool->queue_current++;g_ptask_pool->queue_current %= g_ptask_pool->queue_max_size;pthread_mutex_unlock(&g_ptask_pool->lock);/* 运行任务类函数 */task_run(jEnv, taskClass);/* 解除attach,否则会出现线程无法退出或不正常退出、jvm会挂等奇葩情况 */(*g_jvm)->DetachCurrentThread(g_jvm);return (void *)0;return1:/* 接触attach,否则会出现线程无法退出或不正常退出、jvm会挂等奇葩情况 */(*g_jvm)->DetachCurrentThread(g_jvm);return (void *)-1;}
5、执行任务类函数代码
void* task_run(JNIEnv *jEnv, jclass taskClass) {assert(taskClass != NULL);JNIEnv env = *jEnv;/* 获取任务的run方法 */jmethodID runMethod = (*jEnv)->GetMethodID(jEnv, taskClass, "run", "()V");if (NULL == runMethod) {throwRuntimeException(jEnv, "任务类缺少run方法");goto return1;}jobject taskObj = env->AllocObject(jEnv, taskClass);if (NULL == taskObj) {throwRuntimeException(jEnv, "实例化任务对象失败");goto return1;}/* 执行运行函数 */env->CallVoidMethod(jEnv, taskObj, runMethod);/* 释放任务对象资源 */env->DeleteLocalRef(jEnv, taskObj);env->DeleteGlobalRef(jEnv, taskClass);return1:return NULL;}
编译运行
1、编译(MAC环境)
gcc -dynamiclib -I /Library/Java/JavaVirtualMachines/openjdk-12.0.2.jdk/Contents/Home/include/common.c thread_pool.c com_qimingnan_jni_threadpool_mythreadpool_MyThreadPool.c-o libhello.jnilib
2、运行结果
我是工作线程256614400: 当前无可执行任务,阻塞,等待调度: current=0, size=0, max_size=4234848256: 成功将任务加入任务池: current=0, size=1, max_size=4256614400: 该线程抢到了任务,开始执行: current=0, size=1, max_size=4Thread-0 say: 0Thread-0 say: 1Thread-0 say: 2Thread-0 say: 3Thread-0 say: 4Thread-0 say: 5Thread-0 say: 6Thread-0 say: 7Thread-0 say: 8Thread-0 say: 9【线程池销毁程序】任务执行结束,开始释放资源【销毁任务池】开始释放任务池资源【销毁线程池】开始释放线程池资源
各位童鞋在阅读源码或实践的过程中有任何疑惑可留言,我会抽时间一一回复
文章转载自重塑之路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




