gpt4 book ai didi

apache-flink - Flink 中的预洗牌聚合

转载 作者:行者123 更新时间:2023-12-05 04:46:24 27 4
gpt4 key购买 nike

我们正在将 spark 作业迁移到 flink。我们在 spark 中使用了 pre-shuffle 聚合。有没有办法在 spark.xml 中执行类似的操作?我们正在使用来自 apache kafka 的数据。我们正在使用键控滚动窗口来聚合数据。我们希望在执行 shuffle 之前聚合 flink 中的数据。

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

最佳答案

是的,这是可能的,我将描述三种方式。首先是已经内置的 Flink Table API。第二种方式你必须构建自己的预聚合运算符。第三个是动态预聚合运算符,它在洗牌阶段之前调整要预聚合的事件数。

Flink 表 API

作为it is shown here您可以进行MiniBatch 聚合本地-全局聚合。第二种选择更好。您基本上告诉 Flink 创建每 5000 个事件的小批量,并在洗牌阶段之前预先聚合它们。

// instantiate table environment
TableEnvironment tEnv = ...

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

Flink 流 API

这种方式比较麻烦,因为您必须使用 OneInputStreamOperator 创建自己的运算符并使用 doTransform() 调用它.这是 BundleOperator 的示例。

public abstract class AbstractMapStreamBundleOperator<K, V, IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapBundleFunction<K, V, IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// get the key and value for the map bundle
final IN input = element.getValue();
final K bundleKey = getKey(input);
final V bundleValue = this.bundle.get(bundleKey);

// get a new value after adding this element to bundle
final V newBundleValue = userFunction.addInput(bundleValue, input);

// update to map bundle
bundle.put(bundleKey, newBundleValue);

numOfElements++;
bundleTrigger.onElement(input);
}

@Override
public void finishBundle() throws Exception {
if (!bundle.isEmpty()) {
numOfElements = 0;
userFunction.finishBundle(bundle, collector);
bundle.clear();
}
bundleTrigger.reset();
}
}

回调接口(interface)定义何时触发预聚合。每次流在 if (count >= maxCount) 处达到捆绑限制时,您的预聚合运算符都会将事件发送到混洗阶段。

public class CountBundleTrigger<T> implements BundleTrigger<T> {
private final long maxCount;
private transient BundleTriggerCallback callback;
private transient long count = 0;

public CountBundleTrigger(long maxCount) {
Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
this.maxCount = maxCount;
}

@Override
public void registerCallback(BundleTriggerCallback callback) {
this.callback = Preconditions.checkNotNull(callback, "callback is null");
}

@Override
public void onElement(T element) throws Exception {
count++;
if (count >= maxCount) {
callback.finishBundle();
reset();
}
}

@Override
public void reset() {
count = 0;
}
}

然后您使用 doTransform 调用您的运算符(operator):

myStream.map(....)
.doTransform(metricCombiner, info, new RichMapStreamBundleOperator<>(myMapBundleFunction, bundleTrigger, keyBundleSelector))
.map(...)
.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))

动态预聚合

如果您希望使用动态预聚合运算符,请检查 AdCom - Adaptive Combiner for stream aggregation .它基本上根据背压信号调整预聚合。它导致使用最大可能的洗牌阶段。

关于apache-flink - Flink 中的预洗牌聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68811184/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com