gpt4 book ai didi

java - Apache Storm 2.1.0 本地 DRPC 不会返回任何响应,尽管最后一个 Bolt 已将元组发送到收集器

转载 作者:行者123 更新时间:2023-12-01 18:44:45 26 4
gpt4 key购买 nike

我在尝试运行包含一个 Bolt 的 DRPC 拓扑并通过本地集群查询它时遇到问题。使用IntelliJ调试后,bolt确实被执行了,但是JCQueue在bolt执行完之后一直陷入无限循环,直到超时发送到服务器。

以下是用于构建拓扑生成器的代码:

public static LinearDRPCTopologyBuilder createBuilder()
{
var bolt = new MRedisLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
var builder = new LinearDRPCTopologyBuilder("sales");
builder.addBolt(bolt, 1).localOrShuffleGrouping();
return builder;
}

MRedisLookupBolt 只是 IBasicBolt 对 Jedis 执行 hget 命令的一个非常简单的实现。 MRedisLookupBolt 的 execute 方法只是发出一个 Values 实例,其中包含两个声明如下的字段的值:

declarer.declare(new Fields("id", "Value"));

拓扑是在单元测试中构建和查询的,如下所示:

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);

try(LocalDRPC drpc = new LocalDRPC())
{
LocalCluster cluster = new LocalCluster();
var builder = BasicRedisRPCTopology.createBuilder();
LocalCluster.LocalTopology topo = cluster.submitTopology(
"Sales-fetch", conf, builder.createLocalTopology(drpc));
var result = drpc.execute("sales", "XXXXX");
System.out.println("################ Result: " + result);
}
catch (Exception e)
{
e.printStackTrace();
}

在阅读日志时,我确信数据是红色的,并且所有内容都已发出 enter image description here

但最后,我通过我的测试方法轻轻地打印出了这个堆栈跟踪。当然,不会为结果变量分配任何值,并且进程永远不会到达最后的打印指令:

enter image description here

我在这里缺少一些东西。我的理解是:BoltExecutor 使用 JCQueue 来检索要执行的 Bolt 的 id,尽管只有一个参数发送到本地 DRPC 并且只有一个 Bolt 声明到拓扑中,但它永远不会结束。我已经尝试向拓扑添加更多 bolt 或更改用于创建它的构建器实现,但没有成功。

最佳答案

我使用 Apache Storm 2.1.0 找到了适合我的用例的解决方案。

似乎按照文档的建议调用本地集群的 submitTopology 方法并没有在 2.1.0 版本中使用 LinearDRPCTopologyBuilder 构建正确地结束执行器拓扑结构。

通过仔细查看源代码,可以了解如何将 LinearDRPCTopologyBuilder 逻辑直接应用到 TopologyBuilder

以下是应用于 createBuilder 方法的更改:

    public static TopologyBuilder createBuilder(ILocalDRPC localDRPC)
{
var spout = Optional.ofNullable(localDRPC)
.map(drpc -> new DRPCSpout("sales", drpc))
.orElse(new DRPCSpout("sales"));
var bolt = new MRedisLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
var builder = new TopologyBuilder();
builder.setSpout("drpc", spout);
builder.setBolt("redisLookup", bolt, 1)
.shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults())
.shuffleGrouping("redisLookup");
return builder;
}

这是一个执行示例:

        Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);

try(LocalDRPC drpc = new LocalDRPC())
{
LocalCluster cluster = new LocalCluster();
var builder = BasicRedisRPCTopology.createBuilder(drpc);
cluster.submitTopology("Sales-fetch", conf, builder.createTopology());
var result = drpc.execute("sales", "XXXXX");
System.out.println("################ Result: " + result);
}
catch (Exception e)
{
e.printStackTrace();
}

不幸的是,该解决方案不允许使用LinearDRPCTopologyBuilder的所有嵌入式工具,并且意味着“手动”构建所有拓扑流。有必要将映射器行为更改为,因为字段未按照与以前相同的顺序公开。

关于java - Apache Storm 2.1.0 本地 DRPC 不会返回任何响应,尽管最后一个 Bolt 已将元组发送到收集器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59861879/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com