- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用以下配置:
我为 SpringXD 创建了自定义 Kafka 源模块。我设置了我的消费者逻辑和消息驱动 channel 适配器(我将其与控制总线结合使用来停止我的 channel 适配器)。到目前为止,一切都很好。另外,我还使用 max.poll.record=10
作为 kafka 属性来每次轮询获取 10 条记录。
我想确保在成功获取所有记录(在本例中为 10 条记录)后立即停止我的 channel 。
例如:我想避免在并非所有记录都已成功获取和处理时(即,当记录未发送到输出 channel 时)停止读取。
有办法告诉我吗?
这是我的 xml 配置,以防万一:
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<int:channel id="input_to_control_bus" />
<int:channel id="output" />
<context:component-scan base-package="com.kafka.source.logic" />
<int:control-bus id="my_control_bus" input-channel="input_to_control_bus" />
<int-kafka:message-driven-channel-adapter
id="kafkaInboundChannelAdapterTesting" listener-container="container1"
auto-startup="false" phase="100" send-timeout="5000" channel="output"
mode="record" message-converter="messageConverter" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter" />
<!--Consumer -->
<bean id="container1"
class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="enable.auto.commit" value="false" />
<entry key="auto.commit.interval.ms" value="100" />
<entry key="session.timeout.ms" value="15000" />
<entry key="max.poll.records" value="3" />
<entry key="group.id" value="bridge-stream-testing" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.IntegerDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="testing-topic" />
</bean>
</constructor-arg>
</bean>
[更新 1]我为什么要这样做?详细信息如下:
max.poll.records
来确保每次轮询最多提取 X 条消息。这些是有关此场景的一些详细信息。还有更多场景,但我不想使用相同的 SO 问题来混合它。
[更新 N°2]
Artem 回答后的一些想法。
max.poll.records
,只是等到达到 Y 分钟并计数 X,会发生什么情况消息,然后停止
channel ?启动
channel 时会读取那些无法读取的消息?我想避免将消息保留在内存中,这就是我使用message-driven-channel-adapter
+ max.poll.records
最佳答案
我可以建议的是 AtomicInteger
bean,它会在每个处理的记录上增加,当您达到阈值时,您将执行 stop()
为您kafkaInboundChannelAdapterTesting
.
关于java - Spring集成和Kafka消费者: Stop message-driven-channel-adapter right after records are sucessfully fetched,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43358831/
recorder = new MediaRecorder(); recorder.setAudioSource(MediaRecorder.AudioSource.VOICE_CALL
在我的表中,我有四列。 r_id id(用户 ID) v_id(公司 ID) 率 我所做的就是从用户那里对公司(v_id)进行评分。 假设,如果用户一对第一家公司(v_id)进行评分,那么当同一用户对
我的表中有 10 条记录,当我删除记录 5 并创建新记录时,新记录将取代已删除的记录,如下所示: 在删除之前记录其ID: 1个2个3个4个5个6个78个910 删除记录 5 并插入新记录后,id 为
我有两个 belongsToMany 模型: const apptsModel = db.define('Appts', { id: {type: Sequelize.INTEGER, pri
有没有办法在 iOS Playground 上使用录音机?运行时,它会询问我是否允许使用我的麦克风,并且录音机实例似乎可以正常工作,但是,我无法使用 record() 函数。我当前的环境是 iOS 1
我有 2 个表,状态表与配置文件表相连。 Profile 表存储唯一的 tagId,status 表存储多个重复的 tagid 条目。我想按tagid的最后一条记录和第一条记录分组显示。 表:简介注意
我是 的新手 typescript 我需要遍历 Record键入对值进行一些更新并返回 Record . 这是定义类型的方式: type Parent = Readonly>; type Childr
我发现在开发 extjs 应用程序(拉力赛应用程序)时,有时我需要从记录中获取的数据在 record.raw 中而不是在 record.data 中。两者有什么区别,为什么会这样? 编辑 - 添加示例
我需要交叉引用 2 个表。 在 tb1 中是 booking_ref,投资者 在 tb2 中是 booking_ref、investor、cost 麻烦的是如果没有成本,表2中没有记录 所以我有以下查
鉴于:我在 Kafka 中有两个主题,假设主题 A 和主题 B。Kafka Stream 从主题 A 读取记录,处理它并生成与消费记录相对应的多条记录(假设记录 A 和记录 B)。现在,问题是如何使用
我有一个包含 6 个元素的排序数组列表。前 5 个元素有一些值,第 6 个元素为空。 我想循环遍历这个ArrayList,并将第一个记录的前5个元素与下一个记录中的相同元素进行比较。如果任一元素不同,
我有一个包含 3 列的表:ID、Name、ParentID。 如何删除特定记录及其所有子记录(n 层深)? 使用 Entity Framework 3.5。 最佳答案 表是自引用的是应用程序逻辑,它没
获取记录时如何获取最新的记录? 例如: 第一次迭代我用对象名称 Country 保存“Singapore”, 第二次迭代我用对象名称 Country,second 保存“USA” 现在当我获取它的时候
我将使用 C# 和 Xamarin 制作的 iOS 应用重写到 Swift,原因很明显是 Xamarin 的定价和低文档。正在关注this tutorial因为在我的 UITableView 上包含一
假设我有一个线束二进制文件,它可以根据命令行选项产生不同的基准。我对采样这些基准非常感兴趣。 我有3个选择: 更改线束二进制文件以生成一个“性能记录”子进程,该子进程运行基准测试并进行采样 只需执行“
什么 SQL 查询会按名称查找记录,以及具有相同地址但可能不同名称的所有记录? 我有一个选民登记数据库,当我查找个人时,我还想看看还有谁在该地址登记。 我的数据库称为 voters,其中的表称为 ex
我在我的 mac 上设置了一个 Tsung 配置,并尝试在我正在开发的网站上记录一个测试。 我跑了:“tsung-recorder start”并在 firefox localhost:8090 中为
我需要将 JOOQ 的记录结果集转换为表记录列表。有什么方法可以做到吗? 最佳答案 是的,您可以使用 Result.into(Table) .一个例子: Result result = create.
我正在尝试将 CKReference 添加到云工具包中的记录,但尝试不断触发“服务记录已更改”。从我的 println 显示的控制台消息(下面的控制台消息和代码), 我正在上传带有 0 个引用的记录,
我有两个类:Artist 和 Instrument。每个 Artist 可以演奏一个或多个 Instrument。每个 Instrument 可以分配给一个或多个 Artist。所以,我设置了以下类:
我是一名优秀的程序员,十分优秀!