带你从零看清 Node 源码 createServer 和负载均衡整个过程
发布于 2 个月前 作者 yan 127 次浏览

(给前端大全加星标,提升前端技能)

作者: 前端巅峰 公号 / Peter 谭金杰

写在开头:

作为一名曾经重度使用Node.js作为即时通讯客户端接入层的开发人员,无法避免调试V8,配合开发addon。于是对Node.js源码产生了很大的兴趣~

顺便吐槽一句 Node 的内存控制:由于内存是自动回收的,在我之前做的 20 万人超级群 IM 产品中,像每秒 1000 条消息这种量级,持续时间一长,内存和 CPU 占用还是会出现一些问题。

之前写过 cluster 模块源码分析、PM2 原理等文章,感兴趣的可以去公众号翻一翻

Node.js 的 JS 部分源码我基本看得差不多了,今天给大家写一写 createServer 的过程。对于不太熟悉 Node.js 源码的朋友来说,这可能是一个不错的开始。源码在 GitHub 上有,直接克隆即可。最近公司和业余的工作都比较忙,所以原创比较少。


原生Node.js创建一个基本的服务:

// Minimal HTTP server: answers every request with a plain-text "Hello World".
const http = require('http');

http.createServer(function (request, response) {
  // Send the HTTP status line and headers:
  // status 200 (OK), content type text/plain.
  response.writeHead(200, {'Content-Type': 'text/plain'});

  // Send the response body "Hello World" and finish the response.
  response.end('Hello World\n');
}).listen(8888, function () {
  // Log from the 'listening' callback. A console.log placed after listen()
  // would run even when listening fails, because listen errors such as
  // EADDRINUSE are only emitted on a later tick.
  console.log('Server running at http://127.0.0.1:8888/');
});

我们目前只分析 Node.js 源码中的 JS 部分。

首先找到Node.js源码的lib文件夹

然后找到http.js文件

发现createServer真正返回的是new Server,而 Server来自_http_server

于是找到同目录下的_http_server.js文件,发现整个文件有800行的样子,全局搜索Server找到函数

// Excerpt of the http.Server constructor (lib/_http_server.js). The article
// cuts the function off here; the rest of its body is not shown.
function Server(options, requestListener) {
  // Allow calling Server(...) without `new`: if `this` is not a Server
  // instance, re-invoke it as a constructor and return the new instance.
  if (!(this instanceof Server)) return new Server(options, requestListener);

  // Normalize arguments: Server(listener) or Server(options[, listener]);
  // any other type for `options` is rejected.
  if (typeof options === 'function') {
    requestListener = options;
    options = {};
  } else if (options == null || typeof options === 'object') {
    // Shallow copy, so the caller's object is not modified.
    options = { ...options };
  } else {
    throw new ERR_INVALID_ARG_TYPE('options', 'object', options);
  }

  // Classes used to build request/response objects; callers may override
  // them through options, otherwise the defaults are used.
  this[kIncomingMessage] = options.IncomingMessage || IncomingMessage;
  this[kServerResponse] = options.ServerResponse || ServerResponse;

  // Run the net.Server constructor on this same object, with half-open
  // TCP connections allowed.
  net.Server.call(this, { allowHalfOpen: true });

  // The user's callback becomes a listener of the 'request' event.
  if (requestListener) {
    this.on('request', requestListener);
  }

createServer函数解析:

  • 参数控制有点像 redux 源码里的 initState 和 reducer:根据传入参数的类型不同,做相应的处理
  • 通过 this.on('request', requestListener) 把回调注册为 'request' 事件的监听器,每次有请求,就会调用 requestListener 这个回调函数
  • 至于IncomingMessage和ServerResponse,请求是流,响应也是流,请求是可读流,响应是可写流,当时写那个静态资源服务器时候有提到过
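  • 那么 http.createServer(...) 后面为什么可以直接链式调用 .listen?有人可能会有疑惑。先看函数开头的这段判断:它的作用是允许不加 new 直接调用 Server(...),此时会自动 new 一个实例并返回,这和链式调用本身无关。链式调用能成立,是因为 listen 方法最后 return this,下文会讲到。另外,Node.js 源码遵循 CommonJS 规范,方法大都挂载在 prototype 上。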
// Lets Server(...) be called without `new`: it returns a fresh instance
// instead. This concerns constructor calls, not method chaining.
if (!(this instanceof Server))
  return new Server(options, requestListener);

上面已经将onrequest事件触发回调函数讲清楚了,那么链式调用listen方法,监听端口是怎么回事呢?

传统的链式调用,像JQ源码是return this , 手动实现A+规范的Promise则是返回一个全新的Promise,然后Promise原型上有then方法,于是可以链式调用


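.listen 为什么能在 http.Server 实例上调用,并且可以链式调用?关键在下面这行代码。它让 net.Server 的构造逻辑在当前这个 http.Server 实例上执行,初始化 _handle、_connections 等字段。listen 方法本身定义在 net.Server.prototype 上,而 http.Server 的原型继承自 net.Server.prototype。listen 执行完后 return this,所以可以链式调用: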

// Run the net.Server constructor with `this` bound to the http.Server being
// built, so net.Server's instance fields are initialized on this same object.
net.Server.call(this, { allowHalfOpen: true });

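**allowHalfOpen 实验结论**(这里不再对 TCP 知识做过多讲解。allowHalfOpen 决定的是:一端发送 FIN 报文(半关闭)后,另一端是否自动结束自己的写入并回发 FIN):

(1)allowHalfOpen 为 true,一端发送 FIN 报文后,另一端:

进程结束了,那么肯定会发送 FIN 报文;

进程未结束,不会自动发送 FIN 报文(需要手动调用 socket.end())

(2)allowHalfOpen 为 false,一端发送 FIN 报文后,另一端:

进程结束了,肯定发送 FIN 报文;

进程未结束,也会自动发送 FIN 报文;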

于是找到net.js文件模块中的Server函数

// Constructor of net.Server (lib/net.js). http.Server also runs it on its own
// instances via net.Server.call(this, ...).
function Server(options, connectionListener) {
  // Support calling without `new`.
  if (!(this instanceof Server))
    return new Server(options, connectionListener);

  // Initialize EventEmitter state on this object.
  EventEmitter.call(this);

  // Normalize arguments: Server(listener) or Server(options[, listener]);
  // the listener is registered for the 'connection' event.
  if (typeof options === 'function') {
    connectionListener = options;
    options = {};
    this.on('connection', connectionListener);
  } else if (options == null || typeof options === 'object') {
    options = { ...options };

    if (typeof connectionListener === 'function') {
      this.on('connection', connectionListener);
    }
  } else {
    throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
  }

  this._connections = 0;

  // Deprecated `connections` accessor (DEP0020). Both getter and setter are
  // wrapped with deprecate() and proxy this._connections. The getter returns
  // null while the server is using workers.
  Object.defineProperty(this, 'connections', {
    get: deprecate(() => {

      if (this._usingWorkers) {
        return null;
      }
      return this._connections;
    }, 'Server.connections property is deprecated. ' +
       'Use Server.getConnections method instead.', 'DEP0020'),
    set: deprecate((val) => (this._connections = val),
                   'Server.connections property is deprecated.',
                   'DEP0020'),
    configurable: true, enumerable: false
  });

  this[async_id_symbol] = -1;
  // No listening handle yet. listen() later creates one (setupListenHandle),
  // or installs the one obtained from the cluster master in a worker.
  this._handle = null;
  this._usingWorkers = false;
  this._workers = [];
  this._unref = false;

  // Note: `|| false`, so any falsy value disables half-open mode.
  this.allowHalfOpen = options.allowHalfOpen || false;
  this.pauseOnConnect = !!options.pauseOnConnect;
}

这里巧妙地通过 .call 调用 net 模块的 Server 函数,并把 this 指向当前的 http.Server 实例,于是 net.Server 的初始化逻辑作用在同一个对象上

this._handle = null:_handle 是真正负责监听的底层句柄,.listen 方法最终会调用 _handle 上的方法。Node.js 考虑到了多进程的情况:在 cluster 模式下,工作进程的 _handle 会被替换成主进程提供的句柄,或者一个伪句柄。在默认的轮询模式下,多个进程中只有主进程真正监听端口,再由它把连接分发给各个工作进程,这个后面会讲

Node.js源码的几个特色:

  • 遵循 CommonJS 规范,很多方法挂载到 prototype 上
  • 大量使用 Object.defineProperty 做数据劫持(例如上面被废弃的 connections 访问器)
  • 通过 .call 修改 this 指向,再配合原型上的方法(例如返回 this 的 listen)实现链式调用
  • 自带事件模块(EventEmitter),很多内置模块都继承它,或通过 Object.setPrototypeOf 继承来获得事件能力
  • 代码模块互相依赖比较多,一个.listen过程就很麻烦,初学代码者很容易睡着
  • 源码学习,本就枯燥。没什么好说的了

我在net.js文件模块中发现了一个原型上.listen的方法:

// net.Server.prototype.listen (lib/net.js). Dispatches on the argument form:
// an existing handle, an fd, port[/host], or a pipe path. Each form ends up in
// listenInCluster() or lookupAndListen(). Every non-throwing path returns
// `this`, which is what makes `createServer(...).listen(...)` chainable.
Server.prototype.listen = function(...args) {
  // normalized is [options, cb].
  const normalized = normalizeArgs(args);
  var options = normalized[0];
  const cb = normalized[1];

  // A server can only listen once.
  if (this._handle) {
    throw new ERR_SERVER_ALREADY_LISTEN();
  }

  // The callback becomes a one-shot 'listening' listener.
  if (cb !== null) {
    this.once('listening', cb);
  }
  const backlogFromArgs =
    // (handle, backlog) or (path, backlog) or (port, backlog)
    toNumber(args.length > 1 && args[1]) ||
    toNumber(args.length > 2 && args[2]);  // (port, host, backlog)

  options = options._handle || options.handle || options;
  const flags = getFlags(options.ipv6Only);
  // (handle[, backlog][, cb]) where handle is an object with a handle
  if (options instanceof TCP) {
    this._handle = options;
    this[async_id_symbol] = this._handle.getAsyncId();
    listenInCluster(this, null, -1, -1, backlogFromArgs);
    return this;
  }
  // (handle[, backlog][, cb]) where handle is an object with a fd
  if (typeof options.fd === 'number' && options.fd >= 0) {
    listenInCluster(this, null, null, null, backlogFromArgs, options.fd);
    return this;
  }

  // ([port][, host][, backlog][, cb]) where port is omitted,
  // that is, listen(), listen(null), listen(cb), or listen(null, cb)
  // or (options[, cb]) where options.port is explicitly set as undefined or
  // null, bind to an arbitrary unused port
  if (args.length === 0 || typeof args[0] === 'function' ||
      (typeof options.port === 'undefined' && 'port' in options) ||
      options.port === null) {
    options.port = 0;
  }
  // ([port][, host][, backlog][, cb]) where port is specified
  // or (options[, cb]) where options.port is specified
  // or if options.port is normalized as 0 before
  var backlog;
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    if (!isLegalPort(options.port)) {
      throw new ERR_SOCKET_BAD_PORT(options.port);
    }
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      // A host was given: it goes through lookupAndListen().
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }

  // (path[, backlog][, cb]) or (options[, cb])
  // where path or options.path is a UNIX domain socket or Windows pipe
  if (options.path && isPipeName(options.path)) {
    var pipeName = this._pipeName = options.path;
    backlog = options.backlog || backlogFromArgs;
    // port === -1 && addressType === -1 is what selects the Pipe branch in
    // createServerHandle().
    listenInCluster(this, pipeName, -1, -1,
                    backlog, undefined, options.exclusive);

    if (!this._handle) {
      // Failed and an error shall be emitted in the next tick.
      // Therefore, we directly return.
      return this;
    }

    // Optionally make the pipe readable and/or writable by all users.
    let mode = 0;
    if (options.readableAll === true)
      mode |= PipeConstants.UV_READABLE;
    if (options.writableAll === true)
      mode |= PipeConstants.UV_WRITABLE;
    if (mode !== 0) {
      const err = this._handle.fchmod(mode);
      if (err) {
        this._handle.close();
        this._handle = null;
        throw errnoException(err, 'uv_pipe_chmod');
      }
    }
    return this;
  }

  if (!(('port' in options) || ('path' in options))) {
    throw new ERR_INVALID_ARG_VALUE('options', options,
                                    'must have the property "port" or "path"');
  }

  throw new ERR_INVALID_OPT_VALUE('options', inspect(options));
};

这个就是我们要找的listen方法,可是里面很多ipv4和ipv6的处理,最重要的方法是listenInCluster


这个函数需要好好看一下,只有几十行

// Decide how a net.Server starts listening:
// - in the cluster master, or when `exclusive` is set: create and listen on a
//   handle directly via server._listen2();
// - in a cluster worker: ask the master for a handle through
//   cluster._getServer(), then continue in listenOnMasterHandle().
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;
  // The cluster module is required lazily, on first use.
  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  // Description of the listen target that the worker asks the master for.
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Get the master's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  // Invoked with the master's answer as (err, handle).
  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      var ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    // Reuse master's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    // Because _handle is already set, _listen2 will not create a new handle.
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

如果是主进程(或者传入了 exclusive: true),那么就直接调用 server._listen2 方法了

// _listen2 is an alias of setupListenHandle; it keeps the old name so code
// that wraps _listen2 keeps working.
Server.prototype._listen2 = setupListenHandle;

找到setupListenHandle函数

// Implementation behind Server.prototype._listen2. It makes sure this._handle
// exists, creating and binding one when there is none. It then calls
// listen() on the handle and schedules the 'error' or listening notification
// with process.nextTick.
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug('setupListenHandle', address, port, addressType, backlog, fd);

  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
    debug('setupListenHandle: create a handle');

    // rval ends up as either a handle object or a numeric error code.
    var rval = null;

    // Try to bind to the unspecified IPv6 address, see if IPv6 is available
    if (!address && typeof fd !== 'number') {
      rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags);

      if (typeof rval === 'number') {
        // IPv6 attempt failed: retry below with the unspecified IPv4 address.
        rval = null;
        address = DEFAULT_IPV4_ADDR;
        addressType = 4;
      } else {
        address = DEFAULT_IPV6_ADDR;
        addressType = 6;
      }
    }

    if (rval === null)
      rval = createServerHandle(address, port, addressType, fd, flags);

    if (typeof rval === 'number') {
      // Creating/binding failed: report the error on the next tick.
      var error = uvExceptionWithHostPort(rval, 'listen', address, port);
      process.nextTick(emitErrorNT, this, error);
      return;
    }
    this._handle = rval;
  }

  this[async_id_symbol] = getNewAsyncId(this._handle);
  // Install onconnection as the handle's connection callback.
  this._handle.onconnection = onconnection;
  this._handle[owner_symbol] = this;

  // Use a backlog of 512 entries. We pass 511 to the listen() call because
  // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
  // which will thus give us a backlog of 512 entries.
  // With the faux handle built by cluster's rr(), listen() simply returns 0.
  const err = this._handle.listen(backlog || 511);

  if (err) {
    var ex = uvExceptionWithHostPort(err, 'listen', address, port);
    this._handle.close();
    this._handle = null;
    defaultTriggerAsyncIdScope(this[async_id_symbol],
                               process.nextTick,
                               emitErrorNT,
                               this,
                               ex);
    return;
  }

  // Generate connection key, this should be unique to the connection
  this._connectionKey = addressType + ':' + address + ':' + port;

  // Unref the handle if the server was unref'ed prior to listening
  if (this._unref)
    this.unref();

  // emitListeningNT is scheduled on the next tick (presumably it emits
  // 'listening'; it is defined elsewhere).
  defaultTriggerAsyncIdScope(this[async_id_symbol],
                             process.nextTick,
                             emitListeningNT,
                             this);
}

里面的createServerHandle是重点

// Create a handle suitable for listening and bind it.
// Returns the handle on success, or a numeric error code on failure.
// The handle kind depends on the arguments:
// - fd >= 0: wrap an existing file descriptor;
// - port === -1 && addressType === -1: a Pipe (UNIX socket / Windows pipe);
// - otherwise: a TCP handle.
function createServerHandle(address, port, addressType, fd, flags) {
  var err = 0;
  // Assign handle in listen, and clean up if bind or listen fails
  var handle;

  var isTCP = false;
  if (typeof fd === 'number' && fd >= 0) {
    try {
      handle = createHandle(fd, true);
    } catch (e) {
      // Not a fd we can listen on.  This will trigger an error.
      debug('listen invalid fd=%d:', fd, e.message);
      return UV_EINVAL;
    }

    err = handle.open(fd);
    if (err)
      return err;

    assert(!address && !port);
  } else if (port === -1 && addressType === -1) {
    handle = new Pipe(PipeConstants.SERVER);
    if (process.platform === 'win32') {
      // Windows only: the number of pending pipe instances can be set
      // through the NODE_PENDING_PIPE_INSTANCES environment variable.
      var instances = parseInt(process.env.NODE_PENDING_PIPE_INSTANCES);
      if (!Number.isNaN(instances)) {
        handle.setPendingInstances(instances);
      }
    }
  } else {
    handle = new TCP(TCPConstants.SERVER);
    isTCP = true;
  }

  if (address || port || isTCP) {
    debug('bind to', address || 'any');
    if (!address) {
      // Try binding to ipv6 first
      err = handle.bind6(DEFAULT_IPV6_ADDR, port, flags);
      if (err) {
        handle.close();
        // Fallback to ipv4
        // NOTE(review): this recursive call passes neither addressType nor
        // flags, so (assuming DEFAULT_IPV4_ADDR is a non-empty string) it
        // takes the plain handle.bind(address, port) branch below.
        return createServerHandle(DEFAULT_IPV4_ADDR, port);
      }
    } else if (addressType === 6) {
      err = handle.bind6(address, port, flags);
    } else {
      err = handle.bind(address, port);
    }
  }

  if (err) {
    handle.close();
    return err;
  }

  return handle;
}

已经可以看到TCP了,离真正的绑定监听端口,更近了一步

最终通过下面的方法绑定监听端口

handle.bind6(address, port, flags); 
 
 或者 
 
 handle.bind(address, port); 

首选 IPv6 绑定,是因为绑定在 IPv6 任意地址(::)上的双栈 socket 也能接受 IPv4 连接。这类连接以 IPv4 映射地址的形式出现,除非设置了 ipv6Only。而 IPv4 socket 无法接受 IPv6 连接。如果 IPv6 不可用,createServerHandle 会回退到 IPv4 地址。


上面的内容请你认真看,因为下面会更复杂,涉及到 Node.js 的多进程负载均衡原理

如果不是主进程(且没有设置 exclusive),就调用 cluster._getServer。先找到 cluster 模块的入口源码:

'use strict';

// Cluster module entry point: export the worker-side ('child') or the
// master-side implementation. The process counts as a worker when
// NODE_UNIQUE_ID is present in its environment (presumably set by the
// master when it forks workers).
const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';

module.exports = require(`internal/cluster/${childOrMaster}`);

找到_getServer函数源码

// `obj` is a net#Server or a dgram#Socket object. 
 
// Worker side: ask the master (act: 'queryServer') for a listening server
// matching `options`. cb(err, handle) is called once the master replies.
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === 'string' &&
      process.platform !== 'win32')
    address = path.resolve(address);

  // Key for this listen target: "address:port:addressType:fd".
  const indexesKey = [address,
                      options.port,
                      options.addressType,
                      options.fd ].join(':');

  // Per-key counter in this process: 0 for the first request with this key,
  // incremented for each later one. It is sent to the master as `index`.
  let index = indexes.get(indexesKey);

  if (index === undefined)
    index = 0;
  else
    index++;

  indexes.set(indexesKey, index);

  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  // If the master's reply carries a real handle, the worker listens on that
  // shared socket; otherwise rr() builds a faux handle. Which kind of reply is
  // sent is decided on the master side, not here (presumably by
  // cluster.schedulingPolicy; confirm in internal/cluster/master.js).
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

  // When the server starts listening, report it to the master
  // (act: 'listening'), including the port actually in use when available.
  obj.once('listening', () => {
    cluster.worker.state = 'listening';
    const address = obj.address();
    message.act = 'listening';
    message.port = (address && address.port) || options.port;
    send(message);
  });
};

我们之前传入了三个参数给它,分别是

server,serverQuery,listenOnMasterHandle


这里是比较复杂的,曾经我也在这里迷茫过一段时间,但是想着还是看下去吧。坚持下,大家如果看到这里看不下去了,先休息下,保存着。后面等心情平复了再静下来接下去看


首先我们传入了Server、serverQuery和cb(回调函数listenOnMasterHandle),整个cluster模块的_getServer中最重要的就是:

// Attach server-specific data (e.g. TLS ticket keys) if the server has any.
if (obj._getServerData)
  message.data = obj._getServerData();

// Send 'queryServer' to the master. A handle in the reply means "listen on
// this shared socket"; no handle means round-robin mode with a faux handle.
send(message, (reply, handle) => {
  if (typeof obj._setServerData === 'function')
    obj._setServerData(reply.data);

  if (handle)
    shared(reply, handle, indexesKey, cb);  // Shared listen socket.
  else
    rr(reply, indexesKey, cb);              // Round-robin.
});

首先,如果 server 上有 _getServerData 方法,就先取出 server 的自定义数据(例如 TLS ticket key)放入 message.data,然后调用 send 函数

// Worker-side helper: send `message` to the master by calling sendHelper
// with `process` (which ends up in process.send()). The handle argument is
// always null: the worker attaches no handle to its own messages.
function send(message, cb) {
  return sendHelper(process, message, null, cb);
}
 


send 函数调用的是 cluster 模块 utils 文件中的 sendHelper 函数,并固定传入 process 作为第一个参数



 
// Send an internal cluster message through proc.send(message, handle).
// Returns false without sending when proc is not connected; otherwise it
// returns whatever proc.send() returns.
// When cb is a function it is stored in `callbacks` under the current `seq`,
// presumably so that the reply carrying the same seq can be routed back to it
// (the reply handling is not shown here).
function sendHelper(proc, message, handle, cb) {
  if (!proc.connected)
    return false;

  // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
  message = { cmd: 'NODE_CLUSTER', ...message, seq };

  if (typeof cb === 'function')
    callbacks.set(seq, cb);

  // seq only numbers outgoing messages (it is the key into `callbacks`);
  // it does not pick a worker and is not a load-balancing counter.
  seq += 1;
  return proc.send(message, handle);
}

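这里要看清楚:我们调用 sendHelper 时传入的第三个参数 handle 是 null,这只表示工作进程发消息时没有附带句柄。主进程的回复里是否带 handle,取决于主进程的调度策略(cluster.schedulingPolicy):

- 在默认的 SCHED_RR(轮询,Windows 以外平台的默认值)下,主进程自己持有监听 socket,回复时不带 handle;
- 在 SCHED_NONE 下,主进程会把监听句柄发给工作进程,由各工作进程共享同一个 socket。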

// Reply handler for 'queryServer': with a handle from the master, listen on
// the shared socket; without one, build a round-robin faux handle via rr().
send(message, (reply, handle) => {
  if (typeof obj._setServerData === 'function')
    obj._setServerData(reply.data);

  if (handle)
    shared(reply, handle, indexesKey, cb);  // Shared listen socket.
  else
    rr(reply, indexesKey, cb);              // Round-robin.
});

在默认的 SCHED_RR 策略下,主进程回复时不带 handle,所以会进入调用 rr 函数的这个分支。这里调用 rr 传入的 cb 就是在 net.js 模块中定义的 listenOnMasterHandle 函数

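Node.js cluster 默认(Windows 除外)的负载均衡算法是轮询(round-robin):主进程统一 accept 新连接,再依次分发给各个工作进程。官方文档给出的理由是,另一种方式(各进程共享 socket、由操作系统分配连接)在实践中往往分配得非常不均衡。

需要注意:下面 sendHelper 中每次 +1 的 seq 并不是轮询计数器。它只是消息序号,用来把主进程的回复和对应的回调 cb 匹配起来。真正按轮询分发连接的逻辑在主进程一侧(internal/cluster/round_robin_handle.js)。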

// Excerpt from sendHelper: `seq` numbers each outgoing message and is the key
// under which the reply callback is stored. It is not a load-balancing counter.
if (typeof cb === 'function')
  callbacks.set(seq, cb);

seq += 1;
// Round-robin mode, worker side. The master replied without a handle, so
// build a faux handle that lets net.Server work without owning a real
// socket. The faux handle is registered under message.key and passed on as
// cb(0, handle). If the reply carries message.errno, call cb(errno, null)
// instead.
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  var key = message.key;

  // No-op listen: the worker neither binds nor listens; it always reports
  // success (0).
  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  // Tell the master to close `key` and drop local bookkeeping. A second call
  // is a no-op because key is reset to undefined.
  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (key === undefined)
      return;

    send({ act: 'close', key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  // Copy the address supplied by the master (message.sockname) into `out`,
  // only while the handle is still open (key set). Always returns 0.
  function getsockname(out) {
    if (key)
      Object.assign(out, message.sockname);

    return 0;
  }

  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  handles.set(key, handle);
  cb(0, handle);
}

此时的 handle 已经被替换成伪句柄:它的 listen 方法直接返回 0,工作进程并不会真正绑定、占用端口。所以在轮询模式下,Node.js 虽然有多个进程,真正监听端口的只有主进程一个。

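注意,这里 rr 调用的 cb 是 net.js 模块中的 listenOnMasterHandle,它会:

1. 把这个伪句柄赋给 server._handle;
2. 调用 _listen2(也就是 setupListenHandle)。由于 _handle 已经存在,setupListenHandle 不会再创建新句柄,只会调用伪句柄的 listen(返回 0);
3. 在下一个 tick 触发 'listening' 事件。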

官方的注释:

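Faux handle. Mimics a TCPWrap with just enough fidelity to get away with it. Fools net.Server into thinking that it's backed by a real handle.

伪句柄:以“刚好够用”的保真度模拟一个 TCPWrap,让 net.Server 误以为自己背后有一个真实的句柄。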

推荐阅读  点击标题可跳转 面试官问:Node 与底层之间如何执行异步 I/O 调用?

通过 Node.js 的 Cluster 模块源码,深入 PM2 原理

用 Docker 搭建你的第一个 Node 项目到服务器

觉得本文对你有帮助?请分享给更多人

关注「前端大全」加星标,提升前端技能

好文章,我在看❤️

回到顶部