- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我将从RabbitMQ切换到Kafka。这只是了解Kafka如何运作的简单提示。我不确定是否缺少我的设置,是否是我的代码,是否是Kafka-Go,或者这是否是预期的Kafka行为。
我尝试过调整BatchSize
和BatchTimeout
,但都没有影响。
下面的代码创建一个具有6个分区和3的复制因子的主题。然后,每个100ms
都产生一个递增的消息。它启动6个使用者,每个分区一个。读取和写入都在go例程中执行。
在下面的日志中,它持续7秒钟没有收到消息,然后接收到突发。我正在使用Confluent的平台,因此我认识到会有一些网络延迟,但程度不如我所见。
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"net"
"strconv"
"time"
kafka "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
)
func newDialer(clientID, username, password string) *kafka.Dialer {
mechanism := plain.Mechanism{
Username: username,
Password: password,
}
rootCAs, _ := x509.SystemCertPool()
if rootCAs == nil {
rootCAs = x509.NewCertPool()
}
return &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
ClientID: clientID,
SASLMechanism: mechanism,
TLS: &tls.Config{
InsecureSkipVerify: false,
RootCAs: rootCAs,
},
}
}
func createTopic(url string, topic string, dialer *kafka.Dialer) {
conn, err := dialer.Dial("tcp", url)
if err != nil {
panic(err.Error())
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = dialer.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{
{
Topic: topic,
NumPartitions: 6,
ReplicationFactor: 3,
},
}
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
panic(err.Error())
}
}
func newWriter(url string, topic string, dialer *kafka.Dialer) *kafka.Writer {
return kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{url},
Topic: topic,
Balancer: &kafka.CRC32Balancer{},
Dialer: dialer,
BatchSize: 10,
BatchTimeout: 1 * time.Millisecond,
})
}
func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{url},
Topic: topic,
Dialer: dialer,
Partition: partition,
})
}
func read(url string, topic string, dialer *kafka.Dialer, partition int) {
reader := newReader(url, topic, partition, dialer)
defer reader.Close()
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
panic(err)
}
log.Printf("rec%d:\t%s\n", partition, msg.Value)
}
}
func write(url string, topic string, dialer *kafka.Dialer) {
writer := newWriter(url, topic, dialer)
defer writer.Close()
for i := 0; ; i++ {
v := []byte("V" + strconv.Itoa(i))
log.Printf("send:\t%s\n", v)
msg := kafka.Message{ Key: v, Value: v }
err := writer.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Println(err)
}
time.Sleep(100 * time.Millisecond)
}
}
func main() {
url := "_______.______.___.confluent.cloud:9092"
topic := "test"
username := "________________"
password := "________________"
clientID := "________________"
dialer := newDialer(clientID, username, password)
ctx := context.Background()
createTopic(url, topic, dialer)
for i := 0; i < 6; i++ {
go read(url, topic, dialer, i)
}
go write(url, topic, dialer)
<-ctx.Done()
}
正在记录以下内容。
2020/11/02 23:19:22 send: V0
2020/11/02 23:19:23 send: V1
2020/11/02 23:19:23 send: V2
2020/11/02 23:19:23 send: V3
2020/11/02 23:19:24 send: V4
2020/11/02 23:19:24 send: V5
2020/11/02 23:19:24 send: V6
2020/11/02 23:19:25 send: V7
2020/11/02 23:19:25 send: V8
2020/11/02 23:19:25 send: V9
2020/11/02 23:19:25 send: V10
2020/11/02 23:19:26 send: V11
2020/11/02 23:19:26 send: V12
2020/11/02 23:19:26 send: V13
2020/11/02 23:19:26 send: V14
2020/11/02 23:19:26 send: V15
2020/11/02 23:19:27 send: V16
2020/11/02 23:19:27 send: V17
2020/11/02 23:19:27 send: V18
2020/11/02 23:19:27 send: V19
2020/11/02 23:19:28 send: V20
2020/11/02 23:19:29 send: V21
2020/11/02 23:19:29 send: V22
2020/11/02 23:19:29 send: V23
2020/11/02 23:19:29 send: V24
2020/11/02 23:19:29 send: V25
2020/11/02 23:19:30 send: V26
2020/11/02 23:19:30 send: V27
2020/11/02 23:19:30 send: V28
2020/11/02 23:19:30 send: V29
2020/11/02 23:19:31 send: V30
2020/11/02 23:19:31 send: V31
2020/11/02 23:19:31 send: V32
2020/11/02 23:19:32 send: V33
2020/11/02 23:19:32 send: V34
2020/11/02 23:19:32 rec3: V8
2020/11/02 23:19:32 rec3: V14
2020/11/02 23:19:32 rec3: V15
2020/11/02 23:19:32 rec3: V16
2020/11/02 23:19:32 rec3: V17
2020/11/02 23:19:32 rec3: V20
2020/11/02 23:19:32 rec3: V21
2020/11/02 23:19:32 rec3: V23
2020/11/02 23:19:32 rec3: V29
2020/11/02 23:19:32 rec1: V0
2020/11/02 23:19:32 rec1: V9
2020/11/02 23:19:32 rec1: V22
2020/11/02 23:19:32 rec1: V28
2020/11/02 23:19:32 rec4: V4
2020/11/02 23:19:32 rec4: V5
2020/11/02 23:19:32 rec4: V7
2020/11/02 23:19:32 rec4: V10
2020/11/02 23:19:32 rec4: V11
2020/11/02 23:19:32 rec4: V12
2020/11/02 23:19:32 rec4: V18
2020/11/02 23:19:32 rec4: V24
2020/11/02 23:19:32 rec4: V25
2020/11/02 23:19:32 rec4: V30
2020/11/02 23:19:32 rec4: V31
2020/11/02 23:19:32 send: V35
2020/11/02 23:19:32 rec5: V1
2020/11/02 23:19:32 rec5: V2
2020/11/02 23:19:32 rec5: V3
2020/11/02 23:19:32 rec5: V34
2020/11/02 23:19:32 rec2: V6
2020/11/02 23:19:32 rec2: V13
2020/11/02 23:19:32 rec2: V26
2020/11/02 23:19:32 rec2: V33
2020/11/02 23:19:32 send: V36
2020/11/02 23:19:33 send: V37
2020/11/02 23:19:33 send: V38
2020/11/02 23:19:33 send: V39
2020/11/02 23:19:33 send: V40
2020/11/02 23:19:33 send: V41
2020/11/02 23:19:33 rec0: V19
2020/11/02 23:19:33 rec0: V27
2020/11/02 23:19:33 rec0: V32
2020/11/02 23:19:34 send: V42
2020/11/02 23:19:34 send: V43
2020/11/02 23:19:34 send: V44
2020/11/02 23:19:34 send: V45
2020/11/02 23:19:34 send: V46
2020/11/02 23:19:35 send: V47
2020/11/02 23:19:35 send: V48
2020/11/02 23:19:35 send: V49
2020/11/02 23:19:35 send: V50
2020/11/02 23:19:35 send: V51
2020/11/02 23:19:35 send: V52
2020/11/02 23:19:36 send: V53
2020/11/02 23:19:36 send: V54
2020/11/02 23:19:36 send: V55
2020/11/02 23:19:36 send: V56
2020/11/02 23:19:36 send: V57
2020/11/02 23:19:37 send: V58
2020/11/02 23:19:37 send: V59
2020/11/02 23:19:37 send: V60
2020/11/02 23:19:38 send: V61
2020/11/02 23:19:38 send: V62
2020/11/02 23:19:38 send: V63
2020/11/02 23:19:38 send: V64
2020/11/02 23:19:38 send: V65
2020/11/02 23:19:39 send: V66
2020/11/02 23:19:39 send: V67
2020/11/02 23:19:39 send: V68
2020/11/02 23:19:40 send: V69
2020/11/02 23:19:40 send: V70
2020/11/02 23:19:40 send: V71
2020/11/02 23:19:40 send: V72
2020/11/02 23:19:40 send: V73
2020/11/02 23:19:40 send: V74
2020/11/02 23:19:41 send: V75
2020/11/02 23:19:41 send: V76
2020/11/02 23:19:41 rec1: V41
2020/11/02 23:19:41 rec1: V56
2020/11/02 23:19:41 rec1: V68
2020/11/02 23:19:41 rec1: V74
2020/11/02 23:19:41 rec1: V75
2020/11/02 23:19:41 rec1: V76
2020/11/02 23:19:41 rec3: V37
2020/11/02 23:19:41 rec3: V40
2020/11/02 23:19:41 rec3: V42
2020/11/02 23:19:41 rec3: V48
2020/11/02 23:19:41 rec3: V55
2020/11/02 23:19:41 rec3: V57
2020/11/02 23:19:41 rec3: V60
2020/11/02 23:19:41 rec3: V61
2020/11/02 23:19:41 rec3: V62
2020/11/02 23:19:41 send: V77
2020/11/02 23:19:41 rec4: V38
2020/11/02 23:19:41 rec4: V39
2020/11/02 23:19:41 rec4: V45
2020/11/02 23:19:41 rec4: V46
2020/11/02 23:19:41 rec4: V47
2020/11/02 23:19:41 rec4: V53
2020/11/02 23:19:41 rec4: V59
2020/11/02 23:19:41 rec4: V70
2020/11/02 23:19:41 rec4: V71
2020/11/02 23:19:41 rec4: V73
2020/11/02 23:19:41 rec5: V35
2020/11/02 23:19:41 rec5: V36
2020/11/02 23:19:41 rec5: V43
2020/11/02 23:19:41 rec5: V49
2020/11/02 23:19:41 rec5: V54
2020/11/02 23:19:41 rec5: V63
2020/11/02 23:19:41 rec5: V69
2020/11/02 23:19:41 rec5: V77
2020/11/02 23:19:41 send: V78
2020/11/02 23:19:41 rec2: V44
2020/11/02 23:19:41 rec2: V50
2020/11/02 23:19:41 rec2: V51
2020/11/02 23:19:41 rec2: V64
2020/11/02 23:19:41 rec2: V65
2020/11/02 23:19:41 rec2: V66
2020/11/02 23:19:41 rec2: V72
2020/11/02 23:19:41 send: V79
2020/11/02 23:19:42 send: V80
2020/11/02 23:19:42 send: V81
2020/11/02 23:19:42 send: V82
2020/11/02 23:19:42 send: V83
2020/11/02 23:19:42 send: V84
2020/11/02 23:19:43 send: V85
2020/11/02 23:19:43 rec0: V52
2020/11/02 23:19:43 rec0: V58
2020/11/02 23:19:43 rec0: V67
2020/11/02 23:19:43 send: V86
任何建议将不胜感激。谢谢!
package main
import (
"context"
"fmt"
"github.com/Shopify/sarama"
"crypto/tls"
"crypto/x509"
"log"
"strings"
"time"
)
var (
broker = "___-_____.us-east1.gcp.confluent.cloud:9092"
brokers = []string{broker}
clientID = "___________"
username = "___________"
password = "___________"
topic = "sarama"
)
func main() {
log.Printf("Kafka brokers: %s", strings.Join(brokers, ", "))
ctx := context.Background()
sync := newSyncProducer()
// accessLog := newAsyncProducer()
createTopic(topic)
go func() {
for i := 0; ; i++ {
v := sarama.StringEncoder(fmt.Sprintf("V%d", i))
p, o, err := sync.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Value: v,
})
if err != nil {
panic(err)
}
fmt.Printf("sent\t\t%v\tp: %d\toffset: %d\t\n", v, p, o)
time.Sleep(100 * time.Millisecond)
}
}()
ps := []sarama.PartitionConsumer{}
offset := int64(0)
loop := func(msgs <-chan *sarama.ConsumerMessage) {
for msg := range msgs {
fmt.Printf("recv:\t\t%s\tp: %d\toffset: %d\n", msg.Value, msg.Partition, msg.Offset)
}
}
for i := 0; i < 6; i++ {
ps = append(ps, createPartitionConsumer(topic, int32(i), offset))
}
for _, p := range ps {
go loop(p.Messages())
}
<-ctx.Done()
}
func createPartitionConsumer(topic string, partition int32, offset int64) sarama.PartitionConsumer {
config := baseConfig()
c, err := sarama.NewConsumer(brokers, config)
if err != nil {
panic(err)
}
p, err := c.ConsumePartition(topic, partition, offset)
if err != nil {
panic(err)
}
return p
}
func createTopic(topic string) {
config := baseConfig()
admin, err := sarama.NewClusterAdmin(brokers, config)
if err != nil {
log.Fatal("Error while creating cluster admin: ", err.Error())
}
defer func() { _ = admin.Close() }()
err = admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: 6,
ReplicationFactor: 3,
}, false)
if err != nil {
log.Println("Error while creating topic: ", err.Error())
}
}
func baseConfig() *sarama.Config {
rootCAs, _ := x509.SystemCertPool()
if rootCAs == nil {
rootCAs = x509.NewCertPool()
}
config := sarama.NewConfig()
config.Version = sarama.MaxVersion
config.Net.TLS.Enable = true
config.Net.TLS.Config = &tls.Config{
RootCAs: rootCAs,
InsecureSkipVerify: false,
}
config.ClientID = clientID
config.Net.SASL.Enable = true
config.Net.SASL.Password = password
config.Net.SASL.User = username
return config
}
func newSyncProducer() sarama.SyncProducer {
config := baseConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 10
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
log.Fatalln("Failed to start Sarama producer:", err)
}
return producer
}
实际上,在某些情况下,它实际上是在确认发送之前接收的,这使我想知道是否存在内部消息传递,如果是,我是否应该关心...
recv: V1176 p: 1 offset: 355
recv: V1177 p: 2 offset: 363
send: V1177 p: 2 offset: 363
send: V1178 p: 5 offset: 377
recv: V1178 p: 5 offset: 377
recv: V1179 p: 1 offset: 356
send: V1179 p: 1 offset: 356
send: V1180 p: 1 offset: 357
recv: V1180 p: 1 offset: 357
recv: V1181 p: 1 offset: 358
send: V1181 p: 1 offset: 358
send: V1182 p: 4 offset: 393
recv: V1182 p: 4 offset: 393
send: V1183 p: 4 offset: 394
recv: V1183 p: 4 offset: 394
send: V1184 p: 3 offset: 358
recv: V1184 p: 3 offset: 358
send: V1185 p: 2 offset: 364
recv: V1185 p: 2 offset: 364
send: V1186 p: 3 offset: 359
recv: V1186 p: 3 offset: 359
recv: V1187 p: 3 offset: 360
send: V1187 p: 3 offset: 360
send: V1188 p: 5 offset: 378
recv: V1188 p: 5 offset: 378
send: V1189 p: 2 offset: 365
recv: V1189 p: 2 offset: 365
recv: V1190 p: 4 offset: 395
send: V1190 p: 4 offset: 395
send: V1191 p: 1 offset: 359
recv: V1191 p: 1 offset: 359
send: V1192 p: 4 offset: 396
recv: V1192 p: 4 offset: 396
send: V1193 p: 0 offset: 431
recv: V1193 p: 0 offset: 431
send: V1194 p: 4 offset: 397
recv: V1194 p: 4 offset: 397
recv: V1195 p: 2 offset: 366
send: V1195 p: 2 offset: 366
send: V1196 p: 3 offset: 361
recv: V1196 p: 3 offset: 361
最佳答案
您需要更改ReaderConfig.MinBytes
,否则segmentio/kafka-go
会将其设置为1e6 = 1 MB
,在这种情况下,Kafka将等待积累这么多的数据,然后再回答请求。
func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{url},
Topic: topic,
Dialer: dialer,
Partition: partition,
MinBytes: 1, // same value of Shopify/sarama
MaxBytes: 57671680,
})
}
另一方面,
shopify/sarama
的默认值为1个字节。
关于go - 使用Kafka-Go,为什么我看到似乎是批量读取/写入的内容?我是否缺少配置?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64656638/
我在 Windows 机器上启动 Kafka-Server 时出现以下错误。我已经从以下链接下载了 Scala 2.11 - kafka_2.11-2.1.0.tgz:https://kafka.ap
关于Apache-Kafka messaging queue . 我已经从 Kafka 下载页面下载了 Apache Kafka。我已将其提取到 /opt/apache/installed/kafka
假设我有 Kafka 主题 cars。 我还有一个消费者组 cars-consumers 订阅了 cars 主题。 cars-consumers 消费者组当前位于偏移量 89。 当我现在删除 cars
我想知道什么最适合我:Kafka 流或 Kafka 消费者 api 或 Kafka 连接? 我想从主题中读取数据,然后进行一些处理并写入数据库。所以我编写了消费者,但我觉得我可以编写 Kafka 流应
我曾研究过一些 Kafka 流应用程序和 Kafka 消费者应用程序。最后,Kafka流不过是消费来自Kafka的实时事件的消费者。因此,我无法弄清楚何时使用 Kafka 流或为什么我们应该使用
Kafka Acknowledgement 和 Kafka 消费者 commitSync() 有什么区别 两者都用于手动偏移管理,并希望两者同步工作。 请协助 最佳答案 使用 spring-kafka
如何在 Kafka 代理上代理 Apache Kafka 生产者请求,并重定向到单独的 Kafka 集群? 在我的特定情况下,无法更新写入此集群的客户端。这意味着,执行以下操作是不可行的: 更新客户端
我需要在 Kafka 10 中命名我的消费者,就像我在 Kafka 8 中所做的一样,因为我有脚本可以嗅出并进一步使用这些信息。 显然,consumer.id 的默认命名已更改(并且现在还单独显示了
1.概述 我们会看到zk的数据中有一个节点/log_dir_event_notification/,这是一个序列号持久节点 这个节点在kafka中承担的作用是: 当某个Broker上的LogDir出现
我正在使用以下命令: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test.topic --property
我很难理解 Java Spring Boot 中的一些 Kafka 概念。我想针对在服务器上运行的真实 Kafka 代理测试消费者,该服务器有一些生产者已将数据写入/已经将数据写入各种主题。我想与服务
我的场景是我使用了很多共享前缀的 Kafka 主题(例如 house.door, house.room ) 并使用 Kafka 流正则表达式主题模式 API 使用所有主题。 一切看起来都不错,我得到了
有没有办法以编程方式获取kafka集群的版本?例如,使用AdminClient应用程序接口(interface)。 我想在消费者/生产者应用程序中识别 kafka 集群的版本。 最佳答案 目前无法检索
每当我尝试重新启动 kafka 时,它都会出现以下错误。一旦我删除/tmp/kafka-logs 它就会得到解决,但它也会删除我的主题。 有办法解决吗? ERROR Error while
我是 Apache Kafka 的新用户,我仍在了解内部结构。 在我的用例中,我需要从 Kafka Producer 客户端动态增加主题的分区数。 我发现了其他类似的 questions关于增加分区大
正如 Kafka 文档所示,一种方法是通过 kafka.tools.MirrorMaker 来实现这一点。但是,我需要将一个主题(比如 测试 带 1 个分区)(其内容和元数据)从生产环境复制到没有连接
我已经在集群中配置了 3 个 kafka,我正在尝试与 spring-kafka 一起使用。 但是在我杀死 kafka 领导者之后,我无法将其他消息发送到队列中。 我将 spring.kafka.bo
我的 kafka sink 连接器从多个主题(配置了 10 个任务)读取,并处理来自所有主题的 300 条记录。根据每个记录中保存的信息,连接器可以执行某些操作。 以下是触发器记录中键值对的示例: "
我有以下 kafka 流代码 public class KafkaStreamHandler implements Processor{ private ProcessorConte
当 kafka-streams 应用程序正在运行并且 Kafka 突然关闭时,应用程序进入“等待”模式,发送警告日志的消费者和生产者线程无法连接,当 Kafka 回来时,一切都应该(理论上)去恢复正常
我是一名优秀的程序员,十分优秀!