Play 框架手册(10) – 在 HTTP 下进行异步编程

本节将介绍在 play 里如何进行异步处理以实现典型的长轮询(long-polling) 、 流以及其他 Comet-style 类型的应用程序以支持上千个同时发生的连接。

10.1. 暂停 http 请求

Play使用的是短小的请求, 它使用固定的线程池来处理 http连接者的查询请求。

为了达到最佳效果,线程池应该尽可能小。我们通常使用的最适合的数字就是处 理器数量+1 来设置默认池大小。

也就是说如果请求的处理时间很长的话(比如等待一个长时的计算),这个请求 就会耗尽线程池,使应用程序的响应变得很慢。当然可以扩大线程池,但这样会 浪费资源,而且线程池不可能是无限的。

考虑一下聊天室应用,浏览器发送一个阻塞式的 http 请求用于等待显示新的信 息。这个请求将会非常这长(比如几分钟),并且会打破线程池。如果计划允许 100 个用户同时连接聊天室程序,那么我们就需要提供至少 100 个线程,当然, 这是可行的。但如果是 1000 个,或 100000 个呢?

在这种情况下,play 允许你暂停一个请求。http 请求将停留在连接状态,但请 求执行将被从线程池中弹出,过会再试。你即可在固定等待的时间内告诉 play  去测试一下该请求,也可等待一个允许值变为可能情况。

小提示: 请看一下真实的示例 samples-and-tests/chat 比如, 下面这个动作将要加载一个非常耗时的 job 并且一直等到 job 完成返回结 果:

public static void generatePDF(Long reportId) {
    Promise<InputStream> pdf = new ReportAsPDFJob(report).now();
    InputStream pdfStream = await(pdf);
    renderBinary(pdfStream);
}

这里, 我们使用 await(„)来让 Play 暂停请求,直到 Promise<InputStream> 值 返回 redeemed。

Continuations 

因为框架需要收回线程以便为其他请求服务,因此 play 就必须暂停你的代码。

在之前的 play 版本中 await(„) 等价于 waitFor(„),用于暂停你的 action, 之后又重新调用。

为了易于约定我们介绍的异步代码,Continuations 允许代码被暂停和被透明恢 复,因此书写如下代码是非常必要的:

public static void computeSomething() {
    Promise<String> delayedResult = veryLongComputation(„);
    String result = await(delayedResult);
    render(result);
}

在这里,事实上你的代码将分成两步用两个不同的线程来执行。但这些代码对你 来说是完全透明的。

使用 await(„)和 continuations,你可能需要写一个循环:

public static void loopWithoutBlocking() {
    for(int i=0; i<=10; i++) {
         Logger.info(i);
         await("1s");
    }
    renderText("Loop finished");
}

当使用一个线程处理这个请求时,在默认的开发模式下,Play 能够同时为不同 的请求运行这些循环。

更实际的示例是异步从远程 URL 获取内容。 下面将并行运行三个远程 http 请求,每个都调用 play.libs.WS.WSRequest.getAsync()方法来执行一个 GET 请求 , 异步返回一个 play.libs.F.Promise。 action 方法通过调用三个 Promise 组合实 例的 await(„)方法来暂停进入的 http 请求。当三个远程调用返回结果后,线 程将自动恢复并且渲染 response。

public class AsyncTest extends Controller {

  public static void remoteData() {
    F.Promise<WS.HttpResponse> r1 = WS.url("http://example.org/1").getAsync();
    F.Promise<WS.HttpResponse> r2 = WS.url("http://example.org/2").getAsync();
    F.Promise<WS.HttpResponse> r3 = WS.url("http://example.org/3").getAsync();

    F.Promise<List<WS.HttpResponse>> promises = F.Promise.waitAll(r1, r2, r3);

    //暂停处理,直到所有三个远程调用结束
    List<WS.HttpResponse> httpResponses = await(promises);

    render(httpResponses);
  }
}

回调 Callbacks 

还可以使用回调实现上面的示例。这次,await() 方法包含了  play.libs.F.Action 实现,当三个远程调用结束后就调用这个回调方法。

public class AsyncTest extends Controller {

  public static void remoteData() {
    F.Promise<WS.HttpResponse> r1 = WS.url("http://example.org/1").getAsync();
    F.Promise<WS.HttpResponse> r2 = WS.url("http://example.org/2").getAsync();
    F.Promise<WS.HttpResponse> r3 = WS.url("http://example.org/3").getAsync();

    F.Promise<List<WS.HttpResponse>> promises = F.Promise.waitAll(r1, r2, r3);

    //暂停处理,直到所有三个远程调用结束
    await(promises, new F.Action<List<WS.HttpResponse>>() {
      public void invoke(List<WS.HttpResponse> httpResponses) {
        render(httpResponses);
      }
    });
  }
}

10.2. HTTP response 流  streaming

既然不用中心请求也可执行循环, 你或许会希望向浏览器发送结果变量的部分数 据(不是全部)。这就是 Content-Type:Chunked HTTP response 大量 http response 类型。它允许你多次使用多个块来发送 http response。浏览器将实时 接收这些块。

使用 await(„)和 continuations,就可以实现这个功能:

public static void generateLargeCSV() {
    CSVGenerator generator = new CSVGenerator();
    response.contentType = "text/csv";
    while(generator.hasMoreData()) {
          String someCsvData = await(generator.nextDataChunk());
          response.writeChunk(someCsvData);
    }
}

即使 CSV 生成需要 1 个小时,play 也能同时使用单个线程处理多个请求,一旦 为客户端的数据生成好后,play 就会向客户端发送。

10.3. 使用 WebSockets

WebSockets 是一种在浏览器和应用程序间实现双向通信的途径。在浏览器端使 用 “ws://” url:

new Socket("ws://localhost:9000/helloSocket?name=Guillaume")

在 play 端需要声明一条 WS 路由:

WS   /helloSocket            MyWebSocket.hello

MyWebSocket 是一个 WebSocketController。一个 WebSocket 控制器和一个标准 的 http 控制器很相似,但处理的内容不同:

  • 它有一个请求对象,但没有 response 对象
  • 它有一个可访问的 session,但是只读的
  • 它没有 renderArgs, routeArgs 和 flash 域
  • 它只能从路由模式和 QueryString 里读取 params
  • 它拥有两个通信通道:一进一出

当客户连接到 ws://localhost:9000/helloSocket 套接字时, Play 将调用 MyWebSocket.hello 动作方法。一旦 MyWebSocket.hello 动作方法存在,套接字 就会被关闭。

因此一个非常基础的套接字示例应该是这个样子:

public class MyWebSocket extends WebSocketController {

    public static void hello(String name) {
        outbound.send("Hello %s!", name);
    }
}

在这里, 当客户端连接到 socket 时, 它将接收到‘Hello Guillaume’消息, play  随后将关闭这个 socket。

当然,通常情况下你不需要立即关闭 socket,用 await(„)和 continuations 也能实现。

比如一个基础的 Echo 服务器:

public class MyWebSocket extends WebSocketController {

    public static void echo() {
        while(inbound.isOpen()) {
             WebSocketEvent e = await(inbound.nextEvent());
             if(e instanceof WebSocketFrame) {
                  WebSocketFrame frame = (WebSocketFrame)e;
                  if(!e.isBinary) {
                      if(frame.textData.equals("quit")) {
                          outbound.send("Bye!");
                          disconnect();
                      } else {
                          outbound.send("Echo: %s", frame.textData);
                      }
                  }
             }
             if(e instanceof WebSocketClose) {
                 Logger.info("Socket closed!");
             }
        }
    }

}

在上面的示例里,嵌套的‘if’和‘cast’很乏味,而且容易出错。即使这个简 单示例也不容易处理。更复杂的多个联合线程的情况下将会有更多的事件类型, 这将是一个恶梦。

这就是为什么我们要向你介绍在 play.libs.F 库里的基础的模式匹配的原因。

现在我们重写一下 echo 示例:

public static void echo() {
    while(inbound.isOpen()) {
         WebSocketEvent e = await(inbound.nextEvent());

         for(String quit: TextFrame.and(Equals("quit")).match(e)) {
             outbound.send("Bye!");
             disconnect();
         }

         for(String msg: TextFrame.match(e)) {
             outbound.send("Echo: %s", frame.textData);
         }

         for(WebSocketClose closed: SocketClosed.match(e)) {
             Logger.info("Socket closed!");
         }
    }
}

 


前一篇:
后一篇:

发表评论