
java - Does Kafka flatMapValues split a record into multiple records when passed a JSON array object?

Reposted. Author: 行者123. Last updated: 2023-12-02 01:43:08

I am using Confluent 5.0.0.

I have a JSON record like the following:

{ 
"name" : "David,Corral,Babu",
"age" : 23
}

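Using Kafka Streams, I want to split the record above into three records, one per comma-separated entry in the value of the "name" key. The output should look like this: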

{ 
"name" : "David",
"age" : 23
},
{
"name" : "Corral",
"age" : 23
},
{
"name" : "Babu",
"age" : 23
}

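To do this I am using flatMapValues, but so far I have not been able to get the expected result.

I would like to check whether flatMapValues is the right function for my requirement.

I used the following code: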

package test;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class RecordSplliter {

    public static void main(String[] args) throws Exception {
        System.out.println("** STARTING RecordSplliter STREAM APP **");
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "json-e44nric2315her");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSeder.class);

        final Serde<String> stringSerde = Serdes.String();
        final StreamsBuilder builder = new StreamsBuilder();

        // Consume JSON and enriches it
        KStream<String, Person> source = builder.stream("streams-plaintext-input");

        KStream<String, String> output = source
            .flatMapValues(person -> Arrays.asList(person.getName().split(",")));
        output.to("streams-output");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

At runtime I get the following exception:

08:31:10,822 ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=streams-plaintext-input, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:304)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:957)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: myapps.PersonSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
... 6 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to myapps.Person
at myapps.PersonSerializer.serialize(PersonSerializer.java:1)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:154)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:98)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 18 more
08:31:10,827 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
08:31:10,827 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1] Shutting down
08:31:10,833 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
08:31:10,843 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
08:31:10,843 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387] State transition from RUNNING to ERROR
08:31:10,843 WARN org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387] All stream threads have died. The instance will be in error state and should be closed.
08:31:10,843 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1] Shutdown complete
Exception in thread "json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=streams-plaintext-input, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:304)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:957)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: myapps.PersonSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
... 6 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to myapps.Person
at myapps.PersonSerializer.serialize(PersonSerializer.java:1)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:154)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:98)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 18 more

Best answer

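The exception occurs because your flatMapValues produces values of type String. In your code you do not pass a Produced to KStream::to, so it falls back to the default serde configured in the properties, which in your case is PersonSeder.class.

Your values are of type String, but PersonSeder.class is used to serialize them.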
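The mismatch described above can be reproduced without Kafka. The following is only a sketch: `ValueSerializer`, `PersonJsonSerializer`, and `SerdeMismatchDemo` are hypothetical stand-ins invented for illustration, not Kafka classes, and the `Person` fields are assumed from the JSON in the question. It shows why the compiler cannot catch the problem: the sink only sees the configured default serializer through its erased type, so a String value reaches a serializer written for Person and fails with a ClassCastException at runtime.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a serializer interface (not the Kafka one).
interface ValueSerializer<T> {
    byte[] serialize(T value);
}

// Assumed shape of the POJO, based on the JSON in the question.
final class Person {
    final String name;
    final int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

// A serializer that only knows how to handle Person values.
final class PersonJsonSerializer implements ValueSerializer<Person> {
    @Override
    public byte[] serialize(Person p) {
        String json = "{\"name\":\"" + p.name + "\",\"age\":" + p.age + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }
}

final class SerdeMismatchDemo {
    // Mimics a sink that applies whatever default serializer was configured.
    // Generics are erased here, so any Object can be handed over.
    @SuppressWarnings("unchecked")
    static byte[] sendWithDefault(ValueSerializer<?> defaultSerializer, Object value) {
        return ((ValueSerializer<Object>) defaultSerializer).serialize(value);
    }

    // Returns true when serializing the value fails with a ClassCastException.
    static boolean failsWithClassCast(Object value) {
        try {
            sendWithDefault(new PersonJsonSerializer(), value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

In this sketch a Person value serializes normally, while a String value such as "David" triggers the same kind of ClassCastException that appears at the bottom of the stack trace.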

If you want to split it, you need something like this:

KStream<String, Person> output = source
    .flatMapValues(person ->
        Arrays.stream(person.getName().split(","))
            .map(name -> new Person(name, person.getAge()))
            .collect(Collectors.toList()));
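The splitting lambda above can be pulled out into a plain method and checked without a running broker. This is a minimal sketch under assumptions: the `Person` constructor and getters are inferred from how the snippet uses them, `PersonSplitter` is a hypothetical helper name, and the trimming and empty-entry filtering are additions that the original snippet does not perform.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Assumed shape of the POJO, inferred from new Person(name, age),
// getName() and getAge() in the snippet above.
final class Person {
    private final String name;
    private final int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    String getName() {
        return name;
    }

    int getAge() {
        return age;
    }
}

// Hypothetical helper holding the logic of the flatMapValues lambda.
final class PersonSplitter {
    // One Person in, one Person per comma-separated name out.
    // Every output record keeps the age of the input record.
    static List<Person> splitByName(Person person) {
        return Arrays.stream(person.getName().split(","))
                .map(String::trim)                   // addition: drop surrounding spaces
                .filter(name -> !name.isEmpty())     // addition: skip empty entries
                .map(name -> new Person(name, person.getAge()))
                .collect(Collectors.toList());
    }
}
```

With such a helper, the stream definition could be written as `source.flatMapValues(PersonSplitter::splitByName)`, which keeps the topology code short and leaves the splitting rules in one testable place.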

I used the following code together with your serializer and deserializer, which are symmetric (both use Gson), and it works:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSerdes.class);

final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Person> source = builder.stream("input");
KStream<String, Person> output = source
    .flatMapValues(person ->
        Arrays.stream(person.getName().split(","))
            .map(name -> new Person(name, person.getAge()))
            .collect(Collectors.toList()));
output.to("output");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

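Update 1:

Regarding your question about working with JSON instead of a POJO: it all depends on your Serdes. If you use a generic Serdes, you can serialize and deserialize JSON as a Map.

Below is a simple MapSerdes that can be used for this purpose, followed by sample code that uses it.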

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class MapSerdes implements Serde<Map<String, String>> {

    private static final Charset CHARSET = StandardCharsets.UTF_8;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public void close() {}

    @Override
    public Serializer<Map<String, String>> serializer() {
        return new Serializer<Map<String, String>>() {
            private final Gson gson = new Gson();

            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {}

            @Override
            public byte[] serialize(String topic, Map<String, String> data) {
                // Pass null values through instead of writing the text "null".
                if (data == null) {
                    return null;
                }
                // Encode the map as a JSON string, then as UTF-8 bytes.
                String line = gson.toJson(data);
                return line.getBytes(CHARSET);
            }

            @Override
            public void close() {}
        };
    }

    @Override
    public Deserializer<Map<String, String>> deserializer() {
        return new Deserializer<Map<String, String>>() {
            private final Type type = new TypeToken<Map<String, String>>(){}.getType();
            private final Gson gson = new Gson();

            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {}

            @Override
            public Map<String, String> deserialize(String topic, byte[] data) {
                // Pass null values through; new String(null, ...) would throw.
                if (data == null) {
                    return null;
                }
                // Decode with the same charset the serializer used.
                return gson.fromJson(new String(data, CHARSET), type);
            }

            @Override
            public void close() {}
        };
    }
}

Usage example. Instead of name, you can split on a different attribute, depending on what your map contains.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class GenericJsonSplitterApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MapSerdes.class);

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Map<String, String>> source = builder.stream("input");
        KStream<String, Map<String, String>> output = source
            .flatMapValues(map ->
                Arrays.stream(map.get("name").split(","))
                    .map(name -> {
                        // Copy the whole record, then overwrite only the split field.
                        Map<String, String> splittedMap = new HashMap<>(map);
                        splittedMap.put("name", name);
                        return splittedMap;
                    })
                    .collect(Collectors.toList()));
        output.to("output");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
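Since the attribute to split on depends on the map, the per-record logic can also be written as a small helper that takes the field name as a parameter. The sketch below uses only the JDK; `MapFieldSplitter` and `splitOn` are hypothetical names, and forwarding a record unchanged when the field is absent is an assumption about the desired behavior (the lambda above would throw a NullPointerException in that case).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: splits one map-based record on a chosen field.
final class MapFieldSplitter {
    static List<Map<String, String>> splitOn(Map<String, String> record, String field) {
        String joined = record.get(field);
        if (joined == null) {
            // Assumption: a record without the field is forwarded unchanged.
            return Collections.singletonList(record);
        }
        List<Map<String, String>> out = new ArrayList<>();
        for (String part : joined.split(",")) {
            // Copy every attribute, then overwrite only the split field.
            Map<String, String> copy = new HashMap<>(record);
            copy.put(field, part);
            out.add(copy);
        }
        return out;
    }
}
```

In the application above this could be plugged in as `source.flatMapValues(map -> MapFieldSplitter.splitOn(map, "name"))`, with "name" replaced by whichever attribute holds the comma-separated values.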

The original question, "Does Kafka flatMapValues split a record into multiple records when passed a JSON array object?", can be found on Stack Overflow: https://stackoverflow.com/questions/54148261/
