gpt4 book ai didi

hadoop - 具有 Hbase 表输入格式的 Flink DataSet api - 多次读取行

转载 作者:可可西里 更新时间:2023-11-01 16:38:05 27 4
gpt4 key购买 nike

我正在使用 Flink 1.3.2 和 hbase TableInputFormat来自 flink-connectors (flink-hbase_2.11),使用 DataSet API。

我有一个 HBase 表,其中行键的结构如下:

| RowKey       | data |
| 0-someuniqid | data |
| 0-someuniqid | data |
| 2-someuniqid | data |
| 2-someuniqid | data |
| 4-someuniqid | data |
| 5-someuniqid | data |
| 5-someuniqid | data |
| 7-someuniqid | data |
| 8-someuniqid | data |

表的前缀可以是 0 到 9(这是为了防止 hbase 节点中的热点)。在我的测试表中,没有人写入此表。

我有一份表格的工作:

tableInputFormat0 = new TableInputFormat("table", 0);
tableInputFormat1 = new TableInputFormat("table", 1);
...
tableInputFormat9 = new TableInputFormat("table", 9);


tableInputFormat0.union(tableInputFormat1).(...).union(tableInputFormat9)
.map(mapFunction())
.rebalance()
.filter(someFilter())
.groupBy(someField())
.reduce(someSumFunction())
.output(new HbaseOutputFormat());

问题是当读取大量记录时(大约 2000 万条记录),作业并不总是读取相同数量的记录。

大多数时候(正确地)读取:20,277,161 行。但有时它会显示:20,277,221 或 20,277,171 总是更多,永远不会更少。 (我通过 flink web 仪表板获得了这个数字,但是我可以在写入的内容中看到效果,即 reduce 聚合了太多数据)

我无法通过使用较小的数据集来缩小问题,因为在针对包含 500 万条记录的表运行作业时不会发生问题。由于体积的原因,很难确定哪些记录被多次读取。

我该如何调试(和解决)这个问题?

最佳答案

TableInputFormat 是一个抽象类,您必须实现一个子类。

我会做两件事:

  • 检查每个输入拆分只处理一次(此信息写入 JobManager 日志文件)
  • 调整您的输入格式以计算每个输入拆分发出的记录数。记录数和split id应该写入(TaskManager)日志。

这应该有助于确定问题是否出在

  • 由于一个(或多个)拆分被分配了不止一次或
  • 由于处理拆分的代码中存在错误。

关于hadoop - 具有 Hbase 表输入格式的 Flink DataSet api - 多次读取行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47699438/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com