gpt4 book ai didi

java - 如何创建新的 DistributedFunction

转载 作者:行者123 更新时间:2023-12-02 11:41:54 25 4
gpt4 key购买 nike

这种新的编程范式对我来说非常新。我想用给定类中定义的 DistributedFunction 替换 .map() 中的匿名函数。但我不确定如何创建新函数。

我有以下管道:

p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
.map(e -> {
Gson gson = new Gson();

KafkaMessage kafkaMessage = gson.fromJson(e.getValue().toString(),
KafkaMessage.class);

byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());

try {
kafkaMessage.setData(new String(encodedData, "utf-8"));
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}

return kafkaMessage;
})
.map(m -> m.getData())
.drainTo(Sinks.logger());

根据一些 Jet 示例,我最终得到以下结果:

p.drawFrom(KafkaSources.kafka(properties, topic, "topicX", "topicY"))
.map(KafkaHelper::decodeKafkaMessage)
.map(m -> m.getData())
.drainTo(Sinks.logger());

KafkaHelper类:

public final class KafkaHelper implements Serializable {

private static final long serialVersionUID = -3556269069192202060L;

public static KafkaMessage decodeKafkaMessage(Map.Entry<Object,Object> entry) {

Gson gson = new Gson();

KafkaMessage kafkaMessage = gson.fromJson(entry.getValue().toString(), KafkaMessage.class);

byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());

try {
kafkaMessage.setData(new String(encodedData, "utf-8"));
} catch (UnsupportedEncodingException e) {
System.out.println(e.getMessage());
e.printStackTrace();
}

return kafkaMessage;
}

}

此方法是否遵循将 DistributedFunction 传递给 .map() 的规范/要求?如果是,为什么?如果不是,我应该做什么改变?

最佳答案

是的,在您的两个示例中,您都创建了一个 DistributedFunction 实例并将其传递给 map()。 Java 8 有一条规则,第一个示例中的 lambda 函数和第二个示例中的方法引用用于创建 DistributedFunction 的合成子类型,该子类型通过以下方式实现其单一抽象方法(“SAM”):您提供的代码。

您的 KafkaHelper 不必是可序列化的,因为您从不实例化它。您还可以将静态方法decodeKafkaMessage放在任何其他类中,因为它不依赖于类实例。

关于java - 如何创建新的 DistributedFunction,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48485250/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com