- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想创建使用安全协议(protocol) SASL_SSL 和 sasl Merchanism PLAIN 的 kafka 消费者。有人可以帮我配置这些详细信息吗?
我已经阅读了许多有关如何配置 SASL 详细信息的文档,但仍然没有清楚地了解如何做到这一点。这里我附上我用来创建kafka消费者的代码
Properties props = new Properties();
props.put("bootstrap.servers", "servers");
String consumeGroup = "consumer_group";
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\"");
props.put("group.id", consumeGroup);
props.put("client.id", "client_id");
props.put("security.protocol", "SASL_SSL");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "101");
props.put("max.partition.fetch.bytes", "135");
// props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "6001");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);
堆栈跟踪
14:56:12.767 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Starting the Kafka consumer
14:56:12.776 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-2, kafka-events-nonprod-ds1.i, 9092), Node(-3, kafka-events-nonprod-ds1-3.io, 9092), Node(-1, kafka-events-nonprod-ds1-1.io, 9092)], partitions = [])
14:56:12.789 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-closed:client-id-client_id
14:56:12.845 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-created:client-id-client_id
14:56:12.846 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:client-id-client_id
14:56:12.846 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:client-id-client_id
14:56:12.847 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-received:client-id-client_id
14:56:12.847 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name select-time:client-id-client_id
14:56:12.847 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name io-time:client-id-client_id
14:56:12.861 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency
14:56:12.862 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name join-latency
14:56:12.862 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name sync-latency
14:56:12.865 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name commit-latency
14:56:12.873 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched
14:56:12.874 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name records-fetched
14:56:12.879 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name fetch-latency
14:56:12.881 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name records-lag
14:56:12.882 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time
14:56:12.883 [main] WARN o.a.k.c.consumer.ConsumerConfig - The configuration sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password" was supplied but isn't a known config.
14:56:12.885 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
14:56:12.885 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
14:56:12.886 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Kafka consumer created
14:56:12.887 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Subscribed to topic(s): topic_name
14:56:12.887 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Issuing group metadata request to broker -2
14:56:12.918 [main] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -2 at kafka-events-nonprod-ds1.i:9092.
14:56:13.336 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-sent
14:56:13.336 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-received
14:56:13.337 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--2.latency
14:56:13.339 [main] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node -2
14:56:13.343 [main] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=1,client_id=client_id}, body={topics=[topic_name]}), isInitiatedByNetworkClient, createdTimeMs=1568193973342, sendTimeMs=0) to node -2
14:56:13.986 [main] DEBUG o.a.kafka.common.network.Selector - Connection with kafka-events-nonprod-ds1-2.octanner.i/10.84.20.85 disconnected
java.io.EOFException: null
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99) ~[kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) ~[kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:160) ~[kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:141) ~[kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.common.network.Selector.poll(Selector.java:286) ~[kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:126) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:186) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857) [kafka-clients-0.9.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829) [kafka-clients-0.9.0.0.jar:na]
at kafka.Consumer.processRecords(Consumer.java:54) [classes/:na]
at kafka.Consumer.execute(Consumer.java:22) [classes/:na]
at kafka.Consumer.main(Consumer.java:15) [classes/:na]
反序列化函数:
private static void processRecords(KafkaConsumer<String, Object> consumer) throws InterruptedException {
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(TimeUnit.MINUTES.toMillis(1));
long lastOffset = 0;
for (ConsumerRecord<String, Object> record : records) {
System.out.printf("\n\n\n\n\n\n\roffset = %d, key = %s\n\n\n\n\n\n", record.offset(), record.value());
lastOffset = record.offset();
}
System.out.println("lastOffset read: " + lastOffset);
process();
}
}
最佳答案
Kafka 0.10 增加了对 Plain 机制的支持。您使用的Kafka 0.9版本仅支持GSSAPI机制。
切换到更新版本后,您只需要至少设置以下配置:
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, <BROKERS>);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
请注意,Kafka 0.10.2 中添加了 SaslConfigs.SASL_JAAS_CONFIG
支持。在此之前,您需要使用 JAAS 文件。请参阅Kafka "Login module not specified in JAAS config"了解详情。
如果可能的话,我建议您开始使用最新的 Kafka 版本。
关于java - 如何在java中使用sasl机制PLAIN和安全协议(protocol)SASL_SSL配置kafka消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57886163/
我正在学习 Spring 安全性,但我对它的灵活性感到困惑.. 我知道我可以通过在标签中定义规则来保护网址 然后我看到有一个@secure 注释可以保护方法。 然后还有其他注释来保护域(或 POJO)
假设有一个 key 加密 key 位于内存中并且未写入文件或数据库... byte[] kek = new byte[32]; secureRandom.nextBytes(kek); byte[]
我有 Spring Security 3.2.0 RC1 的问题 我正在使用标签来连接我 这表示“方法‘setF
我正在创建一个使用 Node Js 服务器 API 的 Flutter 应用程序。对于授权,我决定将 JWT 与私钥/公钥一起使用。服务器和移动客户端之间的通信使用 HTTPS。 Flutter 应用
在过去的几年里,我一直在使用范围从 Raphael.js 的 javascript 库。至 D3 ,我已经为自己的教育操纵了来自网络各地的动画。我已经从各种 git 存储库下载了 js 脚本,例如 s
在 python 中实现身份验证的好方法是什么?已经存在的东西也很好。我需要它通过不受信任的网络连接进行身份验证。它不需要太高级,只要足以安全地获取通用密码即可。我查看了 ssl 模块。但那个模块让我
我正在尝试学习“如何在 Hadoop 中实现 Kerberos?”我已经看过这个文档 https://issues.apache.org/jira/browse/HADOOP-4487我还了解了基本的
我有一个带有 apache2、php、mysql 的生产服务器。我现在只有一个站点 (mysite.com) 作为虚拟主机。我想把 phpmyadmin、webalizer 和 webmin 放在那里
前些天在网上看到防火墙软件OPNsense,对其有了兴趣,以前写过一个其前面的一个软件M0n0wall( 关于m0n0wa
我在 Spring Boot 和 oauth2(由 Google 提供)上编写了 rest 后端,在 "/login" 上自动重定向。除了 web 的 oauth 之外,我还想在移动后端进行 Fire
我想调用类 Foo,它的构造函数中有抽象类 Base。我希望能够从派生自 Base 的 Derived 调用 Foo 并使用 Derived覆盖方法而不是 Base 的方法。 我只能按照指示使用原始指
如何提高 session 的安全性? $this->session->userdata('userid') 我一直在为我的 ajax 调用扔掉这个小坏蛋。有些情况我没有。然后我想,使用 DOM 中的
我目前正在为某些人提供程序集编译服务。他们可以在在线编辑器中输入汇编代码并进行编译。然后编译它时,代码通过ajax请求发送到我的服务器,编译并返回程序的输出。 但是,我想知道我可以做些什么来防止对服务
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引起辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the he
目前,我通过将 session 中的 key 与 MySQl 数据库中的相同 key 相匹配来验证用户 session 。我使用随机数重新生成 session ,该随机数在每个页面加载时都受 MD5
Microsoft 模式与实践团队提供了一个很棒的 pdf,称为:“构建安全的 asp.net 应用程序”。 microsoft pdf 由于它是为 .Net 1.0 编写的,所以现在有点旧了。有谁知
在 Lua 中,通常会使用 math.random 生成随机值和/或字符串。 & math.randomseed , 其中 os.time用于 math.randomseed . 然而,这种方法有一个
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我们有一个严重依赖 Ajax 的应用程序。确保对服务器端脚本的请求不是通过独立程序而是通过坐在浏览器上的实际用户的好方法是什么 最佳答案 真的没有。 通过浏览器发送的任何请求都可以由独立程序伪造。 归
我正在寻找使用 WebSockets 与我们的服务器通信来实现 web (angular) 和 iPhone 应用程序。在过去使用 HTTP 请求时,我们使用请求数据、url、时间戳等的哈希值来验证和
我是一名优秀的程序员,十分优秀!