未分类

试玩NodeJS多进程

NodeJS的JavaScript运行在单个进程的单个线程上,一个JavaScript执行进程只能利用一个CPU核心,而如今大多数CPU均为多核CPU,为了充分利用CPU资源,Node提供了child_process和cluster模块来实现多进程以及进程管理。本文将根据Master-Worker模式,搭建一个简单的服务器集群来充分利用多核CPU资源,探索进程间通信、负载均衡、进程重启等知识。

下图是Master-Worker模式,进程分为master进程和worker进程,master进程负责调度或管理worker进程,worker进程则负责具体的业务处理。在服务器层面,worker可以是一个服务进程,负责处理来自客户端的请求,多个worker便相当于多个服务器,从而构成一个服务器集群。master则是负责创建worker,将来自客户端的请求分配到各个服务器上去处理,并监控worker的运行状态以及进行管理等操作。

Master-Worker模式

本文将从child_process模块开始,熟悉该模块的基本用法。后面再继续进入cluster模块的学习。本文所用的代码示例可以从该仓库中找到–【multi-process】

一、child_process

1.1、Hello world

child_process模块提供了spawn()exec()execFile()fork()这4个方法用于创建子进程,本文将使用fork()方法来创建子进程,fork()方法只需指定要执行的JavaScript文件模块,即可创建Node的子进程。下面是简单的HelloWorld示例,master进程根据CPU数量创建出相应数量的worker进程,worker进程中利用进程ID来标记自己。

以下是master进程代码,文件名为master.js。

1
2
3
4
5
6
7
8
const childProcess = require('child_process')
const cpuNum = require('os').cpus().length

for (let i = 0; i < cpuNum; ++i) {
childProcess.fork('./worker.js')
}

console.log('Master: Hello world.')

以下是worker进程的代码,文件名为worker.js。

1
console.log('Worker-' + process.pid + ': Hello world.')

执行node master.js,得到如下结果,master创建4个worker后输出HelloWorld信息,每个worker也分别输出自己的HelloWorld信息。
hello-world运行结果

1.2、父子进程间的通信

创建worker之后,接下来实现master和worker之间的通信。Node父子进程之间可以通过on('message')send()来实现通信,on('message')其实是监听message事件,当该进程收到其他进程发送的消息时,便会触发message事件。send()方法则是用于向其他进程发送信息。master进程中调用child_processfork()方法后会得到一个子进程的实例,通过这个实例可以监听来自子进程的消息或者向子进程发送消息。worker进程则通过process对象接口监听来自父进程的消息或者向父进程发送消息。

父子进程通信

下面是简单示例,master创建worker之后,向worker发送信息,worker在收到master的信息后将信息输出,并回复master。master收到回复后输出信息。

master.js

1
2
3
4
5
6
7
8
const childProcess = require('child_process')
const worker = childProcess.fork('./worker.js')

worker.send('Hello world.')

worker.on('message', (msg) => {
console.log('[Master] Received message from worker: ' + msg)
})

worker.js

1
2
3
4
process.on('message', (msg) => {
console.log('[Worker] Received message from master: ' + msg)
process.send('Hi master.')
})

执行node master.js,结果如下,master和worker可以正常通信。

进程通信示例

1.3、Master分发请求给Worker处理

进程通信时使用到的send()方法,除了发送普通的对象之外,还可以用于发送句柄。句柄是一种引用,可以用来标识资源,例如通过句柄可以标识一个socket对象、一个server对象等。利用句柄传递,可以实现请求的分发。master进程创建一个TCP服务器监听特定端口,收到客户端的请求后,会得到一个socket对象,通过这个socket对象可以跟客户端进行通信从而处理客户端的请求。master进程可以通过句柄传递将该socket对象发送给worker进程,让worker进程去处理请求。该模式的结构图如下,在master上还可以通过特定的算法实现负载均衡,将客户端的请求均衡地分发给worker去处理。

Master分发请求给Worker处理

下面是一个简单示例。master创建TCP服务器并监听8080端口,收到请求后将请求分发给worker处理。worker收到master发来的socket以后,通过socket对客户端进行响应。为方便看到请求的处理情况,worker给出的响应内容会说明该请求是被哪个worker处理。

master.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const childProcess = require('child_process')
const net = require('net')
const cpuNum = require('os').cpus().length

// 创建工作进程
let workers = []
let cur = 0
for (let i = 0; i < cpuNum; ++i) {
workers.push(childProcess.fork('./worker.js'))
console.log('Create worker-' + workers[i].pid)
}

// 创建TCP服务器
const server = net.createServer()

// 服务器收到请求后分发给工作进程去处理
// 通过轮转方式实现工作进程的负载均衡
server.on('connection', (socket) => {
workers[cur].send('socket', socket)
cur = Number.parseInt((cur + 1) % cpuNum)
})

server.listen(8080, () => {
console.log('TCP server: 127.0.0.1:8080')
})

worker.js

1
2
3
4
5
6
7
8
process.on('message', (msg, socket) => {
if (msg === 'socket' && socket) {
// 利用setTimeout模拟处理请求时的操作耗时
setTimeout(() => {
socket.end('Request handled by worker-' + process.pid)
}, 10)
}
})

为了访问TCP服务器进行实验,这里需要写一个简单的TCP客户端,代码如下。该客户端会创建10个TCP连接,得到服务器响应之后将响应的内容进行输出。

tcp_client.js

1
2
3
4
5
6
7
8
9
10
11
const net = require('net')
const maxConnectCount = 10

for (let i = 0; i < maxConnectCount; ++i) {
net.createConnection({
port: 8080,
host: '127.0.0.1'
}).on('data', (data) => {
console.log(data.toString())
})
}

先执行node master.js启动服务器,然后执行node tcp_client.js启动客户端。得到的结果如下,10个请求被分发到不同服务器上进行处理,并且可以看到master中的轮转分发请求起到了作用,实现了简单的负载均衡。

TCP连接处理情况

1.4、Worker监听同一个端口

前面说过,sned()方法可以传递句柄,通过传递句柄,我们除了发送socket对象之外,还可以直接发送一个server对象。我们可以在master进程中创建一个TCP服务器,将服务器对象直接发送给worker进程,让worker去监听端口并处理请求。这样的话,master和worker进程都会监听相同端口,当客户端发起请求时,请求可能被master接收,也可能被worker接收。而master不负责处理业务,如果请求被master接收到,由于master上没有处理业务的逻辑,请求将无法得到处理。因此可以实现为如下图所示的模式,master将TCP服务器发送给worker使得所有worker监听同一个端口以后,master关闭对端口的监听。这样便只有worker在监听同一端口,请求将会都被worker进行处理,与master无关。

Worker监听同一端口

这种模式下,多个进程监听相同端口,当网络请求到来时,会进行抢占式调度,只有一个进程会抢到连接然后进行服务。因此,可以确保每个请求都会被特定的worker处理,而不是一个请求同时被多个worker处理。但由于是抢占式的调度,不能够保证每个worker的负载均衡。可能由于处理不同业务时CPU和IO繁忙度的不同导致进程抢到的请求数量不同,形成负载不均衡的情况。

下面是简单示例。

master.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const childProcess = require('child_process')
const net = require('net')
const cpuNum = require('os').cpus().length

// 创建工作进程
let workers = []
let cur = 0
for (let i = 0; i < cpuNum; ++i) {
workers.push(childProcess.fork('./worker.js'))
console.log('Create worker-' + workers[i].pid)
}

// 创建TCP服务器
const server = net.createServer()

server.listen(8080, () => {
console.log('TCP server: 127.0.0.1:8080')
// 监听端口后将服务器句柄发送给工作进程
for (let i = 0; i < cpuNum; ++i) {
workers[i].send('server', server)
}
// 关闭主线程服务器的端口监听
server.close()
})

worker.js

1
2
3
4
5
6
7
8
9
10
process.on('message', (msg, server) => {
if (msg === 'server' && server) {
server.on('connection', (socket) => {
// 利用setTimeout模拟处理请求时的操作耗时
setTimeout(() => {
socket.end('Request handled by worker-' + process.pid)
}, 10)
})
}
})

继续使用之前的tcp_client来进行实验,先执行node master.js启动服务器,然后执行node tcp_client.js启动客户端。得到结果如下,请求可以被不同的worker进程处理,但由于worker进程是抢占式地为请求进行服务,所以不一定能实现每个worker的负载均衡。
TCP连接处理情况

1.5、进程重启

worker进程可能因为某些异常情况而退出,为了提高集群的稳定性,master进程需要监听子进程的存活状态,当子进程退出之后,master进程要及时重启新的子进程。在Node中,子进程退出时,会在父进程中触发exit事件。父进程只需通过监听该事件便可知道子进程是否退出,并在退出的时候做出相应的处理。下面是在之前的监听同一端口模式下,增加了进程重启功能。进程重启时,master进程需要重新传递server对象给新的worker进程,因此不能关闭master进程上的server,否则在进程重启时server被关闭,得到的句柄将为空,无法正常传递。master进程的server不关闭,会导致master进程也监听端口,会有部分请求被master进程接收,为了让着部分请求能够得到处理,可以在master进程添加处理业务的代码。由于master也参与了业务处理,业务处理进程的数量增加1个,所以worker进程可以少创建1个。这也就是下面简单示例中的做法。

这种实现方式使得master即进行进程管理又参与了业务处理,若果要保持master只负责进程管理而不涉及业务处理,可以采取另外一种实现方式:master接收到请求后,按照前面1.3节的做法将请求转发给worker进行处理,这样master将继续只负责对worker进程的管理。

master.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
const childProcess = require('child_process')
const net = require('net')
const cpuNum = require('os').cpus().length - 1

// 创建工作进程
let workers = []
let cur = 0
for (let i = 0; i < cpuNum; ++i) {
workers.push(childProcess.fork('./worker.js'))
console.log('Create worker-' + workers[i].pid)
}

// 创建TCP服务器
const server = net.createServer()

// 由于master进程也会监听端口。因此需要对请求做出处理
server.on('connection', (socket) => {
// 利用setTimeout模拟处理请求时的操作耗时
setTimeout(() => {
socket.end('Request handled by master')
}, 10)
})

server.listen(8080, () => {
console.log('TCP server: 127.0.0.1:8080')
// 监听端口后将服务器句柄发送给工作进程
for (let i = 0; i < cpuNum; ++i) {
workers[i].send('server', server)
// 工作进程退出后重启
workers[i].on('exit', ((i) => {
return () => {
console.log('Worker-' + workers[i].pid + ' exited')
workers[i] = childProcess.fork('./worker.js')
console.log('Create worker-' + workers[i].pid)
workers[i].send('server', server)
}
})(i))
}
// 关闭主线程服务器的端口监听
// server.close()
})

worker.js

1
2
3
4
5
6
7
8
9
10
process.on('message', (msg, server) => {
if (msg === 'server' && server) {
server.on('connection', (socket) => {
// 利用setTimeout模拟处理请求时的操作耗时
setTimeout(() => {
socket.end('Request handled by worker-' + process.pid)
}, 10)
})
}
})

执行node master.js启动服务器后,可以通过任务管理器直接杀掉进程来模拟进程异常退出。可以看到worker进程退出后,master能够发现并及时创建新的worker进程。任务管理器中的Node进程数量恢复原样。
手动杀进程

进程重启

执行node tcp_client.js启动客户端,客户端发出的连接请求被处理的情况如下,同样地,由于监听同一端口,进程之间采取抢占式服务,不一定保障负载均衡。

TCP连接处理情况

1.6、处理HTTP服务

前面的示例所使用的是TCP服务器,如果要处理HTTP请求,需要使用HTTP服务器。而HTTP其实是基于TCP的,发送HTTP请求的时候同样也会发起TCP连接。只需要对前面的TCP服务器进行一点小改动便可以支持HTTP了。在进程中新增HTTP服务器,当TCP服务器收到请求时,把请求提交给HTTP服务器处理即可。下面是worker进程的改动示例。

worker.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const http = require('http')
const httpServer = http.createServer((req, res) => {
// 利用setTimeout模拟处理请求时的操作耗时
setTimeout(() => {
res.writeHead(200, { 'Content-Type': 'text/plain' })
res.end('Request handled by worker-' + process.pid)
}, 10)
})

process.on('message', (msg, server) => {
if (msg === 'server' && server) {
server.on('connection', (socket) => {
// 提交给HTTP服务器处理
httpServer.emit('connection', socket)
})
}
})

支持HTTP请求

二、cluster

前面简单描述了使用child_process实现单机Node集群的做法,需要处理挺多的细节。Node提供了cluster模块,该模块提供了更完善的API,除了能够实现多进程充分利用CPU资源以外,还能够帮助我们更好地进行进程管理和处理进程的健壮性问题。下面是简单示例,if条件语句判断当前进程是master还是worker,master进程会执行if语句块包含的代码,而worker进程则执行else语句块包含的代码。master进程中,利用cluster模块创建了与CPU数量相应的worker进程,并通过监听cluster的online事件来判断worker的创建成功。在worker进程退出后,会触发master进程中cluster模块上的exit事件,通过监听该事件可以了解worker进程的退出情况并及时fork新的worker。最后,worker进程中只需创建服务器监听端口,对客户端请求做出处理即可。(这里设置相同端口8080之后,所有worker都将监听同一个端口)

server.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
const cluster = require('cluster')

if (cluster.isMaster) {
const cpuNum = require('os').cpus().length

for (let i = 0; i < cpuNum; ++i) {
cluster.fork()
}

// 创建进程完成后输出提示信息
cluster.on('online', (worker) => {
console.log('Create worker-' + worker.process.pid)
})

// 子进程退出后重启
cluster.on('exit', (worker, code, signal) => {
console.log('[Master] worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal)
cluster.fork()
})
} else {
const net = require('net')
net.createServer().on('connection', (socket) => {
// 利用setTimeout模拟处理请求时的操作耗时
setTimeout(() => {
socket.end('Request handled by worker-' + process.pid)
}, 10)
}).listen(8080)
}

执行node server.js启动服务器,继续按照之前的做法,利用任务管理器杀死进程,可以看到在进程被杀后master能够及时启动新的worker。

进程重启

继续运行tcp_client,可以看到服务器能够正常处理请求。

TCP连接处理情况

三、小结

利用child_process和cluster模块能够很好地实现Master-Worker模式多进程架构,实现单机服务器集群,充分利用了多核CPU资源。通过进程通信能够实现进程管理、重启以及负载均衡,从而提高集群的稳定性和健壮性。

分享到