- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我很难使用 Spring Reactor Stream API(类似于 rxjava)在我的服务中构造一个响应对象,包装由两个下游服务提供的响应。
下面是accept()
我的 channel 消费者上的方法。为了保护无辜者,一些名称已更改..
@Overridepublic void accept(final NetChannel channel) { log.info("Consuming NetChannel of FullHttpRequest"); try { // Our initial Stream is a single HTTP request containing our XML document. Stream requestStream = channel.in(); requestStream.filter((x) -> {return true;}) .dispatchOn(dispatcher) .map(new FooRequestFunction()) // 1) .flatMap(new BarRequestStreamFunction()) // 2) .flatMap(new DownstreamRequestZipFunction()) // 3) .toList() // 4) .onComplete(new ResponsesConsumer(channel)); // 5) } catch (Exception e) { log.error("Exception thrown during Channel processing", e); }}
So, a FooRequest
wraps many BarRequests
, each of which has one associated Classify request, and one associated Validate request. We want to 1) Convert to a FooRequest
, 2) Convert the FooRequest
to a series of BarRequests
, 3) Run two downstream requests for each BarRequest
, 4) Aggregate all of our BarResponse
objects into an overall response, 5) send a response back out to the client.
The point at which I encounter problems is the toList()
method, which never seems to execute. Every time I've attempted something that involves a Promise
it always seems to break, and this has been no exception.
FooRequestFunction
, BarRequestStreamFunction
are fairly simple, and seem to run fine. Their method signatures are:
// FooRequestFunction
public FooRequest apply(final FullHttpRequest request);
和:
// BarRequestStreamFunction
public Stream<BarRequest> apply(FooRequest dsoRequests);
DownstreamRequestZipFunction
看起来像这样:
@Overridepublic Stream apply(BarRequest t) { Stream classifyRes = Streams .just(t) .flatMap(new ClassifyDownstreamRequestFunction()); Stream validateRes = Streams .just(t) .flatMap(new ValidateDownstreamRequestFunction()); return Streams.zip(classifyRes, validateRes, (tuple) -> { BarResponse response = new BarResponse(); response.setClassifyRes(tuple.getT1()); response.setValidateRes(tuple.getT2()); return response; });}
This seems to work fine, as long as both of the downstream request functions return a result.
Finally, the Consumer at the end of the chained calls has this signature:
// ResponsesConsumer
public void accept(Promise<List<BarResponse>> responses)
它所做的是 await() 响应 promise ,然后将所有这些响应聚合到单个 XML 文档中写回 channel 。我可以告诉执行永远不会达到这个方法,因为没有日志记录触发。这一切似乎都停在了 .toList() 上。
有谁知道为什么这个设置似乎执行 toList()
还是之后呢?
编辑:好的,我有更多信息。在为应用程序中的每个线程提供命名约定以使调试更容易之后,我可以看到“shared-1”,运行 accept() 方法的线程进入 WAITING 状态,然后停留在那里。这可能与底层 Dispatcher 是一个环形缓冲区调度程序这一事实有关,它是单线程的。
我修改了代码,使方法略有不同,并使用多线程调度程序,并避免使用 Promise
,但我仍然有一个状态,在该状态下,链式调用集的尾部将不会执行。见下文:
@Overridepublic void accept(final NetChannel channel) { log.info("Consuming NetChannel of FullHttpRequest"); try { // Our initial Stream is a single HTTP request containing our XML document. Stream requestStream = channel.in(); requestStream.filter((x) -> {return true;}) .dispatchOn(dispatcher) .map(new FooRequestFunction()) // 1) .flatMap(new BarRequestStreamFunction()) // 2) .flatMap(new DownstreamRequestZipFunction()) // 3) .reduce(new ArrayList(), (list,resp) -> {log.info("Reducing"); list.add(resp); return list;}) // 4) .consumeOn((x)->{log.info("Consume");}, (x)->{log.error("error");}, (x)->{log.info("Complete");}, dispatcher); // 5) } catch (Exception e) { log.error("Exception thrown during Channel processing", e); }}
在上面,我用对 reduce() 的调用替换了 toList() 并将所有内容折叠成一个 List<BarResponse>
.我可以看到这个执行和日志记录很好。但是,无论我对最后一次调用做什么,在尝试使用 consume()、consumeOn() 等之后,它都不会执行,也不会记录您在上面看到的最终调用。
在 VisualVM 中,我可以看到调度程序线程都在与阻塞队列关联的同一个对象监视器上等待 - 换句话说,它们都在等待工作到达。这就像完全忽略尾部 consumeOn() 调用。
我在这里做错了什么?我有什么不明白的?
编辑 2:鉴于约翰在下面的回复,我怀疑问题出在服务器设置上。可能仅用于 reactor 版本 2.0.0.M2,它在主 Application
中配置类如下:
@Bean public NetServer httpServer( final Environment env, final MetricRegistry metrics, final ChannelConsumer consumer) throws InterruptedException { NetServer server = new TcpServerSpec( NettyTcpServer.class) .env(env) .options( new NettyServerSocketOptions() .pipelineConfigurer((ChannelPipeline pipeline) -> pipeline .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH)))) .consume(consumer) .get(); server.start().await(); return server; }
没有为此配置调度程序,它似乎在后台使用 LMAX 干扰器,而不是 NettyEventLoopDispatcher
.不清楚如何设置NettyEventLoopDispatcher
并将其用作替代调度程序。
最佳答案
consumeOn()
调用是多余的,因为您已经在那个 Dispatcher
上了(除非您的“真实”代码使用与此示例不同的东西)。当您编写输出时,无论如何都会在内部切换到 NettyEventLoopDispatcher
。
我做了一个快速检查以确保此流程在 TcpServer
之外工作并且按预期工作:
@Test
public void testStream() throws InterruptedException {
Stream<String> s1 = Streams.just("Hello World!");
s1.filter(s -> s.startsWith("Hello"))
.dispatchOn(Environment.sharedDispatcher())
.map(s -> s.toUpperCase())
.flatMap(s -> Streams.just(s, s))
.flatMap(s -> Streams.just(s, s))
.reduce(new ArrayList<>(), (l, s) -> {
l.add(s);
return l;
})
.consume(l -> {
System.out.println("thread: " + Thread.currentThread() + ", l: " + l);
});
Thread.sleep(500);
}
通过简单地注释掉 .dispatchOn()
调用来了解流程是否在 Netty 线程上运行会很有趣。
关于java - 对 Stream 的终端调用永远不会执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28975139/
使用 Guake 终端这个可自定义且强大的适合各种用户的工具快速访问你的终端。 Guake 终端:GNOME 桌面中自上而下终端 Guake 是一款为 GNOME
我是 python 的新手,正在尝试运行 python 2.7 script .获得了 python 2.7 的 pip 并从 mac 终端 shell 安装了 pyCrypto 的依赖项。 我想尝试
我正在寻找一种在 Swift (macOS) 中运行终端命令的方法。我遇到了this发布,但我似乎无法获得任何解决方案。我正在尝试从我的应用程序关闭我的 mac,就像您可以从终端执行的那样(osasc
我在 macOS 上使用 bash 终端。 用户名、计算机名和文件路径占据了大部分行,所以如果我写一个长命令,我会从一行开始,然后在下一行继续。 相反,我希望行光标从用户名和计算机名下方的下一行开始。
是否有一个变量或函数可以告诉我光标的实际位置? #!/usr/bin/env perl use warnings; use 5.012; use Term::ReadKey; use Term::Ca
如何在 Mac Os X(10.6.8) 上的 gnuplot 中启用 tikz 终端? 我有工作 tikz 的 latex 。现在我从 http://www.lua.org/ 安装了 lua并下载g
我正在学习一个名为 Starting a Django 1.4 Project the Right Way 的教程,其中提供了有关如何使用 virtualenv 和 virtualenvwrapper
我正在尝试用java编写一个unix终端模拟器。我有很多麻烦。我似乎无法更改程序的工作目录,因此“cd”等命令无法正常工作。我的问题是,如果我运行一个需要用户输入的命令,有什么方法可以将该输入发送到正
我在这方面完全是个新手(Mac leopard 中的终端),我希望能从网络上获得生命线,因为我确实碰壁了。 我想在终端中以 root 身份运行脚本。该脚本保存为扩展名为 .rtf 的文本文件。我已经插
尝试在我的 osascript 命令中包含引号 ' ' 时遇到了一个奇怪的问题。 如果我尝试转义一个正常的可转义字符,它就可以正常工作。示例: osascript -e 'tell app "Find
我正在制作一个控制台 Java 应用程序,您可以在其中输入控制台命令,例如 Macintosh/Ubuntu/Windows 命令提示符上的终端,然后将其输出到日志。 我想知道,在执行系统/控制台命令
在终端中输入 mysql 命令并按回车键会换行。 但有时当我犯错时,即使用分号结束语句也无法退出此模式。 Ctrl + c 退出mysql。我怎样才能退出插入模式? 最佳答案 你必须用 ';' 结束
我正在尝试编写一个 C 代码来打开 xeyes 应用程序,然后那些眼睛在特定的时间段内不断改变其颜色.. 我尝试通过执行具有一种中心颜色的 xeyes、添加 3 秒的延迟、终止进程并在循环内打开具有另
是否有一种语法允许我在 System.out.println() 行 的同一行中读取用户的输入? 例子: What is your name?:(<-- Output) Jack (<-- In
我有一个 Wordpress 上传文件夹,该文件夹使用子文件夹构建了几个月。 wolfr2:uploads wolfr$ tree . . |-- 2007 | |-- 08 | | |-
如何从 mac 终端使用 sqlite3 找出表的列名?我忘记了我给这些列起的名字,我也不知道这些名字是怎么来的。谢谢! 最佳答案 来自 http://www.sqlite.org/sqlite.ht
我需要我的终端发送一个未使用的控制字符或转义序列,它在所有层都没有效果:被shell(bash,…)忽略,被行编辑器(readline,…)忽略,被所有应用程序(vim,less,mutt,…)忽略。
我做了一个文本编辑器,我想把它移植到 Linux 上,这样我就可以通过 SSH 远程使用它。我不太了解 Linux 终端,所以也许我遗漏了一些明显的东西,因为我简直不敢相信在 2013 年远程终端仍然
我最近想放一个 java 类文件供下载,人们可以在终端中运行它。这是一个 Minecraft 命令生成器,因此下载它的人不一定具有最大的心智能力(当然,我指的是 8 岁的 child ,他们不知道自己
我有一个文件“test.txt”,里面有一个数字列表,就像这样 1 3 4 2 3 40 312 53 243 321 423 ...etc 我还有一个可执行文件,它是一种排序算法,例如 hea
我是一名优秀的程序员,十分优秀!