- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要帮助使用 kafka 生产者将消息发布到主题。
我的 kafka 生产者客户端是用在 spark 上运行的 Scala 编写的。
我的工作运行成功,但我的消息似乎没有发布。
这是代码
val response = info.producer.asInstanceOf[KafkaProducer[K, V]].send(new ProducerRecord(info.props.getProperty(s"$topicNickName.topic"), keyMessage._1, keyMessage._2))
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [x.data.edh:6667, y.data.edh:6667, z.data.edh:6667, a.data.edh:6667, b.data.edh:6667]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 60000
interceptor.classes = null
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 30000
acks = 1
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 0
最佳答案
以下是如何在 Scala 中向 Kafka 生成消息的示例:
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
val kafkaProducerProps: Properties = {
val props = new Properties()
props.put("bootstrap.servers", "x.data.edh:6667")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
props
}
val producer = new KafkaProducer[String, String](kafkaProducerProps)
producer.send(new ProducerRecord[String, String]("myTopic", keyMessage._1, keyMessage._2))
Fire-and-forget We send a message to the server and don’t really care if it arrives succesfully or not. Most of the time, it will arrive successfully, since Kafka is highly available and the producer will retry sending messages automatically. However, some messages will get lost using this method.
Synchronous send We send a message, the
send()
method returns a Future object, and we useget()
to wait on the future and see if thesend()
was successful or not.Asynchronous send We call the
send()
method with acallback
function, which gets triggered when it receives a response from the Kafka broker
producer.send(record).get()
producer.send(record, new compareProducerCallback)
producer.flush()
// Callback trait only contains the one abstract method onCompletion
private class compareProducerCallback extends Callback {
@Override
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception != null) {
exception.printStackTrace()
}
}
}
关于scala - 如何在 Scala 中编写 Kafka Producer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61285666/
我有一个生产者/消费者场景,我不希望一个生产者交付产品,也不希望多个消费者消费这些产品。然而,常见的情况是交付的产品仅由一个消费者消费,而其他消费者永远看不到该特定产品。我不想完成的是每个消费者消费一
我正在设计一个系统,其中将有 n 个生产者和 m 个消费者,其中 n 和 m 是数字,n != m。 我想这样设计系统, 任何生产者在生产时不得阻止其他生产者 任何消费者都不应在消费时阻止其他消费者
关于 REST Web 服务。 @Produces("application/json") 和 @Produces(MediaType.APPICATION_JSON) 两者的工作方式相同,但第二个需
我正在尝试使用 Kafka: import java.util.Properties; import org.apache.kafka.clients.producer.Producer; impor
当我使用 Producer.flush() 时,它可以工作,但根据 kafka confluent issue 性能较差,但按照建议,我使用 Producer.poll(0) 但不会向主题生成任何消息
我正在针对 Python 的 confluent-kafka 使用 native java 实现测试 Apache Kafka Producer,以查看哪个具有最大吞吐量。 我正在使用 docker-
我看到 @products 注释允许我传递单个字符串和字符串列表。所以我只是想知道这是如何在java中完成的,如果我需要使用允许以下行为的方法来实现它,我该怎么做?或者这个注释是特定的,所以我们不能在
我正在开发一个迁移学习应用程序,我正在其中针对我的数据流重新训练 MobileNetV2。 我正在使用 retrain.py 重新训练模型来自tensorflow-hub并且没有做任何修改。 当我从终
在 Cloud Foundry 中,我能够向非 ssl url(“kafkaURL:9092”)生成消息。但它不适用于 ssl url(“kafkaURL:9093”)。 Kafka 服务器版本 0.
我正在使用 kafka 向消费者发送消息。但是由于某种原因,当我使用 Producer.send(record, new MyProducerCallback()); 向主题发送记录时,该主题的使用者
我正在编写一个演示应用程序来创建一个 Kafka Producer。我创建了一个主题并在 Kafka 上运行了一个生产者和消费者,它似乎正在工作。我正在编写一个 spring 应用程序来创建一个生产者
我在我的项目中使用 spring boot v2.2.4 和 Apache Kafka。 下面是我的pom.xml文件: org.springframewo
我正在尝试使用 java 程序制作 Kafka 生产者。但是当我运行程序时我收到了一些警告,没有任何错误但是生产者没有发送数据并且警告如下所示。 [kafka-producer-network-thr
我正在尝试加载一个简单的文本文件而不是 Kafka 中的标准输入。下载 Kafka 后,我执行了以下步骤: 启动动物园管理员: bin/zookeeper-server-start.sh config
我有一个类,它生成一个 ElasticSearch 客户端以与 @Inject 一起使用 @Produces @ApplicationScoped public Client createClient
对于一个新项目,我们在客户端使用 jQuery 组件,其中之一是 blueImp 文件 uploader 。我们愉快地编写代码,在 Chrome 和 Firefox 中一切都运行良好……直到有人尝试在
我有一些开发要做,我尝试看看是否有可以使用的设计模式。问题很简单: 我有一个启动许多线程的主线程。主线程必须等待每个线程完成然后再做其他事情。现有的代码有点难看。我有一个 while 循环来检查线程组
我正在使用驱动对象模型工具 CodeFluentEntities以便将模型部署到数据库引擎。 我正在考虑使用 localStorage 数据库引擎(如 IndexedDB 或 Web SQL)来为没有
我无法停止 ActiveMQ Producer。 场景是:我为内存使用和临时存储设置了较低的值。
我正在尝试结合使用 CDI (weld-se 2) 和 JavaFX,并且我想使用自定义创建的注释来注释我的 Controller 类,以便使用我的工厂方法管理此类创建。我想应该如下所示,但这段代码不
我是一名优秀的程序员,十分优秀!