
java - Error creating a KTable with a custom key


Use case: there is a topic containing messages of the form (null, metadata). I need to create a KTable from the topic, with metadata.entity_id as the key and the metadata as the value. The table will later be used to join against a stream keyed the same way.

    private final static String KAFKA_BROKERS = "localhost:9092";
    private final static String APPLICATION_ID = "TestMetadataTable";
    private final static String AUTO_OFFSET_RESET_CONFIG = "earliest";
    private final static String METADATA_TOPIC = "test-metadata-topic";

    public static void main(String[] args) {

        // Setting the Streams configuration params.
        final Properties kafkaStreamConfiguration = new Properties();
        kafkaStreamConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        kafkaStreamConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, APPLICATION_ID);
        kafkaStreamConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
        kafkaStreamConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);

        // Creating Serdes for MetricMetadata.
        GenericJsonSerializer<MetricMetadata> metadataJsonSerializer = new GenericJsonSerializer<>();
        GenericJsonDeserializer<MetricMetadata> metadataJsonDeserializer = new GenericJsonDeserializer<>(MetricMetadata.class);
        Serde<MetricMetadata> metadataSerde = Serdes.serdeFrom(metadataJsonSerializer, metadataJsonDeserializer);

        // Creating the Kafka Streams topology.
        final StreamsBuilder builder = new StreamsBuilder();

        KTable<String, MetricMetadata> metaTable = builder.table(METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
                .groupBy((key, value) -> KeyValue.pair(value.getEntity_id(), value))
                .aggregate(() -> null,
                        (key, value, aggValue) -> value,
                        (key, value, aggValue) -> value
                );

        final KafkaStreams streams = new KafkaStreams(builder.build(), kafkaStreamConfiguration);
        streams.start();
    }

As soon as I push messages to METADATA_TOPIC, the following error occurs. Am I missing something here? Kafka Streams 2.2.0.

Exception in thread "TestMetadataTable-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store test-metadata-topic-STATE-STORE-0000000000
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:519)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:471)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.streams.kstream.internals.ChangedSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.kafka.streams.kstream.internals.Change). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:95)
at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:72)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:102)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:79)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:127)
at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:72)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:224)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
... 10 more
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 28 more

Best Answer

In this case you need to supply Serdes to the KTable.groupBy() operation via Grouped, because calling groupBy triggers a repartition. You also need to supply the same Serdes for the state store of the aggregate operation.
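For reference, a minimal sketch of the question's KTable-based topology with the missing Serdes supplied (Grouped for the repartition topic, Materialized for the state store); the adder and subtractor lambdas are kept as in the question. Note that records with a null key remain awkward for a table, which is why the KStream variant below is the better fit:

    KTable<String, MetricMetadata> metaTable =
            builder.table(METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
                    // Grouped supplies the Serdes for the repartition topic written by groupBy.
                    .groupBy((key, value) -> KeyValue.pair(value.getEntity_id(), value),
                            Grouped.with(Serdes.String(), metadataSerde))
                    // KGroupedTable.aggregate takes an adder and a subtractor;
                    // Materialized supplies the Serdes for the state store.
                    .aggregate(() -> null,
                            (key, value, aggValue) -> value,
                            (key, value, aggValue) -> value,
                            Materialized.with(Serdes.String(), metadataSerde));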

Also, since the key is null, I think you should start with a KStream instead. Then call groupByKey (you still need to provide Serdes via Grouped), and the aggregation will give you the KTable you want.

Off the top of my head, something like this should work:

    builder.stream(METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
            // selectKey's mapper returns the new key only, not a KeyValue pair.
            .selectKey((key, value) -> value.getEntity_id())
            .groupByKey(Grouped.with(Serdes.String(), metadataSerde))
            // KGroupedStream.aggregate takes a single aggregator (no subtractor).
            .aggregate(() -> null,
                    (key, value, aggValue) -> value,
                    Materialized.with(Serdes.String(), metadataSerde));
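The exception message also points at the other way out: changing the default Serdes in the Streams configuration. As a partial alternative (a sketch; it only addresses the key side, and the value Serdes should still be passed explicitly as above), the question's Properties could additionally set:

    // Use String as the default key serde so that operators writing to internal
    // topics without explicit Serdes no longer fall back to ByteArraySerializer.
    kafkaStreamConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass());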

Regarding "java - Error creating a KTable with a custom key", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/61485951/
