
Parsing JSON data with Apache Kafka Streaming


I have a scenario where I read JSON data from a Kafka topic, using Kafka version 0.11, and I need to write Java code to stream the JSON data present in that topic. My input is a JSON dictionary that contains an array of dictionaries.

My requirement now is to take the "text" field from each dictionary inside that array, and push all of those tweet texts to another topic via Kafka Streams.

This is as far as my code goes. Please help me parse the data.

Java code for streaming:

final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

KStreamBuilder builder = new KStreamBuilder();

// taking the JSON node as input from the "Persons" topic
KStream<String, JsonNode> personstwitter = builder.stream(Serdes.String(), jsonSerde, "Persons");

// currently just passes the JSON through unchanged
personstwitter.to(Serdes.String(), jsonSerde, "Persons-output");
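
The missing parsing step might look roughly like this (a sketch only, replacing the passthrough to() call above; it assumes the incoming dictionary keeps its tweets under a hypothetical top-level array field named "statuses", each element carrying a "text" field, so adjust the names to the real payload):

// Sketch: pull every "text" value out of the assumed "statuses" array
// and forward the plain strings to the output topic
KStream<String, String> tweets = personstwitter.flatMapValues(json -> {
    List<String> texts = new ArrayList<>(); // needs java.util.List / java.util.ArrayList
    JsonNode statuses = json.get("statuses"); // hypothetical field name
    if (statuses != null && statuses.isArray()) {
        for (JsonNode status : statuses) {
            JsonNode text = status.get("text");
            if (text != null) {
                texts.add(text.asText());
            }
        }
    }
    return texts;
});
tweets.to(Serdes.String(), Serdes.String(), "Persons-output");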

Best Answer

I would suggest the following so you get better control over the JSON data:

  1. Write a custom serializer and deserializer.
  2. Create a POJO for the JSON string; a POJO is the best way to get a handle on the data.
  3. Map the data onto the POJO to access the fields you need.

POJO:

import java.io.Serializable;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonRootName;

@JsonRootName("person")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;
    private String personalID;
    private String country;
    private String occupation;

    public Person() {
    }

    @JsonCreator
    public Person(@JsonProperty("name") String name, @JsonProperty("personalID") String personalID,
            @JsonProperty("country") String country, @JsonProperty("occupation") String occupation) {
        this.name = name;
        this.personalID = personalID;
        this.country = country;
        this.occupation = occupation;
    }

    // getters and setters stripped
}
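
To sanity-check the mapping, a standalone round trip like the one below can help (the sample values are made up; note that @JsonRootName only takes effect when root-value wrapping is enabled on the ObjectMapper, so plain unwrapped JSON works by default):

import com.fasterxml.jackson.databind.ObjectMapper;

public class PersonMappingCheck {
    public static void main(String[] args) throws Exception {
        // made-up sample payload; the keys must match the @JsonProperty names
        String json = "{\"name\":\"John\",\"personalID\":\"4050\","
                + "\"country\":\"India\",\"occupation\":\"engineer\"}";
        ObjectMapper mapper = new ObjectMapper();
        Person person = mapper.readValue(json, Person.class);
        System.out.println(person.getCountry()); // India
        System.out.println(mapper.writeValueAsString(person)); // back to JSON
    }
}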

Serializer:

import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper om = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            // serialize the POJO to a JSON byte array
            return om.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}

Deserializer:

import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper om = new ObjectMapper();
    private Class<T> type;

    /*
     * Default constructor needed by Kafka
     */
    public JsonDeserializer() {
    }

    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // allow the target class to be supplied through the config map
        if (type == null) {
            type = (Class<T>) map.get("type");
        }
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        try {
            // map the JSON byte array back onto the POJO
            return om.readValue(bytes, type);
        } catch (Exception e) {
            throw new SerializationException(e);
        }
    }

    protected Class<T> getType() {
        return type;
    }

    @Override
    public void close() {
        // nothing to close
    }
}
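
Either construction path works: hand over the target class directly, or supply it through the config map (the "type" key is this deserializer's own convention, not a standard Kafka property). A small sketch:

// Option 1: explicit target class
JsonDeserializer<Person> direct = new JsonDeserializer<>(Person.class);

// Option 2: via configure(), e.g. when Kafka instantiates the class by name
Map<String, Object> config = new HashMap<>(); // java.util.HashMap
config.put("type", Person.class);
JsonDeserializer<Person> configured = new JsonDeserializer<>();
configured.configure(config, false);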

Consumer:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ConsumerUtilities {

    public static Properties getProperties() {
        Properties configs = new Properties();
        // the application id is used as a prefix for internal topic names,
        // so keep it topic-name-safe (no spaces)
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-test-application");
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return configs;
    }

    public static KStreamBuilder getStreamingConsumer() {
        return new KStreamBuilder();
    }

    public static void getStreamData() {
        JsonSerializer<Person> personJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<Person> personJsonDeserializer = new JsonDeserializer<>(Person.class);
        Serde<Person> personSerde = Serdes.serdeFrom(personJsonSerializer, personJsonDeserializer);
        KStreamBuilder builder = getStreamingConsumer();

        // consume typed Person records from the "test" topic and print one field
        KStream<String, Person> kStream = builder.stream(Serdes.String(), personSerde, "test");
        kStream.foreach(new ForeachAction<String, Person>() {

            @Override
            public void apply(String key, Person person) {
                System.out.println(person.getCountry());
            }

        });

        KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties());
        kafkaStreams.start();
    }
}
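
To connect this back to the original question (forwarding a single field to another topic), the typed stream can be mapped and written out. A sketch using the 0.11 DSL, with getCountry() standing in for the asker's "text" field and "Persons-output" as an assumed target topic:

// inside getStreamData(), after kStream is built:
KStream<String, String> countries = kStream.mapValues(Person::getCountry);
countries.to(Serdes.String(), Serdes.String(), "Persons-output");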

Producer:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerUtilities {

    public static Producer<String, Person> getProducer() {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka json producer");
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // the key type is String, so use the String serializer, not ByteArraySerializer
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "com.kafka.api.serdes.JsonSerializer");
        return new KafkaProducer<String, Person>(configProperties);
    }

    public ProducerRecord<String, Person> createRecord(Person person) {
        // no key: records are distributed across partitions
        return new ProducerRecord<String, Person>("test", person);
    }
}
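
A usage sketch for the producer side (made-up sample values; the value serializer configured above must match the actual package of the JsonSerializer class):

Producer<String, Person> producer = ProducerUtilities.getProducer();
ProducerUtilities utils = new ProducerUtilities();
Person person = new Person("John", "4050", "India", "engineer");
producer.send(utils.createRecord(person)); // writes the JSON record to the "test" topic
producer.close();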

Regarding parsing JSON data with Apache Kafka Streaming, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50524867/
