gpt4 book ai didi

java - Spring集成和Kafka消费者: Stop message-driven-channel-adapter right after records are sucessfully fetched

转载 作者:行者123 更新时间:2023-11-30 06:47:50 25 4
gpt4 key购买 nike

我正在使用以下配置:

  • spring-integration-kafka 2.1.0.RELEASE
  • kafka-clients 0.10.0.1
  • 卡夫卡 0.10.x.x
  • spring-xd-1.3.1.RELEASE

我为 SpringXD 创建了自定义 Kafka 源模块。我设置了我的消费者逻辑和消息驱动 channel 适配器(我将其与控制总线结合使用来停止我的 channel 适配器)。到目前为止,一切都很好。另外,我还使用 max.poll.record=10 作为 kafka 属性来每次轮询获取 10 条记录。

我想确保在成功获取所有记录(在本例中为 10 条记录)后立即停止我的 channel 。

例如:我想避免在并非所有记录都已成功获取和处理时(即,当记录未发送到输出 channel 时)停止读取。

有办法告诉我吗?

这是我的 xml 配置,以防万一:

xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">


<int:channel id="input_to_control_bus" />
<int:channel id="output" />

<context:component-scan base-package="com.kafka.source.logic" />


<int:control-bus id="my_control_bus" input-channel="input_to_control_bus" />

<int-kafka:message-driven-channel-adapter
id="kafkaInboundChannelAdapterTesting" listener-container="container1"
auto-startup="false" phase="100" send-timeout="5000" channel="output"
mode="record" message-converter="messageConverter" />

<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter" />

<!--Consumer -->
<bean id="container1"
class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="enable.auto.commit" value="false" />
<entry key="auto.commit.interval.ms" value="100" />
<entry key="session.timeout.ms" value="15000" />
<entry key="max.poll.records" value="3" />
<entry key="group.id" value="bridge-stream-testing" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.IntegerDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>

<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="testing-topic" />
</bean>
</constructor-arg>
</bean>

[更新 1]我为什么要这样做?详细信息如下:

  • 我希望每 Y 分钟最多读取 Kafka 主题的 X 条消息。
  • 我使用 max.poll.records 来确保每次轮询最多提取 X 条消息。
  • 我想要处理的一种情况是:如果在一次特定的消息轮询中,我轮询的消息少于 X,会发生什么情况。这意味着我应该停止 channel 而不等待 X 消息,否则我必须等到将来的消息轮询才能到达这些 X 消息。

这些是有关此场景的一些详细信息。还有更多场景,但我不想使用相同的 SO 问题来混合它。

[更新 N°2]

Artem 回答后的一些想法。

  • 如果我不定义 max.poll.records,只是等到达到 Y 分钟并计数 X,会发生什么情况消息,然后停止 channel ?
  • 是否会因为无法读取而丢失某些消息,或者当我再次启动 channel 时会读取那些无法读取的消息?

我想避免将消息保留在内存中,这就是我使用message-driven-channel-adapter + max.poll.records

的原因>

最佳答案

我可以建议的是 AtomicInteger bean,它会在每个处理的记录上增加,当您达到阈值时,您将执行 stop()为您kafkaInboundChannelAdapterTesting .

关于java - Spring集成和Kafka消费者: Stop message-driven-channel-adapter right after records are sucessfully fetched,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43358831/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com