gpt4 book ai didi

google-cloud-platform - Dataflow 如何自动扩展和分配工作负载?

转载 作者:行者123 更新时间:2023-12-04 13:44:20 25 4
gpt4 key购买 nike

阅读后this question ,我对Dataflow/Apache Beam 如何分配工作负载仍有一些疑问。我遇到的问题可以用以下代码演示:

package debug;

import java.io.IOException;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class DebugPipeline {
@SuppressWarnings("serial")
public static PipelineResult main(String[] args) throws IOException {

/*******************************************
* SETUP - Build options.
********************************************/

DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setAutoscalingAlgorithm(
DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
// Autoscaling will scale between n/15 and n workers, so from 1-15 here
options.setMaxNumWorkers(15);
// Default of 250GB is absurdly high and we don't need that much on every worker
options.setDiskSizeGb(32);
// Manually configure scaling (i.e. 1 vs 5 for comparison)
options.setNumWorkers(5);

// Debug Pipeline
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(PubsubIO.readStrings()
.fromSubscription("your subscription"))
// this is the transform that I actually care about. In production code, this will
// send a REST request to some 3rd party endpoint.
.apply("sleep", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws InterruptedException {
Thread.sleep(500);
c.output(c.element());
}
}));

return pipeline.run();
}
}

比较使用 1 个 worker 和 5 个 worker 时的最大吞吐量,而不是后者的效率提高了 5 倍,只是稍微提高了一点点。这让我想知道以下问题:
  • 假设每个工作人员使用 4 个 vCPU,每个线程是否绑定(bind)到特定的 DoFn,或者如果需要提高性能,可以在给定时刻在所有线程上调用相同的 DoFn?
  • 假设有不止一个 worker ,每个 worker 都会得到一个完整的管道,即每个 Transform 至少有一个实例,包括源?
  • Dataflow/Apache Beam 如何确定更频繁地调用哪个转换?是否会创建更多占用 CPU 资源的 DoFn 实例?上墙时间更长?或者每个 Transform 都被复制了相同的时间?
  • 根据Apache programming guide ,即后端相当于 asynchronous “job” .这是否意味着每个 DoFn 实例都是异步处理的?
  • 同样,在提供的示例代码中,如何异步处理“ sleep ”转换?
  • 在生产代码中,Thread.sleep替换为对第 3 方 API 的同步 http 请求。异步过程是否意味着它将同步客户端转换为异步?

  • 更新

    还有一个额外的问题:
    Dataflow documentation有一个关于 PubSubIO 的评论:

    In extreme cases (i.e. Cloud Pub/Sub subscriptions with large publishing batches or sinks with very high latency), autoscaling is known to become coarse-grained.



    你能不能扩展一下:
  • 什么 大批量出版意思?即大批量或大批量?
  • 是否高延迟接收器 在接收器之前的转换中包括高延迟?
  • 什么是粗粒行为?
  • 最佳答案

  • DoFns 可以在给定时刻在所有线程上调用,(look in the blue star)
  • 是的,每个 worker 将处理一个完整的管道
  • Cloud Dataflow 服务执行各种 optimizations : 融合和组合
  • 那些可以,没有数据依赖的步骤
  • “ sleep ”可以在不同的 worker 上同时处理并按顺序发送到队列中。
  • 取决于数据依赖
  • 关于google-cloud-platform - Dataflow 如何自动扩展和分配工作负载?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51657291/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com