gpt4 book ai didi

java - 无法从与 StateStore 不同的应用程序访问 KTable

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:02:53 26 4
gpt4 key购买 nike

我有两个 Java 应用程序(App1、App2)来测试如何访问 KTable来自 docker 中单实例环境中的不同应用程序。

第一个应用程序 (App1) 写入 KTable使用以下代码。

public static void main(String[] args)
{
final Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG,"gateway-service");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ServiceTransactionSerde.class);


KStreamBuilder builder = new KStreamBuilder();


KStream<String,ServiceTransaction> source = builder.stream("gateway_request_processed");


KStream<String, Long> countByApi = source.groupBy((key,value)-> value.getApiId().toString()).count("Counts").toStream();

countByApi.to(Serdes.String(), Serdes.Long(),"countByApi");

countByApi.print();

final KafkaStreams streams = new KafkaStreams(builder,props);

streams.start();
System.out.println(streams.state());

System.out.println(streams.allMetadata());
System.out.println(streams.allMetadataForStore("countByApi"));

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {


@Override
public void run() {
System.out.println(streams.allMetadata());
streams.close();
}
}));
}

当我运行我的生产者时,我得到了 App1 中代码的以下输出

    RUNNING
[]
[]
[KTABLE-TOSTREAM-0000000006]: c00af5ee-3c2d-4d12-9c4b-3b55c1284dd6, 19

这显示状态 = RUNNING。商店的元数据也是空的。但是请求得到处理并成功存储在 KTable 中 (String,Long)。

当我运行 kafka-topics.sh --list --zookeeper:2181我得到以下主题。

bash-4.3# kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
countByApi
gateway-Counts-changelog
gateway-Counts-repartition
gateway-service-Counts-changelog
gateway-service-Counts-repartition
gateway_request_processed

这表明 KTable 以某种方式坚持新主题。

然后我有一个带有以下代码的第二个命令行应用程序 (App2),它试图访问此 KTable作为状态存储 (ReadOnlyKeyValueStore) 并访问它。

 public static void main( String[] args )
{
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "gateway-service-table-client");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");


KStreamBuilder builder = new KStreamBuilder();
KafkaStreams streams = new KafkaStreams(builder,props);

streams.cleanUp();
streams.start();
System.out.println( "Hello World!" );
System.out.println(streams.state());

ReadOnlyKeyValueStore<String,Long> keyValueStore =
streams.store("countByApi", QueryableStoreTypes.keyValueStore());

final KeyValueIterator<String,Long> range = keyValueStore.all();

while(range.hasNext()){
KeyValue<String,Long> next = range.next();
System.out.println(String.format("key: %s | value: %s", next.key,next.value));

}

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {


@Override
public void run() {
System.out.println(streams.allMetadata());
streams.close();
}
}));

}

当我运行 2.App 时,我确实收到了错误消息:

    RUNNING
Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, countByApi, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728)
at com.comp.streamtable.App.main(App.java:37)

不幸的是,我只有 1 个实例,我验证状态是否等于“RUNNING”。

注意:我必须为每个应用程序选择不同的 application.id,因为这是另一个异常。只是想指出这一点,因为这可能是出于兴趣。

从另一个应用程序访问我的 KTable 在这里我错过了什么?

最佳答案

您正在为两个应用程序使用两个不同的 application.id。因此,这两个应用程序完全分离。

交互式查询是为同一应用的不同实例设计的,不能跨应用工作。

这篇博文可能会有所帮助:https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/

关于java - 无法从与 StateStore 不同的应用程序访问 KTable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45963847/

26 4 0
文章推荐: java - 如何创建一个流来模拟初始化为 1 + 外循环索引的嵌套循环的行为
文章推荐: ios - 用于多部分表单上传的 AFNetworking 2.0 API
文章推荐: iphone - iOS 7 是否仍然需要没有 alpha、没有圆角和没有文件扩展名的 iTunesArtwork 图像?
文章推荐: java - Stream 到 InputStream