nodejs-v0.10.0 初探

2013-03-17 00:37

nodejs-v0.10.0 初探

by snoopyxdy

at 2013-03-16 16:37:51

original http://snoopyxdy.blog.163.com/blog/static/6011744020132129942981

v0.10.0介绍
nodev10.0版本来了,对于性能官方肯定是说性能大大提高了
大致翻译如下:
很高兴一个新的稳定版本的node发布了。
这个分支带来了很多显著的改进,主要是api的调整,让我们更加容易使用并且向后兼容。
在之前的帖子中,我们介绍了stream2的api调整了,如果你还没有阅读他,请尽快阅读他。(剩下的就是说之前stream的api不是很完善,现在他们下决心要让这个api更好)

更重要的是stream作为node的核心,改变之后接口变得更加易用了。所以强烈建议使用stream2的api,对于node 0.8,你可以安装 readable-stream 包来支持。 

domain模块从实验级别提高到了不稳定级别(尼玛还是坑爹),使用了domain模块,我们不再依赖于 process.on('uncaughtException') 这样的错误控制了,如果你还没有使用domain来做错误处理,那你就要仔细检查哪些中间件和异步回调了(感觉有点威胁啊~)。

在0.8以及之前, process.nextTick() 会在当前事件循环结束时调用,这样通常是会在I/O开始前被调用的。所以很多项目都会使用process.nextTick()让它晚点做,而在I/O之前,看上去这样是正确的。事实上在大负载的I/O情况下,nextTick可能工作不正常,出现线程竞争情况。所以在v0.10.0版本 process.nextTick() 会在js代码执行完成后调用,而不是写入事件循环,可以说变同步了。应该尽量避免使用  process.nextTick() 来做递归,如果非要这么做请使用 setImmediate 来代替。

当事件循环闲置时,node会告诉V8开始做GC。事实上要找到正确的时间GC是非常困难的,当你选错时间GC将会耗费大量时间。在实践中,禁用IdleNotification会得到更好的效果。(他们现在依靠V8的GC了,因为他们觉得V8会很聪明的知道什么时候GC是最合适的,而不去手动的通知了)

再下面就是一些性能测试了,可以看出我们在响应大数据的字符串时,尽量使用buffer提高性能,v0.10.0在http方面要快于v0.8版本要快7%左右,读写文件性能也是提升显著。所以官方建议大家有条件的一定要升级到v0.10.0。

PS:当然现在还有使用古董级的0.6.x~

API改变:
大致意思是stream api的改变比较大,
1、增加了Readable, Writable, Duplex, and Transform的基类,我们可以直接从这些基类继承了
2、Readable streams 使用一个read方法,代替触发“data”的情况
3、增加一个"data"的监听器,或者调用“pause()” 和 “resume()” 会切换到旧stream模式
4、“data”的事件监听器永远不会错过第一个文件块,无论他们是否马上建立,pause 不再是咨询,而可以保证暂停
5、如果你不消耗数据,stream流永远是处于pause等待状态,而永远不会触发end事件
6、process.nextTick 会在当前事件结束调用,会在当前堆释放时执行,如果你打算递归的使用,请用setImmediate 代替
7、url.parse 将返回更多的信息如下:

// v0.8
> url.parse('http://foo')
{ protocol: 'http:',
  slashes: true,
  host: 'foo',
  hostname: 'foo',
  href: 'http://foo/',
  pathname: '/',
  path: '/' }

// 0.10
> url.parse('http://foo')
{ protocol: 'http:',
  slashes: true,
  auth: null,
  host: 'foo',
  port: null,
  hostname: 'foo',
  hash: null,
  search: null,
  query: null,
  pathname: '/',
  path: '/',
  href: 'http://foo/' }

8、domain模块对错误对象增加一个属性camelCase 代替snake_case
9、path.resolve 和 path.join 将会抛出异常,当传递的参数为空字符串时
10、dgram.Socket #bind() 会是一个异步方法,请在第二个参数增加回调函数
11、EventEmitter 基类的继承请使用新方法,以下方法将不会被支持:

function Child() {}
Child.prototype = new Parent(); // <-- NEVER EVER DO THIS!!

使用如下的代码代替上面的:(终于js有点像样点的继承了,蛋疼了多少年啊)

// Correct-Style Inheritance
function Child() {}
Child.prototype = Object.create(Parent.prototype, {
  constructor: {
    value: Child,
    enumerable: false,
    writable: true,
    configurable: true
  }
});
// "Gee that's a lot of lines! I wish there was a helper method!"
// There is.  Do this:
util.inherits(Child, Parent);


12、增加的一些api
12.1、stream增加 Readable, Writable, Duplex, and Transform 基类
12.2、crypto 的api 有stream接口的支持
12.3、process增加getgroups(), setgroups(), initgroups()
12.4、crypto增加getHashes() getCiphers()
12.5、http模块增加 response.headersSent 属性
12.6、增加‘removeListener’这个事件的触发,可以监听这个事件
12.6、增加setImmediate() 和 clearImmediate() 函数
12.7、字符串解码器增加   decoder.end() 函数

stream2:
最后我们来看一下stream2的一些api和新用法,首先我们要升级node到v0.10.0

# node -v
v0.10.0


stream是一个抽象类,它类继承自EventEmitter,例如http服务器就是一个stream,它是可读可写的,在早期的node版本中,stream类的接口是简单的,但是不够强大,可用性也不好。
1、不是当你调用read()函数,数据data将会马上接受,如果你想要做一些I/O来决定如何处理这些数据,你不得不建立一个buffer来存储那些数据。
2、pause()方法只是咨询,不能保证,可能当你调用pause()方法后,你还在接受data。

很多项目都无须调用data事件来监听,也不调用pause()和resume()方法。
我们看如下代码:

// WARNING!  BROKEN!
net.createServer(function(socket) {

  // we add an 'end' method, but never consume the data
  socket.on('end', function() {
    // It will never get here.
    socket.end('I got your message (but didnt read it)\n');
  });

}).listen(1337);

在v0.10.0版本之前,发送过来的数据将会丢失,因为我们没有做data事件的监听。在v0.10.0及以后,上面这段代码socket会一直处于暂停状态而接受数据

解决方法,代码如下:

// Workaround
net.createServer(function(socket) {

  socket.on('end', function() {
    socket.end('I got your message (but didnt read it)\n');
  });

  // start the flow of data, discarding it.
  socket.resume();

}).listen(1337);


创建一个只读stream类实例:
new stream.Readable([options])
options可配置
1、highWaterMark  {Number} ,这个数字表示read stream 缓冲区,默认16kb
2、encoding {String} 表示buffer的类型,默认为null,可以传入字符串的格式,比如utf-8
3、objectMode {Boolean} 表示是返回一个buffer对象还是值返回这个buffer对象的size n

readable._read(size)
这个方法不应该被直接调用,应该被内部的readable class 调用。所有readable的stream都必须提供一个_read的方法从数据源来获取数据。
size参数是咨询的,可能不是很正确,当使用tcp或者tls时会忽略这个参数。所以没有必要等设定的size的数据都到了,才去调用stream.push(chunk)方法

readable.push(chunk)
chunk {Buffer | null | String} 将块放入队列
return {Boolean}  如果为false则表示没有数据进行push
这个方法不是给readable消费者调用,而是给数据发送者调用,将数据push进队列里,push后,_read()方法就会从队列里读取数据了。
push方法明确的对readable队列内插入数据,当插入null时,则会发出数据发送完毕的信号
看一下简单的代码:

// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

var stream = new Readable(); //实例化

source.ondata = function(chunk) {
  // if push() returns false, then we need to stop reading from source
  if (!stream.push(chunk))
    source.readStop();
};

source.onend = function() {
  stream.push(null);
};

// _read will be called when the stream wants to pull more data in
// the advisory size argument is ignored in this case.
stream._read = function(n) {
  source.readStart();
};


readable.unshift(chunk)
chunk {Buffer | null | String} 将块从队列头部push
return {Boolean}  如果为false则表示没有数据进行unshift
看实例代码:
// A parser for a simple data protocol.

// The "header" is a JSON object, followed by 2 \n characters, and
// then a message body.
//
// Note: This can be done more simply as a Transform stream. See below.

function SimpleProtocol(source, options) { //定义一个SimpleProtocol 类
if (!(this instanceof SimpleProtocol))
return new SimpleProtocol(options);

Readable.call(this, options);//执行readable构造函数
this._inBody = false; //_inbody表示是否开始接受body
this._sawFirstCr = false; //表示是否看见第一个换行

// source is a readable stream, such as a socket or file
this._source = source; //将数据源赋值到this._source,source是一个readable stream

var self = this;
source.on('end', function() { //当数据源 end 事件触发,则 SimpleProtocol 实例push一个null,表示没有数据push到队列中
self.push(null);
});

// give it a kick whenever the source is readable
// read(0) will not consume any bytes
source.on('readable', function() { //read(0) 表示不消费任何字节
self.read(0);
});

this._rawHeader = []; //存放待格式化的头部的数组
this.header = null;
}

SimpleProtocol.prototype = Object.create( //SimpleProtocol继承readable类
Readable.prototype, { constructor: { value: SimpleProtocol }});

SimpleProtocol.prototype._read = function(n) {
if (!this._inBody) { //如果还没接受body,表示接受head
var chunk = this._source.read(); //从source中读取chunck

// if the source doesn't have data, we don't have data yet.

//如果source还没有数据,那我们就push空字符串
if (chunk === null)
return this.push('');

// check if the chunk has a \n\n
var split = -1;
for (var i = 0; i < chunk.length; i++) { //开始逐个解析chunk中的字符串,查看是否包含2个连续的/n
if (chunk[i] === 10) { // '\n' //如果发现是/n
if (this._sawFirstCr) { //如果上一个也是 /n 则这边为true
split = i; //找到了连续了/n 表示头部结束了,跳出循环,记录位置
break;
} else { //如果上一个字符不是 /n ,则把 _sawFirstCr 设为true
this._sawFirstCr = true;
}
} else { //如果本字符不是 /n 则 _sawFirstCr 设置为false
this._sawFirstCr = false;
}
}

if (split === -1) { //如果没有找到2个/n则继续等待,push空字符串,将chunk暂时保存
// still waiting for the \n\n
// stash the chunk, and try again.
this._rawHeader.push(chunk);
this.push('');
} else { //如果找到2个/n了,则表示以后就开始接受body
this._inBody = true; //将inbody设置为true,表示今后开始接受body
var h = chunk.slice(0, split); //将此次的chunk切分,头部放入待格式化数组
this._rawHeader.push(h);
var header = Buffer.concat(this._rawHeader).toString(); //然后将待格式化数组中的内容组合
try {
this.header = JSON.parse(header); //转化js对象
} catch (er) {
this.emit('error', new Error('invalid simple protocol data'));
return;
}
// now, because we got some extra data, unshift the rest
// back into the read queue so that our consumer will see it.
var b = chunk.slice(split);
this.unshift(b);//此时将chunk中取出的b也就是body部分,再塞回队列头部

// and let them know that we are done parsing the header.
this.emit('header', this.header); //触发header事件,并且把this.header作为参数传递过去
}
} else { //表示开始接受body
// from there on, just provide the data to our consumer.
// careful not to push(null), since that would indicate EOF.

//从这里开始,表示对我们的消费者也就是api使用者提供数据了,注意不要push(null),因为null表示结束
var chunk = this._source.read();
if (chunk) this.push(chunk);
}
};

// Usage:

var parser = new SimpleProtocol(source);
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.

//这样parser就可以解析source的数据了,当header解析完毕,就会触发 header 事件


readable.wrap(stream)
主要是用来向前兼容的,如果你使用了旧的node库,还使用data事件触发和pause(),则可以使用wrap来创建一个readable stream,这个stream是用旧的的数据源,代码如下:

var OldReader = require('./old-api-module.js').OldReader;
var oreader = new OldReader;
var Readable = require('stream').Readable;
var myReader = new Readable().wrap(oreader);

myReader.on('readable', function() {
  myReader.read(); // etc.
});


Event: 'readable'
当数据准备好被消费时,这个事件就会被触发,当这个事件触发,调用read()方法去消费数据

Event: 'end'
当stream接受到eof关闭,则会触发此事件,指示没有数据会再发送,如果这个stream是可写的,则可能它还在被写入

Event: 'error'
触发error事件

Event: 'close'
当数据源关闭触发,并不是所有的数据源都会触发

readable.setEncoding(encoding)
设置data事件触发的参数是什么格式的,比如buffer或者utf-8

readable.read([size])
size {Number | null}  定义读取多少数据
Return: {Buffer | String | null} 返回buffer或者string
注意:这个方法是被stream消费者调用的
size的单位是bytes,如果不设置,则会返回内部buffer的整个内容。如果没有数据或者小于size的值,则null会返回,将来当返回更多了时候,会触发readable事件来让你消费数据
调用read(0)总是会返回null,并且会触发一次内部buffer的刷新,但是除此之外没有其他操作

readable.pipe(destination, [options])
destination {Writable Stream}    目标,一个可写入的stream
options {Object} Optional          设定参数
end {Boolean} Default=true       或者是一个布尔值表示是否关闭可写的stream,默认true的话会触发end事件,关闭stream
注意 process.stderr 和 process.stdout 除非进程退出,否则不会被关闭

readable.unpipe([destination])
destination {Writable Stream}  可选参数
撤销一个先前建立的pipe,如果参数destination 没有提供,则先前所有建立的pipe都被移除

readable.pause()
切换readable stream为旧模式,当data事件触发,则使用一个data事件监听,而不是通过read()方法消费buffer缓存的数据。
停止数据流,当stream处于paused状态时,没有data事件会被触发

readable.resume()
在执行pause()方法之后,回复接受发送来的数据流


Class: stream.Writable
一个可写的流,拥有如下的方法,成员和事件
注意:可写流是一个抽象类, _write(chunk, encoding, cb) 设计是用来扩展底层实现的

new stream.Writable([options])
options {Object}
highWaterMark {Number}   缓冲区,当写入开始后返回false
decodeStrings {Boolean}    是否将string改写为buffer写入,默认是
如果要扩展writable 类,确认需要正确调用它的构造函数


writable._write(chunk, encoding, callback)
chunk {Buffer | String}  写入的chunk一直是buffer,除非在实例化时显示声明 decodeStrings  为 false
encoding {String}    如果chunk是字符串,则设置他的编码,如果chunk是buffer则会忽略此属性,注意chunk一直是buffer,除非显示的将decodeStrings 设置为false
callback {Function}   此函数拥有一个可选的error参数,当处理完成提供的chunk会调用。

所有的可写流实现必须提供一个_write方法来将数据发送给底层实现。

writable.write(chunk, [encoding], [callback])
chunk {Buffer | String}   写入的数据
encoding {String} Optional.  如果写入的chunk是string,则encode默认为utf-8
callback {Function} Optional.  当chunk成功写入后调用
Returns {Boolean}  返回布尔值

将chunk写入stream,返回true表示数据已经写入底层,放回false表明buffer已经满了,数据将会在将来发送过去,drain事件会在buffer空了时候触发。什么时候write返回false,这取决于 highWaterMark  的设置

writable.end([chunk], [encoding], [callback])
chunk {Buffer | String} Optional final data to be written
encoding {String} Optional. If chunk is a string, then encoding defaults to 'utf8'
callback {Function} Optional. Called when the final chunk is successfully written.
调用这个方法表示最后的data写入了stream


Event: 'drain'
当stream的可写缓冲区为空时触发,当stream.write()返回false时监听它

#下面的比较简单不翻译了
Event: 'close'
Emitted when the underlying resource (for example, the backing file descriptor) has been closed. Not all streams will emit this.

Event: 'finish'
When end() is called and there are no more chunks to write, this event is emitted.

Event: 'pipe'
source {Readable Stream}
Emitted when the stream is passed to a readable stream's pipe method.

Event 'unpipe'
source {Readable Stream}
Emitted when a previously established pipe() is removed using the source Readable stream's unpipe() method.

Class: stream.Duplex
duplex stream是一个可读可写流,类似TCP socket链接

因为js没有多重继承,所以duplex类继承自readable类,然后寄生在writeable,因此他同时拥有_read()和_write()这2个方法,你可以扩展他们。

new stream.Duplex(options)
options {Object}  同上
allowHalfOpen {Boolean} Default=true. 如果设置为false,则stream会自动将readable stream关闭,当可写流关闭,反之亦然

Class: stream.Transform
transform流是一个duplex流,它对于input和output存在因果关系,比如zlibstream或者一个crypto。

对于传入的数据和传出的数据时不同大小的,不同的数据块大小和不同的到达时间。例如:一个hash 流仅会output一个chunk当input输入完毕。一个zlib流可能会将输入流处理为更大或更小

不用提供  _read()  和 _write() ,Transform 类必须提供 _transform() 方法,并且可能有一个可设定的提供_flush() 方法

new stream.Transform([options])
options {Object}  同上

transform._transform(chunk, encoding, callback)
chunk {Buffer | String}  写入的chunk一直是buffer,除非在实例化时显示声明 decodeStrings  为 false
encoding {String}如果chunk是字符串,则设置他的编码,如果chunk是buffer则会忽略此属性,注意chunk一直是buffer,除非显示的将decodeStrings 设置为false  此函数拥有一个可选的error参数,当处理完成提供的chunk会调用。

所有的Transform流实现必须提供一个_transform 方法来接受输入和输出

_transform 必须做任何此类特殊Transform 类的事情,处理字节的写入,然后将他们发送到可读部分的接口。做一些异步I/O,处理事情等等。
调用 transform.push(outputChunk) 0次或多次,用来生成从输入到输出chunk块,次数是根据你想要对这个chunk输出多少块。
仅会在当前chunk已经完全被消费时才会调用callback函数。注意,可能有些特殊的输入input块没有output结果。

transform._flush(callback)
callback {Function}  这个回调函数会有一个可选的error参数,当你执行完flushing任何其余的数据

在某些情况下,你的 transform 操作可能需要在流结束时,触发一个大一点的数据。比如zlib的压缩流会保存一些内部状态,所以它能达到最佳压缩状态,尽管到了最后,它还是需要一些数据,这样数据就会完整了。
在这情况,你需要执行 _flush 方法,它会被调用在非常后面,在所有数据都被消费掉,但是还没有出发end的信号之前。回调会在flush操作完成之后执行。

Example: SimpleProtocol parser
上面那个 simple protocol parser 可以被高级的 Transform stream class 更简单的实现出来。
在这个示例中,不用提供input作为参数,他将会通过pipe管道来解析,这将会更加通用的node 流方法。

function SimpleProtocol(options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(options);

  Transform.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;
  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype = Object.create(
  Transform.prototype, { constructor: { value: SimpleProtocol }});

SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
  if (!this._inBody) {
    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // still waiting for the \n\n
      // stash the chunk, and try again.
      this._rawHeader.push(chunk);
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // and let them know that we are done parsing the header.
      this.emit('header', this.header);

      // now, because we got some extra data, emit this first.
      this.push(b);
    }
  } else {
    // from there on, just provide the data to our consumer as-is.
    this.push(b);
  }
  done();
};

var parser = new SimpleProtocol();
source.pipe(parser) //坑爹就是这里改变了

// Now parser is a readable stream that will emit 'header'
// with the parsed header data.


Class: stream.PassThrough
这是一个对transform的简单实现,他就是简单的将input转为output输出,它的目的是用来做示例和测试的,但是也有偶尔的情况它可以派上用场。


实际使用
stream2类介绍完毕了,我们看一下新的stream2类如何在实际使用,http发送请求,获取yahoo网站
var http = require("http")
var options = {
  hostname: 'www.yahoo.com',
  port: 80,
  path: '/',
  method: 'GET'
};
var req = http.request(options, function(res) {
  //console.log('STATUS: ' + res.statusCode);
  //console.log('HEADERS: ' + JSON.stringify(res.headers));
  res.setEncoding('utf8');
  res.on('readable', function () {
console.log(res.read())//这里就无须监听data事件然后拼字符串buffer之类了,直接监听可读事件,然后调用res.read()
  });
});
req.on('error', function(e) {
  console.log('problem with request: ' + e.message);
});
// write data to request body
req.write('data\n');
req.write('data\n');
req.end();