gpt4 book ai didi

stream - 如何在 Flink 中使用 ListState 进行 BroadcastProcessFunction

转载 作者:行者123 更新时间:2023-12-02 03:07:35 25 4
gpt4 key购买 nike

我们有一个包含交易的非键控数据流和一个包含规则的广播流。事实上,我们希望根据最后看到的规则来处理交易。如果我们最后看到的规则是每日,我们必须将当前交易添加到每日TrnsList。此外,如果 dailyTrnsList 大小大于阈值,我们必须清除列表并将事务写入数据库。如果最后看到的规则是 temp,我们会执行相同的操作。

代码如下:

public class TransactionProcess extends BroadcastProcessFunction<String, String, String>{
private List<String> dailyTrnsList = new ArrayList<>();
private List<String> tempTrnsList = new ArrayList<>();

private final static int threshold = 100;

private final MapStateDescriptor<String, String> ruleStateDesc =
new MapStateDescriptor<>(
"ControlMapState",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);

@Override
public void processElement(String s,
ReadOnlyContext readOnlyContext,
Collector<Transaction> collector) throws Exception
{
String ruleName = readOnlyContext.getBroadcastState(ruleStateDesc).get("rule");

if(ruleName.equals("daily"))
{
dailyTrnsList.add(s);
if(dailyTrnsList.size()>=threshold)
{
List<String> buffer = dailyTrnsList;
dailyTrnsList = new ArrayList<>();
insert_to_db(buffer,"daily");
}
}
else if(ruleName.equals("temp"))
{
tempTrnsList.add(s);
if(tempTrnsList.size()>=threshold)
{
List<String> buffer = tempTrnsList;
tempTrnsList = new ArrayList<>();
insert_to_db(buffer,"temp");
}
}

collector.collect(s);

}
@Override
public void processBroadcastElement(String s,
Context context,
Collector<CardTransaction> collector) throws Exception
{
if (s.equals("temp"))
{
context.getBroadcastState(ruleStateDesc).put("rule", "temp");
List<String> buffer = dailyTrnsList;
dailyTrnsList = new ArrayList<>();
insert_to_db(buffer,"daily");
}
else if (s.equals("daily"))
{
context.getBroadcastState(ruleStateDesc).put("rule", "daily");
List<String> buffer = tempTrnsList;
tempTrnsList = new ArrayList<>();
insert_to_db(buffer,"temp");
}
}
}

我们的问题是编写容错方法。我们不知道如何使用 ListState 来解决我们的问题。到目前为止,我们找到的唯一解决方案是实现 Working with State 下的 CheckpointedFunction 接口(interface)。 Flink 文档中的部分。

private ListState<String> dailyTrns;
private ListState<String> tempTrns;

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
dailyTrns.clear();
tempTrns.clear();
for (String[] element : dailyTrnsList)
dailyTrns.add(element);
for (String[] element : tempTrnsList)
tempTrns.add(element);
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

dailyTrns = context.getOperatorStateStore().getListState(dailyDescriptor);
tempTrns = context.getOperatorStateStore().getListState(tempDescriptor);
if (context.isRestored()) {
for (String[] element : dailyTrns.get())
dailyTrnsList.add(element);
for (String[] element : tempTrns.get())
tempTrnsList.add(element);
}
}

请您指导我们,如果这种方法不是正确的解决方案,我们还能做什么?如果解决方案正确,则未从 dailyTrnsListtempTrnsList 传输到 dailyTrnstempTrns 的元素会发生什么情况>?

如有任何帮助,我们将不胜感激。

提前谢谢您。

最佳答案

您可以简化您的实现,这样就不必担心这个问题。您可以执行以下操作:

(1) 简化BroadcastProcessFunction,使其所做的只是将传入流分成两个流:日常事务流和临时事务流。它通过根据最新规则选择两个侧面输出之一来实现此目的。

(2) 按照 BroadcastProcessFunction 的计数窗口创建批处理并将其写入数据库。

或者,BroadcastProcessFunction 可以写出(规则、事务)的元组,而不是使用侧面输出,然后您可以通过规则对流进行键入。不管怎样,我们的想法是让窗口 API 为您管理容错列表。

关于stream - 如何在 Flink 中使用 ListState 进行 BroadcastProcessFunction,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59343157/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com