从事件驱动到observable的异步编程——PubSub+Promise+Rx的JS事件库

2011-09-26 07:35

从事件驱动到observable的异步编程——PubSub+Promise+Rx的JS事件库

by Dexter.Yy

at 2011-09-25 23:35:39

original http://www.limboy.com/2011/09/25/pubsub-plus-promise-plus-rx/

你上当叻,虽然从外面看标题很有气势,传达出一种宏大叙事的赶脚,其实我只是刚刚把一个阿尔法城的JS模块提交到github,想顺便介绍一下,但我连API文档都懒得写,就别指望能深入浅出的讲一遍来龙去脉了⋯⋯

所以就直接帖几个前置阅读的链接罢!

这些潮流的外部起源:(技术也有外源论/exogenesis⋯⋯)
Twisted(Python的事件驱动异步引擎)里的Deferred模式
微软推崇的Reactive Extensions (Rx)

虽然我是微软黑但微软网站上的这两篇推介文章不错:
Understanding the Publish/Subscribe Pattern for Greater JavaScript Scalability
Asynchronous Programming in JavaScript with “Promises”
应该都有人翻译了,比如这个:infoQ: JavaScript异步编程的Promise模式

jQuery早就跟微软一个鼻孔出气了:
http://api.jquery.com/category/deferred-object/

CommonJS的Promises提案,照例又分了好多种ABCD神马的:
http://wiki.commonjs.org/wiki/Promises

假如你愿意这里还有一篇paper⋯⋯

阿尔法城的客户端程序里有一个叫作event的模块提供了以上提到的PubSub模式、Promise模式和部分Rx模式,可以算是OzJS的核心module。

就像名字一样,它的初衷是一个最基础最简洁的消息事件库,类似nodejs的EventEmitter。在项目实践中,我很早就注意到可以用统一的事件机制实现Twisted风格的API,为此需要能随时提取事件主题(本质上就是promise对象),后来又根据实际需求加入了能表示状态转移的触发器(enable/resolve)和“一次性”的侦听器(wait/then),最后实现了同时依赖多个异步事件(或promise)的语法工具,包括并发事件(when)和有先后顺序的事件流(follow和end)。所以这个模块不是基于自顶向下的设计,而是在逐步的实践、hack和验证中发展出来的,上面提到的各种模式词汇都是“事后美化”,我觉得大多数“设计模式”也是这样——是对实践方法的归纳和描述,而不是在实践中套用的“新技术”。


开始帖使用范例~

把Event实例单独定义为模块,承担应用各模块之间的消息传递:

  1. oz.def("notify", ["event"], function(Event){
  2.     return Event(); // 以下例子里省略def/require声明,继续沿用notify和Event这两个局部变量名
  3. });

为基础类生成独立的事件命名空间,不依赖应用级的全局事件:

  1. Dialog.prototype = {
  2.     init: function(opt){
  3.         this.event = Event();
  4.     },
  5.     update: function() {
  6.         this.updateSize();
  7.         this.updatePosition();
  8.         this.event.fire("update", [this]);
  9.         return this;
  10.     },

监听消息和解除监听:

  1. notify.bind("msg:A", function(msg){
  2.     a = msg;
  3.     notify.unbind("msg:A", arguments.callee);
  4. });

发送消息:

  1. setTimeout(function(){
  2.     notify.fire("msg:A", ["hey jude"]);
  3. }, 1000);

状态转移:

  1. $("#button1").click(function(e){
  2.     notify.resolve("button1:clicked", [this]);
  3.     notify.bind("button1:clicked", function(button){
  4.         // 按钮1已经点击过,所以立刻执行
  5.         button.style.color = 'black';
  6.     });
  7. });
  8. notify.bind("button1:clicked", function(button){
  9.     // 等待按钮1点击之后再执行
  10.     button.style.color = 'red';
  11. });

异步回调:

  1. var data = {
  2.     load: function(url){
  3.         $.getJSON(url, function(json){
  4.             if (json)
  5.                 notify.resolve("data:" + url, [json]);
  6.             else
  7.                 notify.reject("data:" + url);
  8.         });
  9.         return notify.promise("data:" + url);
  10.     }
  11. };
  12. data.load("jsonp_data_1.js").then(function(json){
  13.     // json callback
  14. }, function(){
  15.     // json error
  16. });

也可以用自己的promise对象:

  1. var promise = Event.Promise();
  2. $.ajax({
  3.     url: "jsonp_data_1.js",
  4.     success: function(json){
  5.         promise.resolve(json);
  6.         promise.fire("json loaded");
  7.     },
  8.     error: function(){
  9.         promise.reject();
  10.         promise.error("json error");
  11.     }
  12. });
  13. // fire和error都会执行bind的参数,resolve执行then和bind,所以bind的参数会被执行2次
  14. // 如果ajax请求在之前已经返回,则只有then或fail的参数会被执行(因为他们监听的是“状态改变”)
  15. promise.bind(function(){}).then(function(){}).fail(function(){});

事件流:

  1. notify.promise("data:jsonp_data_1.js").then(function(json){
  2.     setTimeout(function(){
  3.         notify.resolve("delay:1000", [+new Date(), json]);
  4.     }, 1000);
  5.     return notify.promise("delay:1000");
  6. }).follow().then(function(time, json){
  7.     setTimeout(function(){
  8.         console.log("[数据在3秒前加载成功]", json);
  9.     }, 2000);
  10. }).end().fail(function(msg){
  11.     return notify.promise("data:error").resolve([msg]);
  12. }).follow().then(function(){
  13.     console.log("[数据加载失败]", msg);
  14. });

避免多层的回调嵌套(“callback hell”):

  1. var fs = require("fs");
  2. fs.readFile(input, 'utf-8').then(function(err, data){
  3.     var beautifuldata = js_beautify(data, options);
  4.     // 需要修改readFile和writeFile传出promise对象
  5.     return fs.writeFile(output, beautifuldata);
  6. }).follow().then(function(err){
  7.     if (err)
  8.         throw err;
  9.     console.log('Success!');
  10. });

依赖多个并发事件:

  1. notify.when("msg:A", "msg:B", "jsonp:A", "jsonp:B") // when传出新的promise对象
  2.     .some(3) // 如果不调用some或any,默认为全部事件完成后再触发resolve
  3.     .then(function(){ // 已经取到3/4的数据,参数顺序跟when的参数顺序一样
  4.         console.warn("recieve 3/4", arguments);
  5.     });

静态方法Event.when接受promise参数,可以写出更复杂的依赖关系:

  1. Event.when(
  2.     notify.when("msg:A", "msg:B"),
  3.     notify.when("click:btn1", "clicked:btn2").any()
  4. ).then(function(args1, args2){
  5.     // 相当于:"msg:A" && "msg:B" && ( "click:btn1" || "clicked:btn2" )
  6.     console.warn("recieve all messages, click one button", arguments);
  7. });



测试demo:https://github.com/dexteryy/OzJS/blob/master/tests/test_event.html
可以在console里观察执行顺序⋯⋯

从这个测试页可以看出我连单元测试都懒得写⋯⋯