Node.js 进程与线程
一.进程(Process)
进程是:资源分配和调度的基本单元
.启动一个服务,运行一个实例,就是开一个服务进程,例如 nodejs 里面通过 node app.js 开启一个服务进程. 多进程就是进程的复制(fork)
,fork 出来的每个进程都拥有自己的独立空间地址,数据栈,一个进程无法访问另外一个进程里定义的变量,数据结构,只有建立了 IPC 通信
,进程之间才可以数据共享.
const http = require('http')
const server = http.createServer()
server.listen(3001, () => {
process.title = 'windego-node 测试进程'
console.log('进程id', process.pid)
})
运行上面代码后,以下为 Mac 系统自带的监控工具 “活动监视器” 所展示的效果,可以看到我们刚开启的 Nodejs 进程 27167
二.线程(thread)
- 线程(thread) 是操作系统能够
运行运算调度的最小单位
.线程是隶属于进程的,被包含在进程中.一个线程只能属于一个进程,但是一个进程可以有多个线程
. - 同一进程中的多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述符和信号处理等等.
- 但同一进程中的多个线程有各自的调用栈(call stack),自己的寄存器环境(register context),自己的线程本地存储(thread-local storage)。一个进程可以有很多线程,每条线程并行执行不同的任务
三.单线程
单线程就是一个进程只开一个线程
Javascript 就是属于单线程,程序顺序执行(这里暂且不提 JS 异步),可以想象一下队列,前面一个执行完之后,后面才可以执行,当你在使用单线程语言编码时切勿有过多耗时的同步操作,否则线程会造成阻塞,导致后续响应无法处理。你如果采用 Javascript 进行编码时候,请尽可能的利用 Javascript 异步操作的特性。
const http = require('http')
const longComputation = () => {
let sum = 0
for (let i = 0; i < 1e10; i++) {
sum += i
}
return sum
}
const server = http.createServer()
server.on('request', (req, res) => {
if (req.url === '/compute') {
console.info('计算开始', new Date())
const sum = longComputation()
console.info('计算结束', new Date())
return res.end(`Sum is ${sum}`)
} else {
res.end('Ok')
}
})
server.listen(2020)
//打印结果
// 计算开始 2020-06-24T03:46:16.909Z
// 计算结束 2020-06-24T03:46:28.905Z
查看打印结果,当我们调用 127.0.0.1:2020/compute 的时候,如果想要调用其他的路由地址比如 127.0.0.1/大约需要 12 秒时间,也可以说一个用户请求完第一个 compute 接口后需要等待 15 秒,这对于用户来说是极其不友好的。下面创建多进程的方式 child_process.fork 和 cluster 来解决解决这个问题。
单线程的一些说明
Node.js 虽然是单线程模型,但是其基于事件驱动、异步非阻塞模式,可以应用于高并发场景,避免了线程创建、线程之间上下文切换所产生的资源开销。
当你的项目中需要有大量计算,CPU 耗时的操作时候,要注意考虑开启多进程来完成了。
Node.js 开发过程中,错误会引起整个应用退出,应用的健壮性值得考验,尤其是错误的异常抛出,以及进程守护是必须要做的。
单线程无法利用多核 CPU,但是后来 Node.js 提供的 API 以及一些第三方工具相应都得到了解决,
四.Node.js 中的进程与线程
Node.js 是 Javascript 在服务端的运行环境,构建在 chrome 的 V8 引擎之上,基于事件驱动、非阻塞 I/O 模型,充分利用操作系统提供的异步 I/O 进行多任务的执行,适合于I/O 密集型的应用场景,因为异步,程序无需阻塞等待结果返回,而是基于回调通知的机制,原本同步模式等待的时间,则可以用来处理其它任务
科普:在 Web 服务器方面,著名的 Nginx 也是采用此模式(事件驱动),避免了多线程的线程创建、线程上下文切换的开销,Nginx 采用 C 语言进行编写,主要用来做高性能的 Web 服务器,不适合做业务。
在单核 CPU 系统之上我们采用 单进程 + 单线程 的模式来开发。 在多核 CPU 系统之上,可以通过 child_process.fork 开启多个进程(Node.js 在 v0.8 版本之后新增了 Cluster 来实现多进程架构) ,即 多进程 + 单线程 模式。注意:开启多进程不是为了解决高并发,主要是解决了单进程模式下 Node.js CPU 利用率不足的情况,充分利用多核 CPU 的性能。
五.Node.js 中的进程
process 模块 node.js 中的进程 process 是一个全局对象,无需 require,可以直接使用.
- process.env: 进程环境变量,如 process.env.NODE_ENV
- process.pid: 进程 id
- process.title: 进程名称
- process.version: 进程版本
- process.nextTick: 下一个 tick
- process.stdout: 标准输出流
child_process 模块用来对子进程进行操作
- child_process.spawn(command, args, options)//创建子进程
- child_process.exec(command, options, callback)//执行命令
- child_process.execFile(file, args, options, callback)//执行文件
- child_process.fork(modulePath, args, options)//创建子进程
- child_process.execSync(command, options)//同步执行命令
1.fork 开启子进程 Demo
fork 开启子进程解决计算耗时造成线程阻塞。在进行 compute 计算时创建子进程,子进程计算完成通过 send
方法将结果发送给主进程,主进程通过 message
监听到信息后处理并退出。
const http = require('http')
const fork = require('child_process').fork
const server = http.createServer((req, res) => {
if (req.url === '/fork') {
const child = fork('./fork_compute.js')
//监听事件,当一个子进程使用 process.send() 发送消息时会触发 'message' 事件
child.on('message', (sum) => {
res.end(`通过fork子进程计算出的结果是${sum}`)
child.kill() //关闭子进程
})
child.on('close', (code, signal) => {
console.log(`收到close事件,子进程收到信号 ${signal} 而终止,退出码 ${code}`)
child.kill()
})
child.on('error', (err) => {
console.log(`收到error事件, ${err}`)
child.kill()
})
child.send('创建一个子进程')
res.end('hello parent')
} else {
res.end('hello world')
}
})
server.listen(2020, () => {
console.log(`server started at http://127.0.0.1:2020`)
})
const computation = () => {
let sum = 0
console.info('计算开始')
console.time('计算耗时')
for (let i = 0; i < 1e10; i++) {
sum += i
}
console.info('计算结束')
console.timeEnd('计算耗时')
return sum
}
process.on('message', (msg) => {
console.log(msg, 'process.pid', process.pid) // 子进程id
const sum = computation()
// 如果Node.js进程是通过进程间通信产生的,那么,process.send()方法可以用来给父进程发送消息
process.send(sum)
})
浏览器访问 127.0.0.1:2020/fork
2.cluster
集群
const http = require('http')
const numCPUs = require('os').cpus().length
const cluster = require('cluster')
console.log('numCPUs', numCPUs)
const longComputation = () => {
let sum = 0
for (let i = 0; i < 1e10; i++) {
sum += i
}
return sum
}
if (cluster.isPrimary) {
console.log('运行在主进程, 进程ID是:', process.pid)
// 衍生工作进程
for (let i = 0; i < numCPUs; i++) {
cluster.fork()
}
cluster.on('exit', function (worker, code, signal) {
console.log(`worker ${worker.process.pid} died`)
})
} else {
// 工作进程可以共享任何 TCP 连接
// 在本示例中,其是 HTTP 服务器
const server = http.createServer()
server.on('request', (req, res) => {
if (req.url === '/compute') {
console.info('计算开始', new Date())
const sum = longComputation()
console.info('计算结束', new Date())
return res.end(`Sum is ${sum}`)
} else {
res.end('Ok')
}
console.log(`Worker ${process.pid} started`)
})
server.listen(2020)
}
cluster 原理分析
cluster 模块调用fork 方法来创建子进程,该方法与 child_process 中的 fork 是同一个方法。cluster 模块采用的是经典的主从模型,Cluster 会创建 一个 master,然后根据你指定的数量复制出多个子进程,可以使用 cluster.isMaster 属性判断当前进程是 master 还是 worker(工作进程)。由 master 进程来管理所有的子进程,主进程不负责具体的任务处理,主要工作是负责调度和管理。
cluster 模块使用内置的负载均衡来更好地处理线程之间的压力,该负载均衡使用了 Round-robin 算法(也被称之为循环算法)。当使用 Round-robin 调度策略时,master accepts()所有传入的连接请求,然后将相应的 TCP 请求处理发送给选中的工作进程(该方式仍然通过 IPC 来进行通信)。
开启多进程时候端口疑问讲解:如果多个 Node 进程监听同一个端口时会出现 Error:listen EADDRIUNS 的错误,而 cluster 模块为什么可以让多个子进程监听同一个端口呢? 原因是 master 进程内部启动了一个 TCP 服务器,而真正监听端口的只有这个服务器,当来自前端的请求触发服务器的 connection 事件后,master 会将对应的 socket 具柄发送给子进程
3.child_process 模块与 cluster 模块总结
无论是 child_process 模块还是 cluster 模块,为了解决 Node.js 实例单线程运行,无法利用多核 CPU 的问题而出现的。核心就是父进程(即 master 进程)负责监听端口,接收到新的请求后将其分发给下面的 worker 进程。
cluster 模块的一个弊端:
cluster 内部隐式的构建 TCP 服务器的方式来说对使用者确实简单和透明了很多,但是这种方式无法像使用 childprocess 那样灵活,因为一直主进程只能管理一组相同的工作进程,而自行通过 childprocess 来创建工作进程,一个主进程可以控制多组进程。原因是 child_process 操作子进程时,可以隐式的创建多个 TCP 服务器,而 cluster 模块则不能。
六.Node.js 进程通信原理
前面讲解的无论是 child_process 模块,还是 cluster 模块,都需要主进程和工作进程之间的通信。通过 fork()或者其他 API,创建了子进程之后,为了实现父子进程之间的通信,父子进程之间才能通过 message 和 send()传递信息。
IPC 这个词我想大家并不陌生,不管那一张开发语言只要提到进程通信,都会提到它。IPC 的全称是 Inter-Process Communication,即进程间通信。它的目的是为了让不同的进程能够互相访问资源并进行协调工作
。实现进程间通信的技术有很多,如命名管道,匿名管道,socket,信号量,共享内存,消息队列等。Node 中实现 IPC 通道是依赖于 libuv。windows 下由命名管道(name pipe)实现,unix 系统则采用 Unix Domain Socket 实现。表现在应用层上的进程间通信只有简单的 message 事件和 send()方法,接口十分简洁和消息化。
IPC 创建和实现示意图
IPC 通信管道是如何创建的
父进程在实际创建子进程之前,会创建 IPC 通道并监听它,然后才 真正的创建出 子进程,这个过程中也会通过环境变量(NODECHANNELFD)告诉子进程这个 IPC 通道的文件描述符。子进程在启动的过程中,根据文件描述符去连接这个已存在的 IPC 通道,从而完成父子进程之间的连接
Node.js 句柄传递
讲句柄之前,先想一个问题,send 句柄发送的时候,真的是将服务器对象发送给了子进程?
子进程对象 send()方法可以发送的句柄类型
- net.Socket TCP 套接字
- net.Server TCP 服务器,任意建立在 TCP 服务上的应用层服务都可以享受它带来的好处
- net.Native C++层面的 TCP 套接字或 IPC 管道
- dgram.Socket UDP 套接字
- dgram.Native C++层面的 UDP 套接字
send 句柄发送原理分析
结合句柄的发送与还原示意图更容易理解。
send()方法在将消息发送到 IPC 管道前,实际将消息组装成了两个对象,一个参数是 hadler,另一个是 message。message 参数如下所示:
{
cmd:'NODE_HANDLE',
type:'net.Server',
msg:message
}
发送到 IPC 管道中的实际上是我们要发送的句柄文件描述符。这个 message 对象在写入到 IPC 管道时,也会通过 JSON.stringfy()进行序列化。所以最终发送到 IPC 通道中的信息都是字符串,send()方法能发送消息和句柄并不意味着它能发送任何对象。
连接了 IPC 通道的子线程可以读取父进程发来的消息,将字符串通过 JSON.parse()解析还原为对象后,才触发 message 事件将消息传递给应用层使用。在这个过程中,消息对象还要被进行过滤处理,message.cmd 的值如果以 NODE 为前缀,它将响应一个内部事件 internalMessage,如果 message.cmd 值为 NODEHANDLE,它将取出 message.type 值和得到的文件描述符一起还原出一个对应的对象。
以发送的 TCP 服务器句柄为例,子进程收到消息后的还原过程代码如下:
function(message,handle,emit){
var self = this;
var server = new net.Server();
server.listen(handler,function(){
emit(server);
});
}
这段还原代码, 子进程根据 message.type 创建对应的 TCP 服务器对象,然后监听到文件描述符上。由于底层细节不被应用层感知,所以子进程中,开发者会有一种服务器对象就是从父进程中直接传递过来的错觉。
七.Node.js 多进程架构模型
我们自己实现一个多进程架构守护 Demo
编写主进程
master.js 主要处理以下逻辑:
- 创建一个 server 并监听 8080 端口。
- 根据系统 cpus 开启多个子进程
- 通过子进程对象的 send 方法发送消息到子进程进行通信
- 在主进程中监听了子进程的变化,如果是自杀信号重新启动一个工作进程。
- 主进程在监听到退出消息的时候,先退出子进程在退出主进程
const fork = require('child_process').fork
const cups = require('os').cpus()
const server = require('http').createServer()
server.listen(8080)
process.title = 'node-master'
const workers = {}
const createWorker = () => {
const worker = fork('docs/fe/node/process&thread/worker.js')
worker.on('message', function (msg) {
if ((msg.act = 'suicide')) {
createWorker()
}
})
worker.on('exit', function (code, signal) {
console.log('worker process exited, code: %s signal: %s', code, signal)
delete workers[worker.pid]
})
worker.send('server', server)
workers[worker.pid] = worker
console.log('worker process created, pid: %s ppid: %s', worker.pid, process.pid)
}
for (let i = 0; i < cpus.length; i++) {
createWorker()
}
process.once('SIGINT', close.bind(this, 'SIGINT')) // kill(2) Ctrl-C
process.once('SIGQUIT', close.bind(this, 'SIGQUIT')) // kill(3) Ctrl-\
process.once('SIGTERM', close.bind(this, 'SIGTERM')) // kill(15) default
process.once('exit', close.bind(this))
function close(code) {
console.log('进程退出!', code)
if (code !== 0) {
for (let pid in workers) {
console.log('master process exited, kill worker pid: ', pid)
workers[pid].kill('SIGINT')
}
}
process.exit(0)
}
工作进程
worker.js 子进程处理逻辑如下:
- 创建一个 server 对象,注意这里最开始并没有监听 3000 端口
- 通过 message 事件接收主进程 send 方法发送的消息
- 监听 uncaughtException 事件,捕获未处理的异常,发送自杀信息由主进程重建进程,子进程在链接关闭之后退出
// worker.js
const http = require('http')
const server = http.createServer((req, res) => {
res.writeHead(200, {
'Content-Type': 'text/plan',
})
res.end('I am worker, pid: ' + process.pid + ', ppid: ' + process.ppid)
throw new Error('worker process exception!') // 测试异常进程退出、重启
})
let worker
process.title = 'node-worker'
process.on('message', function (message, sendHandle) {
if (message === 'server') {
worker = sendHandle
worker.on('connection', function (socket) {
server.emit('connection', socket)
})
}
})
process.on('uncaughtException', function (err) {
console.log(err)
process.send({ act: 'suicide' })
worker.close(function () {
process.exit(1)
})
})
八.work_threads
与 child_process 或 cluster 不同,worker_threads 可以共享内存
。 它们通过传输 ArrayBuffer 实例或共享 SharedArrayBuffer 实例来实现。
worker_threads 抽象上提供 mainThread 和 worker。 其中:
- mainThread 相当于就是 nodejs 的主线程
- worker 是单独调起的 worker 子线程 mainThread 通过 new Worker 去实例化子线程,然后通过 MessageChannel 来和 worker 通信。
const { isMainThread, parentPort, workerData, Worker } = require('worker_threads')
function mainThread() {
for (let i = 0; i < 5; i++) {
const worker = new Worker(__filename, { workerData: i })
worker.on('exit', (code) => {
console.log(`main: worker stopped with exit code ${code}`)
})
worker.on('message', (msg) => {
console.log(`main: receive ${msg}`)
worker.postMessage(msg + 1)
})
}
}
function workerThread() {
console.log(`worker: workerDate ${workerData}`)
parentPort.on('message', (msg) => {
console.log(`worker: receive ${msg}`)
}),
parentPort.postMessage(workerData)
}
if (isMainThread) {
mainThread()
} else {
workerThread()
}
const http = require('http');
const { Worker } = require('worker_threads');
console.log('进程id', process.pid);
const server = http.createServer((req, res) => {
if (req.url == '/compute') {
let worker = new Worker('./worker_threads_compute.js', {
workerData: 3000,
});
worker.on('message', sum => {
return res.end(`Sum is ${sum}`); //接收工作线程计算完毕后返回的结果
});
} else {
res.end(`ok`);
}
});
server.listen(2020, () => {
console.log(`server started at http://127.0.0.1:${2020}`);
});
async function asyncFib(n) {
let worker = new Worker('./fib.js', { workerData: n });
return new Promise(resolve => {
worker.on('message', val => {
resolve(val); //接收工作线程计算完毕后返回的结果
});
});
}
const { workerData, parentPort } = require('worker_threads');
console.log('worker_threads进程id', process.pid);
let num = workerData; //获取参数
let res = computation(num);
parentPort.postMessage(res); //向主线程返回结果
function computation(num) {
console.log('传入的参数:', num);
let sum = 0;
console.info('计算开始');
console.time('计算耗时');
for (let i = 0; i < 1e10; i++) {
sum += i;
}
console.info('计算结束');
console.timeEnd('计算耗时');
return sum;
}