gpt4 book ai didi

apache-beam - 每秒调用最大请求数的管道设计

转载 作者:行者123 更新时间:2023-12-05 05:29:36 33 4
gpt4 key购买 nike

我的目标是创建一个每秒调用后端(云托管)服务最多次数的管道......我该如何实现?

背景故事:想象一下后端服务使用单个输入调用并返回单个输出。该服务具有与其关联的配额,允许每秒最大请求数(假设每秒 10 个请求)。现在想象一个无限源 PCollection,我希望通过我的后端服务传递输入中的元素来转换它们。我可以设想一个 ParDo 为输入 PCollection 中的每个元素调用一次后端服务。但是,这不会对后端执行任何类型的流量控制。

我可以想象我的 DoFn 逻辑测试来自后端响应的响应并重试直到成功,但这感觉不对。如果我有 100 个 worker ,那么我似乎会消耗大量资源并给后端增加负载。我想我想做的是限制从管道到后端的调用。

最佳答案

美好的一天,kolban .除了Bruno Volpato很有帮助 RampupThrottlingFn例如,我见过以下的组合。请毫不犹豫地告诉我如何更清楚地更新示例。

  1. PeriodicImpulse - 发出 Instant在固定的指定时间间隔。
  2. 如果使用 Dataflow Pipeline Options,则使用 maxNumWorkersnumWorkers 固定 worker 数量(请参阅 Dataflow runner) .
  3. Beam Metrics API随着时间的推移监视实际资源请求计数并设置警报。使用 Dataflow 时,Beam Metrics API 会自动连接到 Cloud Monitoring as Custom metrics

以下显示了从整个流水线开始的缩写代码,后面是一些需要提供清晰度的细节。它假设目标是 10 个工作人员,使用带有参数 --maxNumWorkers=10--numWorkers=10 的数据流,目标是限制所有工作人员之间的资源请求为 每秒 10 个请求。这相当于每个工作人员每秒 1 个请求

PeriodicImpulse 将请求创建限制为每秒 1 个

public class MyPipeline {
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create(/* Usually with options */);
PCollection<Response> responses = pipeline.apply(
"PeriodicImpulse",
PeriodicImpulse
.create()
.withInterval(Duration.standardSeconds(1L))
).apply(
"Build Requests",
ParDo.of(new RequestFn())
)
.apply(ResourceTransform.create());
}
}

RequestFn DoFn 发出从 PeriodicImpulse 发出的每个瞬间的请求

class RequestFn extends DoFn<Instant, Request> {
@ProcessElement
public void process(@Element Instant instant, OutputReceiver<Request> receiver) {
receiver.output(
Request.builder().build()
);
}
}

ResourceTransform 将请求转换为响应,增加计数器

class ResourceTransform extends PTransform<PCollection<Request>, PCollection<Response>> {

static ResourceTransform create() {
return new ResourceTransform();
}

public PCollection<Response> expand(PCollection<Request> input) {
return ParDo.of("Consume Resource", new ResourceFn());
}
}

class ResourceFn extends DoFn<Request, Response> {

private Counter counter = Metrics.counter(ResourceFn.class, "some:resource");

private transient ResourceClient client = null;

@Setup
public void setup() {
client = new ResourceClient();
}

@ProcessElement
public void process(@Element Request request, OutputReceiver<> receiver)
{
counter.inc(); // Increment the counter.
// not showing error handling
Response response = client.execute(request);
receiver.output(response);
}
}

请求和响应类

(旁白:考虑将 creating a Schema 用于请求输入和响应输出类。下面的示例使用 AutoValueAutoValueSchema )

@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class Request {

/* abstract Getters. */

abstract String getId();

@AutoValue.Builder
static abstract class Builder {

/* abstract Setters. */

abstract Builder setId(String value);

abstract Request build();
}
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class Response {

/* abstract Getters. */

abstract String getId();

@AutoValue.Builder
static abstract class Builder {

/* abstract Setters. */

abstract Builder setId(String value);

abstract Response build();
}
}


关于apache-beam - 每秒调用最大请求数的管道设计,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74908903/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com