
go - Using kafka-go, why am I seeing what appears to be batched reads/writes? Am I missing a configuration?


I am switching from RabbitMQ to Kafka. This is just a simple exercise to understand how Kafka behaves. I'm not sure whether I'm missing something in my setup, whether it's my code, whether it's kafka-go, or whether this is expected Kafka behavior.
I have tried adjusting BatchSize and BatchTimeout, but neither had any effect.
The code below creates a topic with 6 partitions and a replication factor of 3, then produces an incrementing message every 100ms. It starts 6 consumers, one per partition. Reads and writes each run in their own goroutines.
In the log below, it goes 7 seconds without receiving a message, then receives a burst. I'm using Confluent's platform, so I expect some network latency, but not to the degree I'm seeing.

package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"log"
	"net"
	"strconv"
	"time"

	kafka "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

func newDialer(clientID, username, password string) *kafka.Dialer {
	mechanism := plain.Mechanism{
		Username: username,
		Password: password,
	}

	rootCAs, _ := x509.SystemCertPool()
	if rootCAs == nil {
		rootCAs = x509.NewCertPool()
	}

	return &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		ClientID:      clientID,
		SASLMechanism: mechanism,
		TLS: &tls.Config{
			InsecureSkipVerify: false,
			RootCAs:            rootCAs,
		},
	}
}

func createTopic(url string, topic string, dialer *kafka.Dialer) {
	conn, err := dialer.Dial("tcp", url)
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		panic(err.Error())
	}

	var controllerConn *kafka.Conn
	controllerConn, err = dialer.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err.Error())
	}
	defer controllerConn.Close()

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     6,
			ReplicationFactor: 3,
		},
	}

	err = controllerConn.CreateTopics(topicConfigs...)
	if err != nil {
		panic(err.Error())
	}
}

func newWriter(url string, topic string, dialer *kafka.Dialer) *kafka.Writer {
	return kafka.NewWriter(kafka.WriterConfig{
		Brokers:      []string{url},
		Topic:        topic,
		Balancer:     &kafka.CRC32Balancer{},
		Dialer:       dialer,
		BatchSize:    10,
		BatchTimeout: 1 * time.Millisecond,
	})
}

func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{url},
		Topic:     topic,
		Dialer:    dialer,
		Partition: partition,
	})
}

func read(url string, topic string, dialer *kafka.Dialer, partition int) {
	reader := newReader(url, topic, partition, dialer)
	defer reader.Close()
	for {
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			panic(err)
		}
		log.Printf("rec%d:\t%s\n", partition, msg.Value)
	}
}

func write(url string, topic string, dialer *kafka.Dialer) {
	writer := newWriter(url, topic, dialer)
	defer writer.Close()
	for i := 0; ; i++ {
		v := []byte("V" + strconv.Itoa(i))
		log.Printf("send:\t%s\n", v)
		msg := kafka.Message{Key: v, Value: v}
		err := writer.WriteMessages(context.Background(), msg)
		if err != nil {
			fmt.Println(err)
		}
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	url := "_______.______.___.confluent.cloud:9092"
	topic := "test"
	username := "________________"
	password := "________________"
	clientID := "________________"
	dialer := newDialer(clientID, username, password)
	ctx := context.Background()
	createTopic(url, topic, dialer)
	for i := 0; i < 6; i++ {
		go read(url, topic, dialer, i)
	}

	go write(url, topic, dialer)
	<-ctx.Done()
}

The following gets logged:
2020/11/02 23:19:22 send:       V0
2020/11/02 23:19:23 send: V1
2020/11/02 23:19:23 send: V2
2020/11/02 23:19:23 send: V3
2020/11/02 23:19:24 send: V4
2020/11/02 23:19:24 send: V5
2020/11/02 23:19:24 send: V6
2020/11/02 23:19:25 send: V7
2020/11/02 23:19:25 send: V8
2020/11/02 23:19:25 send: V9
2020/11/02 23:19:25 send: V10
2020/11/02 23:19:26 send: V11
2020/11/02 23:19:26 send: V12
2020/11/02 23:19:26 send: V13
2020/11/02 23:19:26 send: V14
2020/11/02 23:19:26 send: V15
2020/11/02 23:19:27 send: V16
2020/11/02 23:19:27 send: V17
2020/11/02 23:19:27 send: V18
2020/11/02 23:19:27 send: V19
2020/11/02 23:19:28 send: V20
2020/11/02 23:19:29 send: V21
2020/11/02 23:19:29 send: V22
2020/11/02 23:19:29 send: V23
2020/11/02 23:19:29 send: V24
2020/11/02 23:19:29 send: V25
2020/11/02 23:19:30 send: V26
2020/11/02 23:19:30 send: V27
2020/11/02 23:19:30 send: V28
2020/11/02 23:19:30 send: V29
2020/11/02 23:19:31 send: V30
2020/11/02 23:19:31 send: V31
2020/11/02 23:19:31 send: V32
2020/11/02 23:19:32 send: V33
2020/11/02 23:19:32 send: V34
2020/11/02 23:19:32 rec3: V8
2020/11/02 23:19:32 rec3: V14
2020/11/02 23:19:32 rec3: V15
2020/11/02 23:19:32 rec3: V16
2020/11/02 23:19:32 rec3: V17
2020/11/02 23:19:32 rec3: V20
2020/11/02 23:19:32 rec3: V21
2020/11/02 23:19:32 rec3: V23
2020/11/02 23:19:32 rec3: V29
2020/11/02 23:19:32 rec1: V0
2020/11/02 23:19:32 rec1: V9
2020/11/02 23:19:32 rec1: V22
2020/11/02 23:19:32 rec1: V28
2020/11/02 23:19:32 rec4: V4
2020/11/02 23:19:32 rec4: V5
2020/11/02 23:19:32 rec4: V7
2020/11/02 23:19:32 rec4: V10
2020/11/02 23:19:32 rec4: V11
2020/11/02 23:19:32 rec4: V12
2020/11/02 23:19:32 rec4: V18
2020/11/02 23:19:32 rec4: V24
2020/11/02 23:19:32 rec4: V25
2020/11/02 23:19:32 rec4: V30
2020/11/02 23:19:32 rec4: V31
2020/11/02 23:19:32 send: V35
2020/11/02 23:19:32 rec5: V1
2020/11/02 23:19:32 rec5: V2
2020/11/02 23:19:32 rec5: V3
2020/11/02 23:19:32 rec5: V34
2020/11/02 23:19:32 rec2: V6
2020/11/02 23:19:32 rec2: V13
2020/11/02 23:19:32 rec2: V26
2020/11/02 23:19:32 rec2: V33
2020/11/02 23:19:32 send: V36
2020/11/02 23:19:33 send: V37
2020/11/02 23:19:33 send: V38
2020/11/02 23:19:33 send: V39
2020/11/02 23:19:33 send: V40
2020/11/02 23:19:33 send: V41
2020/11/02 23:19:33 rec0: V19
2020/11/02 23:19:33 rec0: V27
2020/11/02 23:19:33 rec0: V32
2020/11/02 23:19:34 send: V42
2020/11/02 23:19:34 send: V43
2020/11/02 23:19:34 send: V44
2020/11/02 23:19:34 send: V45
2020/11/02 23:19:34 send: V46
2020/11/02 23:19:35 send: V47
2020/11/02 23:19:35 send: V48
2020/11/02 23:19:35 send: V49
2020/11/02 23:19:35 send: V50
2020/11/02 23:19:35 send: V51
2020/11/02 23:19:35 send: V52
2020/11/02 23:19:36 send: V53
2020/11/02 23:19:36 send: V54
2020/11/02 23:19:36 send: V55
2020/11/02 23:19:36 send: V56
2020/11/02 23:19:36 send: V57
2020/11/02 23:19:37 send: V58
2020/11/02 23:19:37 send: V59
2020/11/02 23:19:37 send: V60
2020/11/02 23:19:38 send: V61
2020/11/02 23:19:38 send: V62
2020/11/02 23:19:38 send: V63
2020/11/02 23:19:38 send: V64
2020/11/02 23:19:38 send: V65
2020/11/02 23:19:39 send: V66
2020/11/02 23:19:39 send: V67
2020/11/02 23:19:39 send: V68
2020/11/02 23:19:40 send: V69
2020/11/02 23:19:40 send: V70
2020/11/02 23:19:40 send: V71
2020/11/02 23:19:40 send: V72
2020/11/02 23:19:40 send: V73
2020/11/02 23:19:40 send: V74
2020/11/02 23:19:41 send: V75
2020/11/02 23:19:41 send: V76
2020/11/02 23:19:41 rec1: V41
2020/11/02 23:19:41 rec1: V56
2020/11/02 23:19:41 rec1: V68
2020/11/02 23:19:41 rec1: V74
2020/11/02 23:19:41 rec1: V75
2020/11/02 23:19:41 rec1: V76
2020/11/02 23:19:41 rec3: V37
2020/11/02 23:19:41 rec3: V40
2020/11/02 23:19:41 rec3: V42
2020/11/02 23:19:41 rec3: V48
2020/11/02 23:19:41 rec3: V55
2020/11/02 23:19:41 rec3: V57
2020/11/02 23:19:41 rec3: V60
2020/11/02 23:19:41 rec3: V61
2020/11/02 23:19:41 rec3: V62
2020/11/02 23:19:41 send: V77
2020/11/02 23:19:41 rec4: V38
2020/11/02 23:19:41 rec4: V39
2020/11/02 23:19:41 rec4: V45
2020/11/02 23:19:41 rec4: V46
2020/11/02 23:19:41 rec4: V47
2020/11/02 23:19:41 rec4: V53
2020/11/02 23:19:41 rec4: V59
2020/11/02 23:19:41 rec4: V70
2020/11/02 23:19:41 rec4: V71
2020/11/02 23:19:41 rec4: V73
2020/11/02 23:19:41 rec5: V35
2020/11/02 23:19:41 rec5: V36
2020/11/02 23:19:41 rec5: V43
2020/11/02 23:19:41 rec5: V49
2020/11/02 23:19:41 rec5: V54
2020/11/02 23:19:41 rec5: V63
2020/11/02 23:19:41 rec5: V69
2020/11/02 23:19:41 rec5: V77
2020/11/02 23:19:41 send: V78
2020/11/02 23:19:41 rec2: V44
2020/11/02 23:19:41 rec2: V50
2020/11/02 23:19:41 rec2: V51
2020/11/02 23:19:41 rec2: V64
2020/11/02 23:19:41 rec2: V65
2020/11/02 23:19:41 rec2: V66
2020/11/02 23:19:41 rec2: V72
2020/11/02 23:19:41 send: V79
2020/11/02 23:19:42 send: V80
2020/11/02 23:19:42 send: V81
2020/11/02 23:19:42 send: V82
2020/11/02 23:19:42 send: V83
2020/11/02 23:19:42 send: V84
2020/11/02 23:19:43 send: V85
2020/11/02 23:19:43 rec0: V52
2020/11/02 23:19:43 rec0: V58
2020/11/02 23:19:43 rec0: V67
2020/11/02 23:19:43 send: V86
Any suggestions would be greatly appreciated. Thanks!
Edit:
The buffering is definitely happening in kafka-go. Sarama does not exhibit the same behavior:
package main

import (
	"context"
	"fmt"

	"github.com/Shopify/sarama"

	"crypto/tls"
	"crypto/x509"

	"log"
	"strings"
	"time"
)

var (
	broker   = "___-_____.us-east1.gcp.confluent.cloud:9092"
	brokers  = []string{broker}
	clientID = "___________"
	username = "___________"
	password = "___________"
	topic    = "sarama"
)

func main() {
	log.Printf("Kafka brokers: %s", strings.Join(brokers, ", "))
	ctx := context.Background()
	sync := newSyncProducer()
	// accessLog := newAsyncProducer()

	createTopic(topic)

	go func() {
		for i := 0; ; i++ {
			v := sarama.StringEncoder(fmt.Sprintf("V%d", i))

			p, o, err := sync.SendMessage(&sarama.ProducerMessage{
				Topic: topic,
				Value: v,
			})
			if err != nil {
				panic(err)
			}
			fmt.Printf("sent\t\t%v\tp: %d\toffset: %d\t\n", v, p, o)
			time.Sleep(100 * time.Millisecond)
		}
	}()
	ps := []sarama.PartitionConsumer{}
	offset := int64(0)

	loop := func(msgs <-chan *sarama.ConsumerMessage) {
		for msg := range msgs {
			fmt.Printf("recv:\t\t%s\tp: %d\toffset: %d\n", msg.Value, msg.Partition, msg.Offset)
		}
	}

	for i := 0; i < 6; i++ {
		ps = append(ps, createPartitionConsumer(topic, int32(i), offset))
	}

	for _, p := range ps {
		go loop(p.Messages())
	}
	<-ctx.Done()
}

func createPartitionConsumer(topic string, partition int32, offset int64) sarama.PartitionConsumer {
	config := baseConfig()
	c, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		panic(err)
	}
	p, err := c.ConsumePartition(topic, partition, offset)
	if err != nil {
		panic(err)
	}
	return p
}

func createTopic(topic string) {
	config := baseConfig()
	admin, err := sarama.NewClusterAdmin(brokers, config)
	if err != nil {
		log.Fatal("Error while creating cluster admin: ", err.Error())
	}
	defer func() { _ = admin.Close() }()
	err = admin.CreateTopic(topic, &sarama.TopicDetail{
		NumPartitions:     6,
		ReplicationFactor: 3,
	}, false)
	if err != nil {
		log.Println("Error while creating topic: ", err.Error())
	}
}

func baseConfig() *sarama.Config {
	rootCAs, _ := x509.SystemCertPool()
	if rootCAs == nil {
		rootCAs = x509.NewCertPool()
	}

	config := sarama.NewConfig()
	config.Version = sarama.MaxVersion
	config.Net.TLS.Enable = true
	config.Net.TLS.Config = &tls.Config{
		RootCAs:            rootCAs,
		InsecureSkipVerify: false,
	}

	config.ClientID = clientID
	config.Net.SASL.Enable = true
	config.Net.SASL.Password = password
	config.Net.SASL.User = username
	return config
}

func newSyncProducer() sarama.SyncProducer {
	config := baseConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 10
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	return producer
}

In fact, in some cases a message is received before the send is even confirmed, which makes me wonder whether there's some internal message passing going on, and if so, whether I should care... (Most likely the send and recv log lines are simply racing across goroutines, since SendMessage only returns after the broker acknowledges the write.)
recv:           V1176   p: 1    offset: 355
recv: V1177 p: 2 offset: 363
send: V1177 p: 2 offset: 363
send: V1178 p: 5 offset: 377
recv: V1178 p: 5 offset: 377
recv: V1179 p: 1 offset: 356
send: V1179 p: 1 offset: 356
send: V1180 p: 1 offset: 357
recv: V1180 p: 1 offset: 357
recv: V1181 p: 1 offset: 358
send: V1181 p: 1 offset: 358
send: V1182 p: 4 offset: 393
recv: V1182 p: 4 offset: 393
send: V1183 p: 4 offset: 394
recv: V1183 p: 4 offset: 394
send: V1184 p: 3 offset: 358
recv: V1184 p: 3 offset: 358
send: V1185 p: 2 offset: 364
recv: V1185 p: 2 offset: 364
send: V1186 p: 3 offset: 359
recv: V1186 p: 3 offset: 359
recv: V1187 p: 3 offset: 360
send: V1187 p: 3 offset: 360
send: V1188 p: 5 offset: 378
recv: V1188 p: 5 offset: 378
send: V1189 p: 2 offset: 365
recv: V1189 p: 2 offset: 365
recv: V1190 p: 4 offset: 395
send: V1190 p: 4 offset: 395
send: V1191 p: 1 offset: 359
recv: V1191 p: 1 offset: 359
send: V1192 p: 4 offset: 396
recv: V1192 p: 4 offset: 396
send: V1193 p: 0 offset: 431
recv: V1193 p: 0 offset: 431
send: V1194 p: 4 offset: 397
recv: V1194 p: 4 offset: 397
recv: V1195 p: 2 offset: 366
send: V1195 p: 2 offset: 366
send: V1196 p: 3 offset: 361
recv: V1196 p: 3 offset: 361

Best Answer

You need to change ReaderConfig.MinBytes; otherwise segmentio/kafka-go sets it to 1e6 = 1 MB, in which case Kafka waits until that much data has accumulated before answering the fetch request.

func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{url},
		Topic:     topic,
		Dialer:    dialer,
		Partition: partition,
		MinBytes:  1,        // same value as Shopify/sarama's default
		MaxBytes:  57671680, // 55 MiB
	})
}
Shopify/sarama, by contrast, defaults the minimum fetch size to 1 byte.
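For comparison, here is a minimal sketch (assuming the same Shopify/sarama API used in the question) of where that default lives on the consumer side. Consumer.Fetch.Min is sarama's counterpart of kafka-go's MinBytes, and Consumer.MaxWaitTime caps how long the broker may hold a fetch request open; the helper name fetchTunedConfig is illustrative, not part of either library:

func fetchTunedConfig() *sarama.Config {
	// sarama.NewConfig() already defaults Consumer.Fetch.Min to 1 byte,
	// which is why the sarama consumer above delivers messages immediately.
	config := sarama.NewConfig()
	config.Consumer.Fetch.Min = 1                        // broker replies as soon as any data is available
	config.Consumer.MaxWaitTime = 250 * time.Millisecond // cap on how long the broker may wait for Fetch.Min
	return config
}

Setting these explicitly makes the latency/throughput trade-off visible in your own code instead of relying on whichever default the client library happens to pick.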
References:
  • segmentio/kafka-go
  • Shopify/sarama
  • A similar question on Stack Overflow: https://stackoverflow.com/questions/64656638/
