Call/CC与Node.js
by
at 2011-10-31 20:22:20
original http://blog.kghost.info/index.php/2011/10/callcc-and-node-js/
本文介绍一个使用大概10行代码, 实现一个类似老赵JSCEX[1]的工具
首先来看一下下面这段Node.js代码:
var copy = function(src, dst) { var fd_src = fs.open(src, "r"); var fd_dst = fs.open(dst, "w"); while (true) { var buffer_size = 4096; var buffer = new Buffer(buffer_size); var bytes_read = fs.read(fd_src, buffer, 0, buffer_size); if (bytes_read > 0) { var write_at = 0; while (write_at < bytes_read) { write_at += fs.write(fd_dst, buffer, write_at, bytes_read - write_at); } } else { break; } } };
这段看起来像是一个文件拷贝的函数, 但是这个函数在Node里面是无法运行的, 因为Node的所有函数调用都必须是异步的, 这里用到的fs.open, fs.read, fs.write都必须修改为异步调用, 修改后可运行的代码如下:
var copy = function(src, dst) { fs.open(src, "r", undefined, function(err, fd_src) { fs.open(dst, "w", undefined, function(err, fd_dst) { var copy_rec = function(position) { var buffer_size = 4096; var buffer = new Buffer(4096); fs.read(fd_src, buffer, 0, buffer_size, position, function(err, bytes_read) { if (bytes_read > 0) { var write_rec = function(write_at) { if (write_at < bytes_read) { fs.write(fd_dst, buffer, 0, bytes_read, position, function(err, written) { if (written > 0) { write_rec(write_at + written); } }); } else { copy_rec(position + bytes_read); } }; write_rec(0); } }); }; copy_rec(0); }); }); };
看到这里, 我想第一次接触Node的同学就已经要崩溃了, 一个简单的文件拷贝函数竟然需要嵌套4~5层函数回调, 那么更复杂的比如web servlet岂不是要嵌套10多层函数回调了?
难道没有方法让最上面的简洁明了的代码运行起来么? 下面就介绍一种方法, 不修改最开始那个简洁明了的拷贝函数, 让它直接跑起来, 并且不会阻塞整个JS引擎
首先分析一下同步调用与异步调用的区别
// sync var fd = fs.read (name, mode); // use fd // async fs.read (name, mode, function (fd) { // use fd });
仔细观察这两个调用, 会发现其实这就是一个CPS变换[2], 所以, 我们只需要做一个CPS变换的包装就可以实现同步的文件操作, 具体实现如下:
var efs = { open: function (name, mode) { return call_cc(function (continuation) { fs.open(name, mode, undefined, function(err, fd_src) { continuation(fd_src); }); }); }, read: function (fd, buffer, offset, length) { return call_cc(function (continuation) { fs.read(fd, buffer, offset, length, null, function(err, bytes_read) { continuation(bytes_read); }); }); }, write: function (fd, buffer, offset, length) { return call_cc(function (continuation) { fs.write(fd, buffer, offset, length, null, function(err, written) { continuation(written); }); }); } };
这个实现的核心就是call/cc函数[3], 如果了解lisp或者scheme的同学可能会非常熟悉这个函数, 这个函数作用是把当前正在运行的线程挂起, 然后执行call/cc本体, call/cc函数不会返回, 直到主动调用参数传进来的那个回调函数(本例中continuation变量绑定的函数), call/cc函数的返回值就是传给回调函数的参数. call/cc函数确实很难理解, 下面举例说明一下:
util.log("before call/cc"); var result = call_cc(function (continuation) { util.log("in call/cc before set timer"); setTimeout(function () { util.log("before callback"); continuation("hello world!"); util.log("after callback"); }, 1000); util.log("in call/cc after set timer"); }); util.log("after call/cc " + result);
这段代码的输出为:
31 Oct 13:01:33 - before call/cc // 运行call/cc函数主体 31 Oct 13:01:33 - in call/cc before set timer 31 Oct 13:01:33 - in call/cc after set timer // 休息1秒, 进入setTimeout的回调 31 Oct 13:01:34 - before callback // 进入continuation函数, 调用continuation函数就相当于call/cc函数返回了"hello world!", 从call/cc返回开始继续执行 31 Oct 13:01:34 - after call/cc hello world! // 执行setTimeout回调剩下部分 31 Oct 13:01:34 - after callback
现在我们已经使用CPS转换得到了"同步"的文件处理函数, 虽然说看起来是同步的, 但是内部使用的是call/cc实现, 实际上是不会阻塞JS引擎的, 其他回调仍然能正常执行. 有了这些函数的支持, 本文开头给出的那个文件拷贝函数就能够不作任何修改正常执行, 现在一切问题都已经解决了, 唯一剩下的问题就是call/cc函数的实现, 标准的node.js引擎是没有call/cc函数的, 这里我们使用node-fibers[4]实现call/cc函数.
var call_cc = function(cont) { var fiber = Fiber.current; cont(function (a) { fiber.run(a); }); return Fiber["yield"](); };
由于node-fiber的限制, 这个copy函数必须运行在fiber环境中, 完整代码如下:
var fs = require("fs"); require("fibers"); var call_cc = function(cont) { var fiber = Fiber.current; cont(function (a) { fiber.run(a); }); return Fiber["yield"](); }; var efs = { open: function (name, mode) { return call_cc(function (continuation) { fs.open(name, mode, undefined, function(err, fd_src) { continuation(fd_src); }); }); }, read: function (fd, buffer, offset, length) { return call_cc(function (continuation) { fs.read(fd, buffer, offset, length, null, function(err, bytes_read) { continuation(bytes_read); }); }); }, write: function (fd, buffer, offset, length) { return call_cc(function (continuation) { fs.write(fd, buffer, offset, length, null, function(err, written) { continuation(written); }); }); } }; var copy = function(src, dst) { var fd_src = efs.open(src, "r"); var fd_dst = efs.open(dst, "w"); while (true) { var buffer_size = 4096; var buffer = new Buffer(buffer_size); var bytes_read = efs.read(fd_src, buffer, 0, buffer_size); if (bytes_read > 0) { var write_at = 0; while (write_at < bytes_read) { write_at += efs.write(fd_dst, buffer, write_at, bytes_read - write_at); } } else { break; } } }; Fiber(function () { copy("src", "dst"); }).run();
最后一点空间, 我想简单评论一下老赵的JSCEX[1], 简单来说, 就是一点: 简直就是在重复发明轮子.
有关函数式编程, 在lisp界已经有40~50年的研究, 各个方面都已经有非常完善的体系, 当然这种异步同步函数之间互相转换早已经有体系化, 可证明的研究[5], 实现这样一个异步类库, 最好是从底层做起, 首先实现fiber[4]或者shift/reset[6]这样的底层函数, 然后再其之上, 构建一个异步框架. 像老赵这样, 一上来就搞JS再编译, 那是绕了十万八千里的远路, 不仅本身会引入bug, 也会给用户的调试造成很大困扰, 性能问题也无法保证.
- [1] JavaScript版本的AsyncEnumerator http://blog.zhaojie.me/2010/11/asynciterator-the-asyncenumerator-in-javascript.html ↩
- [2] Continuation-passing style http://en.wikipedia.org/wiki/Continuation-passing_style ↩
- [3] Call-with-current-continuation http://en.wikipedia.org/wiki/Call-with-current-continuation ↩
- [4] Fiber support for v8 and Node https://github.com/laverdet/node-fibers ↩
- [5] Olivier Danvy and Andrzej Filinski. 1990. Abstracting control. In Proceedings of the 1990 ACM conference on LISP and functional programming (LFP '90). ACM, New York, NY, USA, 151-160. DOI=10.1145/91556.91622 http://doi.acm.org/10.1145/91556.91622 ↩
- [6] Tiark Rompf, Ingo Maier, and Martin Odersky. 2009. Implementing first-class polymorphic delimited continuations by a type-directed selective CPS-transform. In Proceedings of the 14th ACM SIGPLAN international conference on Functional programming (ICFP '09). ACM, New York, NY, USA, 317-328. DOI=10.1145/1596550.1596596 http://doi.acm.org/10.1145/1596550.1596596 ↩