gpt4 book ai didi

java - DataFlow 批处理作业速度慢/并行性不佳

转载 作者:行者123 更新时间:2023-12-02 02:18:41 25 4
gpt4 key购买 nike

我在 Google Cloud 中的 DataFlow 上运行的 Apache Beam 2.2.0 中有一个批处理作业,它与 Bigtable 交互。该作业似乎在非常小的测试数据集上正确执行,但似乎并行化不佳,而且确实没有充分利用投入的资源。

目标大致是实现以下目标:

  • 给定一个初始关键字,例如名称“Bob”,通过行前缀“Bob”查找 Bigtable TableA 中的所有行(完整行架构类似于“Bob*CategoryXXX”
  • 对于每一行,通过行前缀“CategoryXXX”查找 TableB 中的所有行(完整行架构类似于“CategoryXXX*ItemIDYYY”
  • 对于这些行中的每一行,通过行前缀“ItemIDYYY”查找 TableC 中的所有行
  • 计算上一次操作中的所有项目
  • 将这些结果写入 BigTable 中的另一个表

我用 10 个 n1-standard-1 工作人员运行这项工作,明确禁止自动缩放,因为我试图缩小规模。每个工作线程的 CPU 利用率低于 10%,BigTable 实例似乎同样未得到充分利用(几乎为零 Activity )。我的自定义计数器显示了一点点进度,因此工作并没有卡住,但工作速度非常慢。

以下是相关代码片段:

// Use side inputs to provide the relevant Table ID at each step            
final PCollectionView<String> tableA =
p.apply(Create.of("TableA")).apply(View.<String>asSingleton());
final PCollectionView<String> tableB =
p.apply(Create.of("TableB")).apply(View.<String>asSingleton());
final PCollectionView<String> tableC =
p.apply(Create.of("TableC")).apply(View.<String>asSingleton());

p.apply(Create.of(inputID.getBytes())) // <- Initial keyword "Bob"
.apply(ParDo.of(new PartialMatch(configBT, tableA))
.withSideInputs(tableA))
.apply(ParDo.of(new PartialMatch(configBT, tableB))
.withSideInputs(tableB))
.apply(ParDo.of(new PartialMatch(configBT, tableC))
.withSideInputs(tableC))
.apply(ParDo.of(new GetInfo(configBT)))
.apply(Sum.<String>integersPerKey())
.apply(ParDo.of(new LogInfo(configBT)));
p.run().waitUntilFinish();


class PartialMatch extends AbstractCloudBigtableTableDoFn<byte[], byte[]>
{
private static final long serialVersionUID = 1L;
final PCollectionView<String> m_tableName;
private Counter m_ct = Metrics.counter(PartialMatch.class, "matched");

public PartialMatch(CloudBigtableConfiguration config,PCollectionView<String> side1)
{
super(config);
m_tableName = side1;
}

@ProcessElement
public void processElement(DoFn<byte[], byte[]>.ProcessContext c)
{
try
{
byte rowKey[] = c.element();
Scan s = new Scan(rowKey);
FilterList fList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
fList.addFilter(new PrefixFilter(rowKey));
fList.addFilter(new KeyOnlyFilter());
s.setFilter(fList);

ResultScanner scanner = getConnection().getTable(TableName.valueOf(c.sideInput(m_tableName))).getScanner(s);

for (Result row : scanner)
{
String rowKeyEls[] = new String(row.getRow()).split(DEF_SPLIT);
c.output(rowKeyEls[1].getBytes());
}
scanner.close();
m_ct.inc();
} catch (IOException e){e.printStackTrace();}
}
}

class GetInfo extends AbstractCloudBigtableTableDoFn<byte[], KV<String, Integer>>
{
private static final long serialVersionUID = 1L;
private Counter m_ct = Metrics.counter(GetInfo.class, "extracted");
private Counter m_ctFail = Metrics.counter(GetInfo.class, "failed");

public GetInfo(CloudBigtableConfiguration config)
{
super(config);
}

@ProcessElement
public void processElement(DoFn<byte[], KV<String, Integer>>.ProcessContext c)
{
try
{
byte rowKey[] = c.element();

Result trnRow = getConnection().getTable(TableName.valueOf(DEF_TBL_ID)).get(new Get(rowKey));
if(trnRow.isEmpty())
m_ctFail.inc();
else
{
String b = new String(trnRow.getColumnLatestCell(DEF_CF, DEF_CN_B).getValueArray());
String s = new String(trnRow.getColumnLatestCell(DEF_CF,DEF_CN_S).getValueArray());
c.output(KV.of(b + DEF_FUSE + s, 1));
m_ct.inc();
}
} catch (IOException e){e.printStackTrace();}
}
}

class LogInfo extends AbstractCloudBigtableTableDoFn<KV<String, Integer>, Integer>
{
private static final long serialVersionUID = 1L;
private Counter m_ct = Metrics.counter(LogInfo.class, "logged");

public LogInfo(CloudBigtableConfiguration config)
{
super(config);
}

@ProcessElement
public void processElement(DoFn<KV<String, Integer>, Integer>.ProcessContext c)
{
try
{
Table tGraph = getConnection().getTable(TableName.valueOf(DEF.ID_TBL_GRAPH));

String name = c.element().getKey();
Integer ct = c.element().getValue();
tGraph.put(new Put(name.getBytes()).addColumn(DEF.ID_CF_INF, DEF.ID_CN_CNT, Bytes.toBytes(ct)));
m_ct.inc();
}catch (IOException e){e.printStackTrace();}
c.output(0);
}
}

什么可能会减慢速度?

最佳答案

有几件事。

  • 您正在为每个函数处理的每个元素创建一个与 Bigtable 的单独连接。相反,将连接的创建放入 @Setup 中,并将其关闭到 @Teardown
  • 当前代码根本没有关闭连接,因此连接正在泄漏,这也可能会减慢速度。
  • 您的管道是 ParDo 的一条直线,因此它们很可能全部融合在一起,并且遭受过度融合。查看最近的答案 How can I maximize throughput for an embarrassingly-parallel task in Python on Google Cloud Platform? 。在 Java 中,您可以在 ParDo 之间插入 Reshuffle.viaRandomKey()
  • 您的管道正在使用手工编写的代码将突变一一写入 BigTable。这是低效的,突变应该分批进行以最大化吞吐量。 BigtableIO.write() 会为您完成此操作,因此我建议您使用它而不是手工编写的代码。

关于java - DataFlow 批处理作业速度慢/并行性不佳,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48875452/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com