
java - Can't seem to convert a KStream to a KTable


This is my first attempt at using a KTable. I have a Kafka stream containing Avro-serialized objects of type A,B. This works fine: I can write a Consumer that consumes them without trouble, or a simple KStream that just counts the records.

The B object has a field containing a country code. I want to feed that code to a KTable so it can count the number of records that contain a particular country code. To do that, I am trying to convert the stream into an X,Y stream (or really: countryCode,count). At the end I inspect the contents of the table and extract an array of KV pairs.

My code (included below) always produces the following error (see the 'Caused by' lines):

2018-07-26 13:42:48.688 [com.findology.tools.controller.TestEventGeneratorController-16d7cd06-4742-402e-a679-898b9ef78c41-StreamThread-1; AssignedStreamsTasks] ERROR -- stream-thread [com.findology.tools.controller.TestEventGeneratorController-16d7cd06-4742-402e-a679-898b9ef78c41-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=com.findology.model.traffic.CpaTrackingCallback, partition=0, offset=962649
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:240)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:922)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.Integer / value type: java.lang.Integer). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:59)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224)
... 6 more
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:146)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:94)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
... 19 more

Here is the code I am using. I have omitted certain classes for brevity. Note the use of the Confluent KafkaAvro classes.

    private synchronized void createStreamProcessor2() {
        if (streams == null) {
            try {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getName());
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

                StreamsConfig config = new StreamsConfig(props);
                StreamsBuilder builder = new StreamsBuilder();

                Map<String, Object> serdeProps = new HashMap<>();
                serdeProps.put("schema.registry.url", schemaRegistryURL);
                AvroSerde<CpaTrackingCallback> cpaTrackingCallbackAvroSerde = new AvroSerde<>(schemaRegistryURL);
                cpaTrackingCallbackAvroSerde.configure(serdeProps, false);

                // This is the key to telling kafka the specific Serde instance to use
                // to deserialize the Avro encoded value
                KStream<Long, CpaTrackingCallback> stream = builder.stream(CpaTrackingCallback.class.getName(),
                        Consumed.with(Serdes.Long(), cpaTrackingCallbackAvroSerde));

                // provide a way to convert CpaTracking... info into just country codes
                // (Long, CpaTrackingCallback) -> (countryCode:Integer, placeHolder:Long)
                TransformerSupplier<Long, CpaTrackingCallback, KeyValue<Integer, Long>> transformer = new TransformerSupplier<Long, CpaTrackingCallback, KeyValue<Integer, Long>>() {
                    @Override
                    public Transformer<Long, CpaTrackingCallback, KeyValue<Integer, Long>> get() {
                        return new Transformer<Long, CpaTrackingCallback, KeyValue<Integer, Long>>() {

                            @Override
                            public void init(ProcessorContext context) {
                                // Not doing Punctuate so no need to store context
                            }

                            @Override
                            public KeyValue<Integer, Long> transform(Long key, CpaTrackingCallback value) {
                                return new KeyValue(value.getCountryCode(), 1);
                            }

                            @Override
                            public KeyValue<Integer, Long> punctuate(long timestamp) {
                                return null;
                            }

                            @Override
                            public void close() {
                            }
                        };
                    }
                };

                KTable<Integer, Long> countryCounts = stream.transform(transformer).groupByKey() //
                        .count(Materialized.as("country-counts"));

                streams = new KafkaStreams(builder.build(), config);
                Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
                streams.cleanUp();
                streams.start();

                try {
                    countryCountsView = waitUntilStoreIsQueryable("country-counts", QueryableStoreTypes.keyValueStore(),
                            streams);
                }
                catch (InterruptedException e) {
                    log.warn("Interrupted while waiting for query store to become available", e);
                }
            }
            catch (Exception e) {
                log.error(e);
            }
        }
    }
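One of the classes omitted above is the waitUntilStoreIsQueryable helper. For context, here is a minimal sketch of what such a helper typically looks like, modeled on the one in the Confluent kafka-streams-examples project (the asker's actual implementation may differ): it polls streams.store() until the state store is open for querying.

    // Sketch modeled on the Confluent kafka-streams-examples helper; the
    // question's omitted implementation is assumed to be similar.
    // InvalidStateStoreException is org.apache.kafka.streams.errors.InvalidStateStoreException.
    public static <T> T waitUntilStoreIsQueryable(final String storeName,
            final QueryableStoreType<T> queryableStoreType,
            final KafkaStreams streams) throws InterruptedException {
        while (true) {
            try {
                return streams.store(storeName, queryableStoreType);
            }
            catch (InvalidStateStoreException ignored) {
                // Store is not yet queryable (starting up or rebalancing); retry.
                Thread.sleep(100);
            }
        }
    }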

Best Answer

The groupByKey() method on KStream uses the default serializers/deserializers, which you have not set, so they fall back to the byte-array serdes; that is why the repartition step behind groupByKey() throws the ClassCastException when it tries to serialize your Integer keys and values. Use the groupByKey(Serialized<K,V> serialized) overload instead, as in:

.groupByKey(Serialized.with(Serdes.Integer(), Serdes.Long()))

Also note that what you are doing in your custom TransformerSupplier can be done with a simple KStream.map call.
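Putting the two points together, a minimal sketch of the counting pipeline might look like the following (it reuses the stream variable from the question; note the mapped value is the literal 1L, since the Transformer in the question actually emits the Integer 1 even though it declares a Long value):

    KTable<Integer, Long> countryCounts = stream
            // (Long, CpaTrackingCallback) -> (countryCode, 1L); 1L makes the
            // value a real Long, matching Serdes.Long() below
            .map((key, value) -> KeyValue.pair(value.getCountryCode(), 1L))
            // explicit serdes for the internal repartition topic
            .groupByKey(Serialized.with(Serdes.Integer(), Serdes.Long()))
            .count(Materialized.as("country-counts"));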

Regarding java - Can't seem to convert a KStream<A,B> to a KTable<X,Y>, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51548115/
