java - Kafka Streams: Invalid topology: StateStore is not added yet

Our goal is to implement the architecture below. Most importantly, we need to be able to read all of the data in topic T1 (from all of its partitions).

[architecture diagram]

The problem we are facing is that we cannot join two nodes created from different builders (two different builders in each instance). In every instance we create two builders (B1, B2). B1 creates a source processor that reads data from all partitions of topic T1, which is why each instance gets a unique application ID. B2 reads data from a single partition of T2. Later, when we do the join, we get the error Invalid topology: StateStore aggregated-stream-store is not added yet, because B1 and B2 have different APP_IDs.

Here is our code:

StrmApp class

public class StrmApp extends StrmProc {
    private StreamsBuilder myBuilder;
    private Validator<String, Data> dataValidator;
    private Properties ownBuilderProps;
    private KafkaStreams ownStreams;

    public StrmApp(ValidDataService dataService, ProcessConfig config, ProcessListener listener) {
        super(dataService, config, listener);
        myBuilder = new StreamsBuilder();
        dataValidator = getValidDataService().getValidator(String.class, Data.class);
        ownBuilderProps = new Properties();
        ownBuilderProps.putAll(getProperties());
        // Unique ID for each instance (different consumer group)
        ownBuilderProps.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
    }

    private KTable<String, TheDataList> globalStream() {

        // KStream of records from the T1 topic using the String and TheDataSerde deserializers
        KStream<String, Data> trashStream = getOwnBuilder().stream("T1", Consumed.with(Serdes.String(), SerDes.TheDataSerde));

        // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
        KGroupedStream<String, Data> kGroupedStream = trashStream.groupByKey();

        // Describe how a StateStore should be materialized (as a KTable).
        // In our case we are using the default RocksDB back-end by providing "aggregated-stream-store" as the state store name
        Materialized<String, TheDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("aggregated-stream-store");
        materialized = materialized.withValueSerde(SerDes.TheDataListSerde);

        // Return a KTable
        return kGroupedStream.aggregate(() -> new TheDataList(), (key, value, aggregate) -> {
            if (!value.getValideData())
                aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
            else
                aggregate.getList().add(value);
            return aggregate;
        }, materialized);
    }

    private Data tombstone(String Vid) {
        Data d = new Data();
        d.setVid(Vid);
        d.setValideData(false);
        d.setTimestamp(System.currentTimeMillis());
        return d;
    }

    @Override
    public void run() {
        /* Read from topic 2 (T2) - we only want to read one partition */
        KStream<String, Data> inStream = getBuilder()
                .stream(getProcessConfig().getTopicConfig().getTopicIn(), Consumed.with(Serdes.String(), SerDes.TheDataSerde))
                .filter(getValidDataService().getValidator(String.class, Data.class));

        /* Read topic 1 (T1) - we want to read from all partitions (P1, P2 and P3) */
        KTable<String, TheDataList> ft = globalStream();

        // ERROR: Invalid topology: StateStore vdp-aggregated-stream-store is not added yet.
        // The error is raised when it comes to the join,
        // I think because the two builders have different APP_IDs
        logger.warn("##JOIN:");
        /* Join between data coming from T1 and data coming from T2 */
        KStream<String, TheDataList> validated = inStream.join(ft,
                new ValueJoiner<Data, TheDataList, TheDataList>() {
                    @Override
                    public TheDataList apply(Data valid, TheDataList ivalids) {
                        ivalids.getList().forEach((c) -> {
                            dataValidator.validate(c, valid);
                        });
                        return ivalids;
                    }
                });

        // ...... some code

        ownStreams = StreamTools.startKStreams(getOwnBuilder(), getOwnBuilderProps(), this, this);
        super.startStreams();
    }

    private Properties getOwnBuilderProps() {
        return ownBuilderProps;
    }

    private StreamsBuilder getOwnBuilder() {
        // return getBuilder();
        return myBuilder;
    }

    // .......
}

StrmProc class

public abstract class StrmProc extends AProcess {
    private final StreamsBuilder builder;

    public StrmProc(ValidDataService dataService, ProcessConfig config, ProcessListener listener) {
        super(dataService, config, listener);
        this.builder = new StreamsBuilder();
    }

    protected final StreamsBuilder getBuilder() {
        return builder;
    }

    protected final KafkaStreams startStreams() {
        return StreamTools.startKStreams(getBuilder(), getProperties(), this, this);
    }

    // ........

}

AProcess class

public abstract class AProcess implements Process {
    private final Properties propertie;
    private final ProcessConfig config;
    private final ValidDataService dataService;
    private final ProcessListener listener;

    protected AProcess(ValidDataService dataService, ProcessConfig config, ProcessListener listener) {
        super();
        this.dataService = dataService;
        this.propertie = getProperties(config);
        this.config = config;
        this.listener = listener;
    }

    private Properties getProperties(ProcessConfig config) {
        Properties kafkaProperties = new Properties();
        kafkaProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, config.getApp());
        kafkaProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServerUrl());
        kafkaProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return kafkaProperties;
    }

    protected Properties getProperties() {
        return propertie;
    }

    protected ProcessConfig getProcessConfig() {
        return config;
    }

    protected ValidDataService getValidDataService() {
        return dataService;
    }

    // .......

}

How can we achieve this with Kafka Streams?

Best Answer

To use a join in Kafka Streams, you need to use a single StreamsBuilder instance, not two (in your case there are two of them: the variables inStream and ft).

Kafka Streams typically throws the exception TopologyException: Invalid topology: StateStore is not added yet when the KeyValueStore has not been added to the StreamsBuilder instance, i.e. via streamsBuilder.addStateStore(storeBuilder).
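
A minimal sketch of the suggested fix, assuming the question's Data, TheDataList and SerDes classes, the topic names T1 and T2, and a simplified aggregation and joiner (the application ID, bootstrap server and class name here are illustrative placeholders, not the question's actual values): both streams are built from one StreamsBuilder, so the state store backing the KTable is registered in the same topology that performs the join.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class SingleBuilderJoinSketch {
    public static void main(String[] args) {
        // One builder for the whole topology
        // (Data, TheDataList and SerDes come from the question's project)
        StreamsBuilder builder = new StreamsBuilder();

        // All partitions of T1, aggregated into a KTable backed by the
        // "aggregated-stream-store" state store
        KTable<String, TheDataList> ft = builder
                .stream("T1", Consumed.with(Serdes.String(), SerDes.TheDataSerde))
                .groupByKey()
                .aggregate(TheDataList::new, (key, value, aggregate) -> {
                    aggregate.getList().add(value);
                    return aggregate;
                }, Materialized.<String, TheDataList, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(SerDes.TheDataListSerde));

        // T2, read through the same builder
        KStream<String, Data> inStream = builder
                .stream("T2", Consumed.with(Serdes.String(), SerDes.TheDataSerde));

        // The join now succeeds because both operands belong to one topology;
        // this ValueJoiner simply returns the aggregated list
        KStream<String, TheDataList> validated = inStream.join(ft, (valid, list) -> list);
        // ... further processing of 'validated' ...

        // One application, one application.id
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "strm-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

Everything is registered on the one builder and started as a single KafkaStreams application with a single application.id, so the aggregated-stream-store is part of the same topology that the join runs in.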

Regarding java - Kafka Streams: Invalid topology: StateStore is not added yet, there is a similar question on Stack Overflow: https://stackoverflow.com/questions/53722235/
