gpt4 book ai didi

kotlin - 如何使用 spring-cloud-function 使用 Kafka-events,但有时会产生

转载 作者:行者123 更新时间:2023-12-02 12:44:47 25 4
gpt4 key购买 nike

我已经在 Kotlin 中使用 kafka-streams 成功设置了 spring-cloud-function 并拥有一个工作原型(prototype)。
我有一个函数可以接收来自一个主题的事件并为另一个主题生成另一个事件。
这是我的application.yml

spring:
cloud:
stream:
bindings:
consumeAndProduce-in-0:
destination: topicToConsumeFrom
consumeAndProduce-out-0:
destination: topicToProduceTo
kafka:
binder:
brokers: ${kafka.broker.prod}
default:
group: ${spring.application.name}
function:
definition: consumeAndProduce
我的 kotlin 应用程序如下所示:
@SpringBootApplication
@ConfigurationPropertiesScan
class Application {
companion object {
@JvmStatic
fun main(args: Array<String>) {
runApplication<Application>(*args)
}
}

@Bean
fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent {
return { message ->
doSomethingAndReturnProducerEvent(message)
}
}
}
我在互联网上关注了许多例子,它就像一个魅力。只要将消息放入 topicToConsumeFrom ,调用函数并将结果写入 topicToProduceTo .

现在我的问题:
如果我的功能并不总是产生某些东西,那么正确的处理方式是什么。服务监听主题并忽略消息是一个非常常见的用例。它应该只对特定消息使用react,然后才产生输出,否则什么也不做。我通过创建一个不同的生产者函数来尝试这一点,只有在适用的情况下我才会从消费者那里调用它:
这是我改编的 application.yml现在定义了两个不同的函数 consumeproduce :
spring:
cloud:
stream:
bindings:
consume-in-0:
destination: topicToConsumeFrom
produce-out-0:
destination: topicToProduceTo
kafka:
binder:
brokers: ${kafka.broker.prod}
default:
group: ${spring.application.name}
function:
definition: consume;produce
在我的应用程序中,我重命名了 consumeAndProduce()consume()现在返回值是 Unit .另外,我创建了第二个 bean produce()除了返回一个返回我给它的有效负载的函数之外什么都不做:
@SpringBootApplication
@ConfigurationPropertiesScan
class Application {
companion object {
@JvmStatic
fun main(args: Array<String>) {
runApplication<Application>(*args)
}
}

@Bean
fun consume(produce: (ProducerEvent) -> ProducerEvent): (ConsumerEvent) -> Unit {
return { message ->
if (isSomethingIWantToReactTo(message)) {
val result = doSomethingAndReturnProduceEvent(message)
producer(result)
}
}
}

@Bean
fun produce(): (ProducerEvent) -> ProducerEvent {
return { it }
}
}
现在 consume()如果主题中存在消息并且消息是给我的,则调用 -function,我会做一些事情并调用 produce() -对我的结果起作用,否则什么也不做。
我无法在传出主题中看到任何消息。
我通过调试知道它进入了 if 分支并调用了 produce()以我的结果,但似乎卡夫卡绑定(bind)不起作用,并且没有真正发送任何消息。
我知道,我的方法有点幼稚,但经过广泛的研究,我找不到任何描述这个用例的东西。所有的例子总是有一个在所有情况下消费和产生的函数。
有正确的方法吗?

最佳答案

基于仅使用一个函数的第一个代码示例,有两种可能的解决方案 consumeAndProduce()消费和生产:

  • 返回一个可为空的事件。如果函数返回 null ,不会产生任何事件:
  • @Bean
    fun consumeAndProduce(): (ConsumerEvent) -> ProducerEvent? {
    return { message ->
    if(isSomethingIWantToReactTo(message)) {
    doSomethingAndReturnProducerEvent(message)
    }
    null
    }
    }
  • 使用KStream s 消费一个事件流并产生一个事件流。这样,任何不适用的传入事件都可以被简单地过滤掉:
  • @Bean
    fun consumeAndProduce(): (KStream<ConsumerEvent>) -> KStream<ProducerEvent> {
    return { messages ->
    messages
    .filter(message -> isSomethingIWantToReactTo(message))
    .map(message -> doSomethingAndReturnProducerEvent(message))
    }
    }
    注:我没有遵循这种方法,也没有尝试过,因此建议对 KStreams 进行研究。

    关于kotlin - 如何使用 spring-cloud-function 使用 Kafka-events,但有时会产生,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63155006/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com