Call/CC与Node.js

2011-11-01 04:22

Call/CC与Node.js

by

at 2011-10-31 20:22:20

original http://blog.kghost.info/index.php/2011/10/callcc-and-node-js/

本文介绍一个使用大概10行代码, 实现一个类似老赵JSCEX[1]的工具

首先来看一下下面这段Node.js代码:

var copy = function(src, dst) {
    var fd_src = fs.open(src, "r");
    var fd_dst = fs.open(dst, "w");
    while (true) {
        var buffer_size = 4096;
        var buffer = new Buffer(buffer_size);
        var bytes_read = fs.read(fd_src, buffer, 0, buffer_size);
        if (bytes_read > 0) {
            var write_at = 0;
            while (write_at < bytes_read) {
                write_at += fs.write(fd_dst, buffer, write_at, bytes_read - write_at);
            }
        } else {
            break;
        }
    }
};


这段看起来像是一个文件拷贝的函数, 但是这个函数在Node里面是无法运行的, 因为Node的所有函数调用都必须是异步的, 这里用到的fs.open, fs.read, fs.write都必须修改为异步调用, 修改后可运行的代码如下:

var copy = function(src, dst) {
    fs.open(src, "r", undefined, function(err, fd_src) {
        fs.open(dst, "w", undefined, function(err, fd_dst) {
            var copy_rec = function(position) {
                var buffer_size = 4096;
                var buffer = new Buffer(4096);
                fs.read(fd_src, buffer, 0, buffer_size, position, function(err, bytes_read) {
                    if (bytes_read > 0) {
                        var write_rec = function(write_at) {
                            if (write_at < bytes_read) {
                                fs.write(fd_dst, buffer, 0, bytes_read, position, function(err, written) {
                                    if (written > 0) {
                                        write_rec(write_at + written);
                                    }
                                });
                            } else {
                                copy_rec(position + bytes_read);
                            }
                        };
                        write_rec(0);
                    }
                });
            };
            copy_rec(0);
        });
    });
};


看到这里, 我想第一次接触Node的同学就已经要崩溃了, 一个简单的文件拷贝函数竟然需要嵌套4~5层函数回调, 那么更复杂的比如web servlet岂不是要嵌套10多层函数回调了?

难道没有方法让最上面的简洁明了的代码运行起来么? 下面就介绍一种方法, 不修改最开始那个简洁明了的拷贝函数, 让它直接跑起来, 并且不会阻塞整个JS引擎

首先分析一下同步调用与异步调用的区别

// sync
var fd = fs.read (name, mode);
// use fd


// async
fs.read (name, mode, function (fd) {
    // use fd
});


仔细观察这两个调用, 会发现其实这就是一个CPS变换[2], 所以, 我们只需要做一个CPS变换的包装就可以实现同步的文件操作, 具体实现如下:

var efs = {
    open: function (name, mode) {
        return call_cc(function (continuation) {
            fs.open(name, mode, undefined, function(err, fd_src) {
                continuation(fd_src);
            });
        });
    },
    read: function (fd, buffer, offset, length) {
        return call_cc(function (continuation) {
            fs.read(fd, buffer, offset, length, null, function(err, bytes_read) {
                continuation(bytes_read);
            });
        });
    },
    write: function (fd, buffer, offset, length) {
        return call_cc(function (continuation) {
            fs.write(fd, buffer, offset, length, null, function(err, written) {
                continuation(written);
            });
        });
    }
};


这个实现的核心就是call/cc函数[3], 如果了解lisp或者scheme的同学可能会非常熟悉这个函数, 这个函数作用是把当前正在运行的线程挂起, 然后执行call/cc本体, call/cc函数不会返回, 直到主动调用参数传进来的那个回调函数(本例中continuation变量绑定的函数), call/cc函数的返回值就是传给回调函数的参数. call/cc函数确实很难理解, 下面举例说明一下:

util.log("before call/cc");
var result = call_cc(function (continuation) {
    util.log("in call/cc before set timer");
    setTimeout(function () {
        util.log("before callback");
        continuation("hello world!");
        util.log("after callback");
    }, 1000);
    util.log("in call/cc after set timer");
});
util.log("after call/cc " + result);


这段代码的输出为:

31 Oct 13:01:33 - before call/cc
// 运行call/cc函数主体
31 Oct 13:01:33 - in call/cc before set timer
31 Oct 13:01:33 - in call/cc after set timer
// 休息1秒, 进入setTimeout的回调
31 Oct 13:01:34 - before callback
// 进入continuation函数, 调用continuation函数就相当于call/cc函数返回了"hello world!", 从call/cc返回开始继续执行
31 Oct 13:01:34 - after call/cc hello world!
// 执行setTimeout回调剩下部分
31 Oct 13:01:34 - after callback


现在我们已经使用CPS转换得到了"同步"的文件处理函数, 虽然说看起来是同步的, 但是内部使用的是call/cc实现, 实际上是不会阻塞JS引擎的, 其他回调仍然能正常执行. 有了这些函数的支持, 本文开头给出的那个文件拷贝函数就能够不作任何修改正常执行, 现在一切问题都已经解决了, 唯一剩下的问题就是call/cc函数的实现, 标准的node.js引擎是没有call/cc函数的, 这里我们使用node-fibers[4]实现call/cc函数.

var call_cc = function(cont) {
    var fiber = Fiber.current;
    cont(function (a) {
        fiber.run(a);
    });
    return Fiber["yield"]();
};


由于node-fiber的限制, 这个copy函数必须运行在fiber环境中, 完整代码如下:

var fs = require("fs");
require("fibers");

var call_cc = function(cont) {
    var fiber = Fiber.current;
    cont(function (a) {
        fiber.run(a);
    });
    return Fiber["yield"]();
};

var efs = {
    open: function (name, mode) {
        return call_cc(function (continuation) {
            fs.open(name, mode, undefined, function(err, fd_src) {
                continuation(fd_src);
            });
        });
    },
    read: function (fd, buffer, offset, length) {
        return call_cc(function (continuation) {
            fs.read(fd, buffer, offset, length, null, function(err, bytes_read) {
                continuation(bytes_read);
            });
        });
    },
    write: function (fd, buffer, offset, length) {
        return call_cc(function (continuation) {
            fs.write(fd, buffer, offset, length, null, function(err, written) {
                continuation(written);
            });
        });
    }
};

var copy = function(src, dst) {
    var fd_src = efs.open(src, "r");
    var fd_dst = efs.open(dst, "w");
    while (true) {
        var buffer_size = 4096;
        var buffer = new Buffer(buffer_size);
        var bytes_read = efs.read(fd_src, buffer, 0, buffer_size);
        if (bytes_read > 0) {
            var write_at = 0;
            while (write_at < bytes_read) {
                write_at += efs.write(fd_dst, buffer, write_at, bytes_read - write_at);
            }
        } else {
            break;
        }
    }
};

Fiber(function () {
    copy("src", "dst");
}).run();



最后一点空间, 我想简单评论一下老赵的JSCEX[1], 简单来说, 就是一点: 简直就是在重复发明轮子.
有关函数式编程, 在lisp界已经有40~50年的研究, 各个方面都已经有非常完善的体系, 当然这种异步同步函数之间互相转换早已经有体系化, 可证明的研究[5], 实现这样一个异步类库, 最好是从底层做起, 首先实现fiber[4]或者shift/reset[6]这样的底层函数, 然后再其之上, 构建一个异步框架. 像老赵这样, 一上来就搞JS再编译, 那是绕了十万八千里的远路, 不仅本身会引入bug, 也会给用户的调试造成很大困扰, 性能问题也无法保证.

  1. [1] JavaScript版本的AsyncEnumerator http://blog.zhaojie.me/2010/11/asynciterator-the-asyncenumerator-in-javascript.html
  2. [2] Continuation-passing style http://en.wikipedia.org/wiki/Continuation-passing_style
  3. [3] Call-with-current-continuation http://en.wikipedia.org/wiki/Call-with-current-continuation
  4. [4] Fiber support for v8 and Node https://github.com/laverdet/node-fibers
  5. [5] Olivier Danvy and Andrzej Filinski. 1990. Abstracting control. In Proceedings of the 1990 ACM conference on LISP and functional programming (LFP '90). ACM, New York, NY, USA, 151-160. DOI=10.1145/91556.91622 http://doi.acm.org/10.1145/91556.91622
  6. [6] Tiark Rompf, Ingo Maier, and Martin Odersky. 2009. Implementing first-class polymorphic delimited continuations by a type-directed selective CPS-transform. In Proceedings of the 14th ACM SIGPLAN international conference on Functional programming (ICFP '09). ACM, New York, NY, USA, 317-328. DOI=10.1145/1596550.1596596 http://doi.acm.org/10.1145/1596550.1596596