- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在将 spring-integration-kafka 从 1.0.0.M2 升级到 2.1.0.RELEASE,并将客户端 0.9.0 升级到 0.10.0
当前xml配置如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:publish-subscribe-channel id="inputToKafka" />
<int-kafka:outbound-channel-adapter
kafka-producer-context-ref="kafkaProducerContext" auto-startup="true"
channel="inputToKafka" order="1">
</int-kafka:outbound-channel-adapter>
<bean id="producerProperties"
class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="properties">
<props>
<prop key="topic.metadata.refresh.interval.ms">${topic.metadata.refresh.interval.ms}</prop>
<prop key="message.send.max.retries">${message.send.max.retries}</prop>
<prop key="send.buffer.bytes">${send.buffer.bytes}</prop>
</props>
</property>
</bean>
<bean id="fcmNotificationEncoder"
class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
<constructor-arg value="common.vo.NotificationVo" />
</bean>
<int-kafka:producer-context id="kafkaProducerContext"
producer-properties="producerProperties">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration
broker-list="${kafka.servers}" key-class-type="java.lang.String"
value-class-type="common.vo.fcmNotificationVo"
value-encoder="fcmNotificationEncoder" topic="trigger-fcm-notification"
compression-codec="none" />
</int-kafka:producer-configurations>
</int-kafka:producer-context>
<int:service-activator input-channel="ip-chanel-trigger-fcm-notification"
ref="fcmNotificationConsumer">
</int:service-activator>
<bean id="consumerProperties"
class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="properties">
<props>
<prop key="auto.offset.reset">${auto.offset.reset}</prop>
<prop key="socket.receive.buffer.bytes">${socket.receive.buffer.bytes}</prop> <!-- 10M -->
<prop key="fetch.message.max.bytes">${fetch.message.max.bytes}</prop>
<prop key="auto.commit.interval.ms">${auto.commit.interval.ms}</prop>
</props>
</property>
</bean>
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="${zookeeper.servers}" zk-connection-timeout="${zookeeper.connection.timeout}"
zk-session-timeout="${zookeeper.session.timeout}" zk-sync-time="${zookeeper.sync.time}" />
<bean id="kafkaThreadListener" class="api.utils.KafkaConsumerStarter"
init-method="initIt" destroy-method="cleanUp" />
<int-kafka:inbound-channel-adapter
kafka-consumer-context-ref="consumerContextFCM" auto-startup="false"
channel="ip-chanel-trigger-fcm-notification" id="kafka-inbound-channel-adapter-FCM">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS"
receive-timeout="0" />
</int-kafka:inbound-channel-adapter>
<!-- Consumer -->
<bean id="fcmNotificationDecoder"
class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
<constructor-arg value="common.vo.NotificationVo" />
</bean>
<int-kafka:consumer-context id="consumerContextFCM"
consumer-timeout="4000" zookeeper-connect="zookeeperConnect"
consumer-properties="consumerProperties">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration
group-id="trigger-fcm-notification" max-messages="50"
value-decoder="fcmNotificationDecoder">
<int-kafka:topic id="trigger-fcm-notification"
streams="10" />
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
</beans>
如何将其更改为 2.1.0.RELEASE ?
~~~~~~~~~~~~~~~
在此编辑:
使用引用根据我的要求修改了 xml。我在阅读消费者记录时遇到了一个小问题。我得到了如下有效载荷
{
kafka_offset=7,
kafka_receivedMessageKey=null,
kafka_receivedPartitionId=0,
kafka_receivedTopic=trigger-fcm-notification,
kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 7, CreateTime = 1476864644264, checksum = 3680317883, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@203c8c95)
}
我需要值 (NotificationVo) 以便在消费者中进一步使用。如何将其作为有效负载的一部分?
~~~~~~~~~~~~~~~
在此编辑:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:publish-subscribe-channel id="inputToKafka" />
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter" kafka-template="template"
auto-startup="true" channel="inputToKafka" topic="trigger-fcm-notification"
order="1">
<int-kafka:request-handler-advice-chain>
<bean
class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
</int-kafka:request-handler-advice-chain>
</int-kafka:outbound-channel-adapter>
<!--Producer-->
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="retries" value="5" />
<entry key="batch.size" value="16384" />
<entry key="linger.ms" value="1" />
<entry key="buffer.memory" value="33554432" />
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer"
value="common.vo.NotificationVoSerializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
<int-kafka:message-driven-channel-adapter
id="kafka-inbound-channel-adapter-FCM" listener-container="container1"
auto-startup="true" phase="100" send-timeout="5000"
channel="ip-chanel-trigger-fcm-notification" mode="record"
message-converter="messageConverter" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter" />
<int:service-activator input-channel="ip-chanel-trigger-fcm-notification"
ref="fcmNotificationConsumer">
</int:service-activator>
<!--Consumer-->
<bean id="container1"
class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="enable.auto.commit" value="false" />
<entry key="auto.commit.interval.ms" value="100" />
<entry key="session.timeout.ms" value="15000" />
<entry key="group.id" value="trigger-fcm-notification" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="common.vo.NotificationVoDeserializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="trigger-fcm-notification" />
</bean>
</constructor-arg>
</bean>
</beans>
这是修改后的xml配置文件
~~~~~~~~~~~~~~~
在此编辑:
消费类:
package common.notification.consumer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import common.vo.NotificationVo;
@Component
public class FcmNotificationConsumer {
@SuppressWarnings("unchecked")
@ServiceActivator
public <K, V> void process(Map<K, V> payload) {
String topic = null;
System.out.println("payload=====>"+payload.toString());
for (K item : payload.keySet()) {
topic = (String) item;
}
Object ackObject = payload.get(topic);
System.out.println("ackObject=====>"+payload.get(topic));
}
}
O/P:
payload=====>{kafka_offset=21, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=trigger-fcm-notification, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 21, CreateTime = 1476887227554, checksum = 222603853, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@29206bb8)}
ackObject=====>Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 21, CreateTime = 1476887227554, checksum = 222603853, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@29206bb8)
~~~~~~~~~~~~~~~
在此编辑:
在 Consumer 类中更改方法参数后收到预期的负载。
package common.notification.consumer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import common.vo.NotificationVo;
@Component
public class FcmNotificationConsumer {
@SuppressWarnings("unchecked")
@ServiceActivator
public void process(Message<?> message) {
System.out.println("Message=====>"+message);
Object payloadObject = message.getPayload();
NotificationVo notificationVo = (NotificationVo) payloadObject;
}
}
O/P:
Message=====>GenericMessage [payload=common.vo.NotificationVo@4c144e99, headers={kafka_offset=16, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=trigger-fcm-notification, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 16, CreateTime = 1476949945607, checksum = 2501578118, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@4c144e99)}]
终于按预期工作了。
非常感谢您的支持。
最佳答案
好的。现在比以前干净多了。
因此,您当前的代码适用于 Apache Kafka 0.8。自 0.9 版以来,它具有完全不同的设计。因此必须丢弃 Spring Integration Kafka 1.0 的当前代码。
您应该阅读有关 Apache Kafka 0.10 的信息:https://kafka.apache.org/documentation .
关于Spring Kafka,基于Kafka Client 0.10:http://docs.spring.io/spring-kafka/docs/1.1.1.RELEASE/reference/html/
并注意有关 Spring Integration Kafka 的章节:http://docs.spring.io/spring-kafka/docs/1.1.1.RELEASE/reference/html/_spring_integration.html
请注意:这里没有人会为您工作。
编辑
我不确定你为什么说你有一个 payload
就像那样,因为那正是 headers
.那value
在ConsumerRecord
转换为 payload
的消息。
请分享您收到的代码并尝试提取 value
. <int-kafka:message-driver-channel-adapter>
生产 Message<>
与那些headers
和 payload
来自转换后的 ConsumerRecord
value
.
关于java - 如何使用xml配置升级到spring-integration-kafka 2.1.0.RELEASE?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40109373/
我正在编写一个具有以下签名的 Java 方法。 void Logger(Method method, Object[] args); 如果一个方法(例如 ABC() )调用此方法 Logger,它应该
我是 Java 新手。 我的问题是我的 Java 程序找不到我试图用作的图像文件一个 JButton。 (目前这段代码什么也没做,因为我只是得到了想要的外观第一的)。这是我的主课 代码: packag
好的,今天我在接受采访,我已经编写 Java 代码多年了。采访中说“Java 垃圾收集是一个棘手的问题,我有几个 friend 一直在努力弄清楚。你在这方面做得怎么样?”。她是想骗我吗?还是我的一生都
我的 friend 给了我一个谜语让我解开。它是这样的: There are 100 people. Each one of them, in his turn, does the following
如果我将使用 Java 5 代码的应用程序编译成字节码,生成的 .class 文件是否能够在 Java 1.4 下运行? 如果后者可以工作并且我正在尝试在我的 Java 1.4 应用程序中使用 Jav
有关于why Java doesn't support unsigned types的问题以及一些关于处理无符号类型的问题。我做了一些搜索,似乎 Scala 也不支持无符号数据类型。限制是Java和S
我只是想知道在一个 java 版本中生成的字节码是否可以在其他 java 版本上运行 最佳答案 通常,字节码无需修改即可在 较新 版本的 Java 上运行。它不会在旧版本上运行,除非您使用特殊参数 (
我有一个关于在命令提示符下执行 java 程序的基本问题。 在某些机器上我们需要指定 -cp 。 (类路径)同时执行java程序 (test为java文件名与.class文件存在于同一目录下) jav
我已经阅读 StackOverflow 有一段时间了,现在我才鼓起勇气提出问题。我今年 20 岁,目前在我的家乡(罗马尼亚克卢日-纳波卡)就读 IT 大学。足以介绍:D。 基本上,我有一家提供簿记应用
我有 public JSONObject parseXML(String xml) { JSONObject jsonObject = XML.toJSONObject(xml); r
我已经在 Java 中实现了带有动态类型的简单解释语言。不幸的是我遇到了以下问题。测试时如下代码: def main() { def ks = Map[[1, 2]].keySet()
一直提示输入 1 到 10 的数字 - 结果应将 st、rd、th 和 nd 添加到数字中。编写一个程序,提示用户输入 1 到 10 之间的任意整数,然后以序数形式显示该整数并附加后缀。 public
我有这个 DownloadFile.java 并按预期下载该文件: import java.io.*; import java.net.URL; public class DownloadFile {
我想在 GUI 上添加延迟。我放置了 2 个 for 循环,然后重新绘制了一个标签,但这 2 个 for 循环一个接一个地执行,并且标签被重新绘制到最后一个。 我能做什么? for(int i=0;
我正在对对象 Student 的列表项进行一些测试,但是我更喜欢在 java 类对象中创建硬编码列表,然后从那里提取数据,而不是连接到数据库并在结果集中选择记录。然而,自从我这样做以来已经很长时间了,
我知道对象创建分为三个部分: 声明 实例化 初始化 classA{} classB extends classA{} classA obj = new classB(1,1); 实例化 它必须使用
我有兴趣使用 GPRS 构建车辆跟踪系统。但是,我有一些问题要问以前做过此操作的人: GPRS 是最好的技术吗?人们意识到任何问题吗? 我计划使用 Java/Java EE - 有更好的技术吗? 如果
我可以通过递归方法反转数组,例如:数组={1,2,3,4,5} 数组结果={5,4,3,2,1}但我的结果是相同的数组,我不知道为什么,请帮助我。 public class Recursion { p
有这样的标准方式吗? 包括 Java源代码-测试代码- Ant 或 Maven联合单元持续集成(可能是巡航控制)ClearCase 版本控制工具部署到应用服务器 最后我希望有一个自动构建和集成环境。
我什至不知道这是否可能,我非常怀疑它是否可能,但如果可以,您能告诉我怎么做吗?我只是想知道如何从打印机打印一些文本。 有什么想法吗? 最佳答案 这里有更简单的事情。 import javax.swin
我是一名优秀的程序员,十分优秀!