gpt4 book ai didi

与卡夫卡消费者一起去 channel

转载 作者:数据小太阳 更新时间:2023-10-29 03:22:55 26 4
gpt4 key购买 nike

我刚开始学习 channel 。我正在使用汇合的 kafka 消费者来创建功能性消费者。我想要完成的是将消息发送到缓冲 channel (2,000)...然后使用管道将 channel 中的消息写入 redis。我已经通过执行 println 来让消费者部分工作了一条一条地发送消息,直到它到达偏移量的末尾,但是当我尝试添加一个 channel 时,它似乎命中了 default: switch 中的案例然后就卡住了。

我似乎也没有正确填写 channel ?这fmt.Println("count is: ", len(redisChnl))总是打印 0

这是我目前所拥有的:

// Example function-based high-level Apache Kafka consumer
package main

import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"os"
"os/signal"
"syscall"
"time"
"encoding/json"
"regexp"
"github.com/go-redis/redis"
"encoding/binary"
)

var client *redis.Client

func init() {
client = redis.NewClient(&redis.Options{
Addr: ":6379",
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
PoolSize: 10,
PoolTimeout: 30 * time.Second,
})
client.FlushDB()
}

type MessageFormat struct {
MetricValueNumber float64 `json:"metric_value_number"`
Path string `json:"path"`
Cluster string `json:"cluster"`
Timestamp time.Time `json:"@timestamp"`
Version string `json:"@version"`
Host string `json:"host"`
MetricPath string `json:"metric_path"`
Type string `json:"string"`
Region string `json:"region"`
}

//func redis_pipeline(ky string, vl string) {
// pipe := client.Pipeline()
//
// exec := pipe.Set(ky, vl, time.Hour)
//
// incr := pipe.Incr("pipeline_counter")
// pipe.Expire("pipeline_counter", time.Hour)
//
// // Execute
// //
// // INCR pipeline_counter
// // EXPIRE pipeline_counts 3600
// //
// // using one client-server roundtrip.
// _, err := pipe.Exec()
// fmt.Println(incr.Val(), err)
// // Output: 1 <nil>
//}

func main() {


sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "kafka.com:9093",
"group.id": "testehb",
"security.protocol": "ssl",
"ssl.key.location": "/Users/key.key",
"ssl.certificate.location": "/Users/cert.cert",
"ssl.ca.location": "/Users/ca.pem",
})

if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
os.Exit(1)
}

fmt.Printf("Created Consumer %v\n", c)

err = c.SubscribeTopics([]string{"jmx"}, nil)

redisMap := make(map[string]string)

redisChnl := make(chan []byte, 2000)

run := true

for run == true {
select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
ev := c.Poll(100)
if ev == nil {
continue
}

switch e := ev.(type) {
case *kafka.Message:

//fmt.Printf("%% Message on %s:\n%s\n",
// e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("%% Headers: %v\n", e.Headers)
}

str := e.Value
res := MessageFormat{}
json.Unmarshal([]byte(str), &res)


fmt.Println("size", binary.Size([]byte(str)))

host:= regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)

redisMap[host] = string(str)
fmt.Println("count is: ", len(redisChnl)) //this always prints "count is: 0"

redisChnl <- e.Value //I think this is the write way to put the messages in the channel?

case kafka.PartitionEOF:
fmt.Printf("%% Reached %v\n", e)
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
run = false
default:
fmt.Printf("Ignored %v\n", e)
}

<- redisChnl // I thought I could just empty the channel like this once the buffer is full?


}
}

fmt.Printf("Closing consumer\n")
c.Close()
}

--------编辑--------

好的,我想我是通过移动 <- redisChnl 让它工作的里面default , 但现在我看到 count before readcount after readdefault里面总是打印 2,000 ...我本以为第一个count before read = 2,000然后 count after read = 0因为那时 channel 将是空的??

    select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
ev := c.Poll(100)
if ev == nil {
continue
}

switch e := ev.(type) {
case *kafka.Message:

//fmt.Printf("%% Message on %s:\n%s\n",
// e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("%% Headers: %v\n", e.Headers)
}

str := e.Value
res := MessageFormat{}
json.Unmarshal([]byte(str), &res)


//fmt.Println("size", binary.Size([]byte(str)))

host:= regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)

redisMap[host] = string(str)

go func() {
redisChnl <- e.Value
}()


case kafka.PartitionEOF:
fmt.Printf("%% Reached %v\n", e)
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
run = false
default:
fmt.Println("count before read: ", len(redisChnl))

fmt.Printf("Ignored %v\n", e)

<-redisChnl

fmt.Println("count after read: ", len(redisChnl)) //would've expected this to be 0

}


}

最佳答案

我认为简化此代码的更大方法是将管道分成多个 goroutine。

channel 的优点是多人可以同时在上面书写和阅读。在这个例子中,这可能意味着有一个 go 例程在 channel 上排队,另一个从 channel 中取出并将东西放入 redis。

像这样:

c := make(chan Message, bufferLen)
go pollKafka(c)
go pushToRedis(c)

如果你想添加批处理,你可以添加一个从 kafka channel 轮询的中间阶段,并附加到一个 slice 直到 slice 已满,然后将该 slice 排入 redis 的 channel 。

如果这样的并发性不是目标,那么用 slice 替换代码中的 channel 可能会更容易。如果只有 1 个 goroutine 作用于一个对象,那么尝试使用 channel 并不是一个好主意。

关于与卡夫卡消费者一起去 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50301067/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com