Nodejs线程池的设计与实现

本文转载自微信公众号「编程杂技」,线程作者theanarkh。设计实现转载本文请联系编程杂技公众号。线程 

前言:之前的设计实现版本不方便开放,重新设计了一版nodejs的线程线程池库,本文介绍该库的设计实现一些设计和实现。

nodejs虽然提供了线程的线程能力,但是设计实现很多时候,往往不能直接使用线程或者无限制地创建线程,线程比如我们有一个功能是设计实现cpu密集型的,如果一个请求就开一个线程,线程这很明显不是设计实现最好的实践,这时候,线程我们需要使用池化的设计实现技术,本文介绍在nodejs线程模块的线程基础上,如何设计和实现一个线程池库(https://github.com/theanarkh/nodejs-threadpool或npm i nodejs-threadpool )。下面是线程池的总体架构。

设计一个线程池,在真正写代码之前,有很多设计需要考虑,大概如下:

1任务队列的亿华云设计,一个队列,多个线程互斥访问,或者每个线程一个队列,不需要互斥访问。

2 线程退出的设计,可以由主线程检测空闲线程,然后使子线程退出。或者子线程退出,通知主线程。空闲不一定是没有任务就退出,可以设计空闲时间达到阈值后退出,因为创建线程是有时间开销的。

3 任务数的设计,每个线程可以有个任务数,还可以增加一个总任务数,即全部线程任务数加起来

4 选择线程的设计,选择任务数最少的线程。

5 线程类型的设计,可以区分核心线程和预备线程,任务少的时候,核心线程处理就行。任务多也创建预备线程帮忙处理。

6 线程池类型的站群服务器设计,cpu密集型的,线程数等于核数,否则自定义线程数就行。

7 支持任务的取消和超时机制,防止一个任务时间过长或者死循环。

本文介绍的线程池具体设计思想如下(参考java):

1 主线程维护一个队列,子线程的任务由子线程负责分发,不需要互斥访问,子线程也不需要维护自己的队列。

2 线程退出的设计,主线程负责检查子线程空闲时间是否达到阈值,是则使子线程退出。

3 任务数的设计,主线程负责管理任务个数并应有相应的策略。

4 选择线程的设计,选择任务数最少的线程。

5 线程类型的设计,区分核心线程和预备线程,任务少的香港云服务器时候,核心线程处理就行。任务多也创建预备线程帮忙处理。

6 线程池类型的设计,cpu密集型的,线程数等于核数,否则自定义线程数就行。

7 支持任务的取消和超时机制,超时或者取消的时候,主线程判断任务是待执行还是正在执行,如果是待执行则从任务队列中删除,如果是正在执行则杀死对应的子线程。下面我们看一下具体的设计。

1 主线程和子线程通信的数据结构

// 任务类,一个任务对应一个id class Work {      constructor({ workId, filename, options}) {          // 任务id         this.workId = workId;         // 任务逻辑,字符串或者js文件路径         this.filename = filename;         // 任务返回的结果         this.data = null;         // 任务返回的错误         this.error = null;         // 执行任务时传入的参数,用户定义         this.options = options;     } } 

主线程给子线程分派一个任务的时候,就给子线程发送一个Work对象。在nodejs中线程间通信需要经过序列化和反序列化,所以通信的数据结构包括的信息不能过多。

2 子线程处理任务逻辑

const {  parentPort } = require(worker_threads); const vm = require(vm); const {  isFunction, isJSFile } = require(./utils); // 监听主线程提交过来的任务 parentPort.on(message, async (work) => {      try {          const {  filename, options } = work;         let aFunction;         if (isJSFile(filename)) {              aFunction = require(filename);         } else {              aFunction = vm.runInThisContext(`(${ filename})`);         }         if (!isFunction(aFunction)) {              throw new Error(work type error: js file or string);         }         work.data = await aFunction(options);         parentPort.postMessage({ event: done, work});     } catch (error) {          work.error = error.toString();         parentPort.postMessage({ event: error, work});     } }); process.on(uncaughtException, (...rest) => {      console.error(...rest); }); process.on(unhandledRejection, (...rest) => {      console.error(...rest); }); 

子线程的逻辑比较简单,就是监听主线程分派过来的任务,然后执行任务,执行完之后通知主线程。任务支持js文件和字符串代码的形式。需要返回一个Promise或者async函数。用于用于通知主线程任务已经完成。

3 线程池和业务的通信

// 提供给用户侧的接口 class UserWork extends EventEmitter {      constructor({  workId }) {          super();         // 任务id         this.workId = workId;         // 支持超时取消任务         this.timer = null;         // 任务状态         this.state = WORK_STATE.PENDDING;     }     // 超时后取消任务     setTimeout(timeout) {          this.timer = setTimeout(() => {              this.timer && this.cancel() && this.emit(timeout);         }, ~~timeout);     }     // 取消之前设置的定时器     clearTimeout() {          clearTimeout(this.timer);         this.timer = null;     }     // 直接取消任务,如果执行完了就不能取消了,this.terminate是动态设置的     cancel() {          if (this.state === WORK_STATE.END || this.state === WORK_STATE.CANCELED) {             return false;         } else {              this.terminate();             return true;         }     }     // 修改任务状态     setState(state) {          this.state = state;     } } 

业务提交一个任务给线程池的时候,线程池会返回一个UserWork类,业务侧通过UserWork类和线程池通信。

4 管理子线程的数据结构

// 管理子线程的数据结构 class Thread {      constructor({  worker }) {          // nodejs的Worker对象,nodejs的worker_threads模块的Worker         this.worker = worker;         // 线程状态         this.state = THREAD_STATE.IDLE;         // 上次工作的时间         this.lastWorkTime = Date.now();     }     // 修改线程状态     setState(state) {          this.state = state;     }     // 修改线程最后工作时间     setLastWorkTime(time) {          this.lastWorkTime = time;     } } 

线程池中维护了多个子线程,Thread类用于管理子线程的信息。

5 线程池 线程池的实现是核心,我们分为几个部分讲。

5.1 支持的配置

constructor(options = { }) {          this.options = options;         // 子线程队列         this.workerQueue = [];         // 核心线程数         this.coreThreads = ~~options.coreThreads || config.CORE_THREADS;         // 线程池最大线程数,如果不支持动态扩容则最大线程数等于核心线程数         this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads;         // 超过任务队列长度时的处理策略         this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD;         // 是否预创建子线程         this.preCreate = options.preCreate === true;         // 线程最大空闲时间,达到后自动退出         this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME;         // 是否预创建线程池         this.preCreate && this.preCreateThreads();         // 保存线程池中任务对应的UserWork         this.workPool = { };         // 线程池中当前可用的任务id,每次有新任务时自增1         this.workId = 0;         // 线程池中的任务队列         this.queue = [];         // 线程池总任务数         this.totalWork = 0;         // 支持的最大任务数         this.maxWork = ~~options.maxWork || config.MAX_WORK;         // 处理任务的超时时间,全局配置         this.timeout = ~~options.timeout;         this.pollIdle();     } 

上面的代码列出了线程池所支持的能力。

5.2 创建线程

newThread() {          const worker = new Worker(workerPath);         const thread = new Thread({ worker});         this.workerQueue.push(thread);         const threadId = worker.threadId;         worker.on(exit, () => {              // 找到该线程对应的数据结构,然后删除该线程的数据结构             const position = this.workerQueue.findIndex(({ worker}) => {                  return worker.threadId === threadId;             });             const exitedThread = this.workerQueue.splice(position, 1);             // 退出时状态是BUSY说明还在处理任务(非正常退出)             this.totalWork -= exitedThread.state === THREAD_STATE.BUSY ? 1 : 0;         });         // 和子线程通信         worker.on(message, (result) => {              const {                  work,                 event,             } = result;             const {  data, error, workId } = work;             // 通过workId拿到对应的userWork             const userWork = this.workPool[workId];             // 不存在说明任务被取消了             if (!userWork) {                  return;             }             // 修改线程池数据结构             this.endWork(userWork);             // 修改线程数据结构             thread.setLastWorkTime(Date.now());             // 还有任务则通知子线程处理,否则修改子线程状态为空闲             if (this.queue.length) {                  // 从任务队列拿到一个任务交给子线程                 this.submitWorkToThread(thread, this.queue.shift());             } else {                  thread.setState(THREAD_STATE.IDLE);             }             switch(event) {                  case done:                     // 通知用户,任务完成                     userWork.emit(done, data);                     break;                 case error:                     // 通知用户,任务出错                     if (EventEmitter.listenerCount(userWork, error)) {                          userWork.emit(error, error);                     }                     break;                 default: break;             }         });         worker.on(error, (...rest) => {              console.error(...rest);         });         return thread;     } 

创建线程,并保持线程对应的数据结构、退出、通信管理、任务分派。子线程执行完任务后,会通知线程池,主线程通知用户。

5.3 选择线程

selectThead() {          // 找出空闲的线程,把任务交给他         for (let i = 0; i < this.workerQueue.length; i++) {              if (this.workerQueue[i].state === THREAD_STATE.IDLE) {                  return this.workerQueue[i];             }         }         // 没有空闲的则随机选择一个         return this.workerQueue[~~(Math.random() * this.workerQueue.length)];     } 

当用户给线程池提交一个任务时,线程池会选择一个空闲的线程处理该任务。如果没有可用线程则任务插入待处理队列等待处理。

5.4 提交任务

// 给线程池提交一个任务     submit(filename, options = { }) {          return new Promise(async (resolve, reject) => {              let thread;             // 没有线程则创建一个             if (this.workerQueue.length) {                  thread = this.selectThead();                 // 该线程还有任务需要处理                 if (thread.state === THREAD_STATE.BUSY) {                      // 子线程个数还没有达到核心线程数,则新建线程处理                     if (this.workerQueue.length < this.coreThreads) {                          thread = this.newThread();                     } else if (this.totalWork + 1 > this.maxWork){                          // 总任务数已达到阈值,还没有达到线程数阈值,则创建                         if(this.workerQueue.length < this.maxThreads) {                              thread = this.newThread();                         } else {                              // 处理溢出的任务                             switch(this.discardPolicy) {                                  case DISCARD_POLICY.ABORT:                                      return reject(new Error(queue overflow));                                 case DISCARD_POLICY.CALLER_RUN:                                     const workId = this.generateWorkId();                                     const userWork =  new UserWork({ workId});                                      userWork.setState(WORK_STATE.RUNNING);                                     userWork.terminate = () => {                                          userWork.setState(WORK_STATE.CANCELED);                                     };                                     this.timeout && userWork.setTimeout(this.timeout);                                     resolve(userWork);                                     try {                                          let aFunction;                                         if (isJSFile(filename)) {                                              aFunction = require(filename);                                         } else {                                              aFunction = vm.runInThisContext(`(${ filename})`);                                         }                                         if (!isFunction(aFunction)) {                                              throw new Error(work type error: js file or string);                                         }                                         const result = await aFunction(options);                                         // 延迟通知,让用户有机会取消或者注册事件                                         setImmediate(() => {                                              if (userWork.state !== WORK_STATE.CANCELED) {                                                  userWork.setState(WORK_STATE.END);                                                 userWork.emit(done, result);                                             }                                         });                                     } catch (error) {                                          setImmediate(() => {                                              if (userWork.state !== WORK_STATE.CANCELED) {                                                  userWork.setState(WORK_STATE.END);                                                 userWork.emit(error, error.toString());                                             }                                         });                                     }                                     return;                                 case DISCARD_POLICY.OLDEST_DISCARD:                                      const work = this.queue.shift();                                     // maxWork为1时,work会为空                                     if (work && this.workPool[work.workId]) {                                          this.cancelWork(this.workPool[work.workId]);                                     } else {                                          return reject(new Error(no work can be discarded));                                     }                                     break;                                 case DISCARD_POLICY.DISCARD:                                     return reject(new Error(discard));                                 case DISCARD_POLICY.NOT_DISCARD:                                     break;                                 default:                                      break;                             }                         }                     }                 }             } else {                  thread = this.newThread();             }             // 生成一个任务id             const workId = this.generateWorkId();             // 新建一个UserWork             const userWork =  new UserWork({ workId});              this.timeout && userWork.setTimeout(this.timeout);             // 新建一个work             const work = new Work({  workId, filename, options });             // 修改线程池数据结构,把UserWork和Work关联起来             this.addWork(userWork);             // 选中的线程正在处理任务,则先缓存到任务队列             if (thread.state === THREAD_STATE.BUSY) {                  this.queue.push(work);                 userWork.terminate = () => {                      this.cancelWork(userWork);                     this.queue = this.queue.filter((node) => {                          return node.workId !== work.workId;                     });                 }             } else {                  this.submitWorkToThread(thread, work);             }             resolve(userWork);         })     }     submitWorkToThread(thread, work) {          const userWork = this.workPool[work.workId];         userWork.setState(WORK_STATE.RUNNING);         // 否则交给线程处理,并修改状态和记录该线程当前处理的任务id         thread.setState(THREAD_STATE.BUSY);         thread.worker.postMessage(work);         userWork.terminate = () => {              this.cancelWork(userWork);             thread.setState(THREAD_STATE.DEAD);             thread.worker.terminate();         }     }     addWork(userWork) {          userWork.setState(WORK_STATE.PENDDING);         this.workPool[userWork.workId] = userWork;         this.totalWork++;     }     endWork(userWork) {          delete this.workPool[userWork.workId];         this.totalWork--;         userWork.setState(WORK_STATE.END);         userWork.clearTimeout();      }     cancelWork(userWork) {          delete this.workPool[userWork.workId];         this.totalWork--;         userWork.setState(WORK_STATE.CANCELED);         userWork.emit(cancel);     } 

提交任务是线程池暴露给用户侧的接口,主要处理的逻辑包括,根据当前的策略判断是否需要新建线程、选择线程处理任务、排队任务等,如果任务数达到阈值,则根据丢弃策略处理该任务。

5.5 空闲处理

pollIdle() {          setTimeout(() => {              for (let i = 0; i < this.workerQueue.length; i++) {                  const node = this.workerQueue[i];                 if (node.state === THREAD_STATE.IDLE && Date.now() - node.lastWorkTime > this.maxIdleTime) {                      node.worker.terminate();                 }             }             this.pollIdle();         }, 1000);     } 

当子线程空闲时间达到阈值后,主线程会杀死子线程,避免浪费系统资源。总结,这就是线程池具体的设计和实现,另外创建线程失败会导致主线程挂掉,所以使用线程的时候,最后新开一个子进程来管理该线程池。

滇ICP备2023000592号-31