- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个go例程,该例程基本上充当KafkaConsumer
,它从一个主题读取消息,然后为收到的每条消息生成另一个go routine
。现在,当Consumer go routine
应用程序关闭时,应该关闭了这个main go routine
。但是,在正确关闭此功能方面,我面临着困难。
以下是Kafka Consumer
定义
package svc
import (
"event-service/pkg/pb"
"fmt"
"github.com/gogo/protobuf/proto"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"log"
"os"
"sync"
)
type EventConsumer func(event eventService.Event)
type KafkaConsumer struct {
done chan bool
eventChannels []string
consumer *kafka.Consumer
consumerMapping map[string]EventConsumer
wg *sync.WaitGroup
}
func getKafkaConsumerConfigMap(config map[string]interface{}) *kafka.ConfigMap {
configMap := &kafka.ConfigMap{}
for key, value := range config {
err := configMap.SetKey(key, value)
if err != nil {
log.Println(fmt.Sprintf("An error %v occurred while setting %v: %v", err, key, value))
}
}
return configMap
}
func NewKafkaConsumer(channels []string, config map[string]interface{}, consumerMapping map[string]EventConsumer) *KafkaConsumer {
var wg sync.WaitGroup
consumer, err := kafka.NewConsumer(getKafkaConsumerConfigMap(config))
done := make(chan bool, 1)
if err != nil {
log.Fatalf("An error %v occurred while starting kafka consumer.", err)
}
err = consumer.SubscribeTopics(channels, nil)
if err != nil {
log.Fatalf("An error %v occurred while subscribing to kafka topics %v.", err, channels)
}
return &KafkaConsumer{eventChannels: channels, done: done, wg: &wg, consumer: consumer, consumerMapping: consumerMapping}
}
func (kc *KafkaConsumer) getEvent(eventData []byte) *eventService.Event {
event := eventService.Event{}
err := proto.Unmarshal(eventData, &event)
if err != nil {
log.Println(fmt.Sprintf("An error %v occurred while un marshalling data from kafka.", err))
}
return &event
}
func (kc *KafkaConsumer) Consume() {
go func() {
run := true
for run == true {
select {
case sig := <-kc.done:
log.Println(fmt.Sprintf("Caught signal %v: terminating \n", sig))
run = false
return
default:
}
e := <-kc.consumer.Events()
switch event := e.(type) {
case kafka.AssignedPartitions:
_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", event)
err := kc.consumer.Assign(event.Partitions)
if err != nil {
log.Println(fmt.Sprintf("An error %v occurred while assigning partitions.", err))
}
case kafka.RevokedPartitions:
_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", event)
err := kc.consumer.Unassign()
if err != nil {
log.Println(fmt.Sprintf("An error %v occurred while unassigning partitions.", err))
}
case *kafka.Message:
domainEvent := kc.getEvent(event.Value)
kc.wg.Add(1)
go func(event *eventService.Event) {
defer kc.wg.Done()
if eventConsumer := kc.consumerMapping[domainEvent.EntityType]; eventConsumer != nil {
eventConsumer(*domainEvent)
} else {
log.Println(fmt.Sprintf("Event consumer not found for %v event type", domainEvent.EntityType))
}
}(domainEvent)
case kafka.PartitionEOF:
fmt.Printf("%% Reached %v\n", e)
case kafka.Error:
_, _ = fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
}
}
}()
}
func (kc *KafkaConsumer) Close() {
log.Println("Waiting")
kc.wg.Wait()
kc.done <- true
log.Println("Done waiting")
err := kc.consumer.Close()
if err != nil {
log.Println(fmt.Sprintf("An error %v occurred while closing kafka consumer.", err))
}
}
package main
import (
"event-service/pkg/pb"
"event-service/pkg/svc"
"fmt"
"log"
)
func main() {
eventConsumerMapping := map[string]svc.EventConsumer{"doctor-created": func(event eventService.Event) {
log.Println(fmt.Sprintf("Got event %v from kafka", event))
}}
consumerConfig := map[string]interface{}{
"bootstrap.servers": "localhost:9092",
"group.id": "catalog",
"go.events.channel.enable": true,
"go.application.rebalance.enable": true,
"enable.partition.eof": true,
"auto.offset.reset": "earliest",
}
kafkaConsumer := svc.NewKafkaConsumer([]string{"doctor-created"}, consumerConfig, eventConsumerMapping)
kafkaConsumer.Consume()
kafkaConsumer.Close()
}
consume
函数,我在这里错过了什么?
最佳答案
好了,解决方法在这里,
1.由于使用者go例程应在主go例程处于 Activity 状态时才存在,并且main go例程也是一个无穷尽的go例程,因此在go例程运行时关闭使用者go例程不是正确的方法。
因此以下解决方案有效
package main
import (
"event-service/pkg/pb"
"event-service/pkg/svc"
"fmt"
"log"
"sync"
)
func main() {
eventConsumerMapping := map[string]svc.EventConsumer{"doctor-created": func(event eventService.Event) {
log.Println(fmt.Sprintf("Got event %v from kafka", event))
}}
consumerConfig := map[string]interface{}{
"bootstrap.servers": "localhost:9092",
"group.id": "catalog-2",
"session.timeout.ms": 6000,
"go.events.channel.enable": true,
"go.application.rebalance.enable": true,
"enable.partition.eof": true,
"auto.offset.reset": "earliest",
}
var wg sync.WaitGroup
kafkaConsumer := svc.NewKafkaConsumer([]string{"doctor-created"}, consumerConfig, eventConsumerMapping)
kafkaConsumer.Consume(&wg)
wg.Wait()
kafkaConsumer.Close()
}
package svc
import (
"event-service/pkg/pb"
"fmt"
"github.com/gogo/protobuf/proto"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"log"
"os"
"os/signal"
"sync"
"syscall"
)
type EventConsumer func(event eventService.Event)
type KafkaConsumer struct {
done chan bool
consumer *kafka.Consumer
consumerMapping map[string]EventConsumer
sigChan chan os.Signal
channels []string
}
func getKafkaConsumerConfigMap(config map[string]interface{}) *kafka.ConfigMap {
configMap := &kafka.ConfigMap{}
for key, value := range config {
err := configMap.SetKey(key, value)
if err != nil {
log.Println(fmt.Sprintf("An error %v occurred while setting %v: %v", err, key, value))
}
}
return configMap
}
func NewKafkaConsumer(channels []string, config map[string]interface{}, consumerMapping map[string]EventConsumer) *KafkaConsumer {
sigChan := make(chan os.Signal, 1)
consumer, err := kafka.NewConsumer(getKafkaConsumerConfigMap(config))
done := make(chan bool, 1)
if err != nil {
log.Fatalf("An error %v occurred while starting kafka consumer.", err)
}
err = consumer.SubscribeTopics(channels, nil)
if err != nil {
log.Fatalf("An error %v occurred while subscribing to kafka topics %v.", err, channels)
}
return &KafkaConsumer{channels: channels, sigChan: sigChan, done: done, consumer: consumer, consumerMapping: consumerMapping}
}
func (kc *KafkaConsumer) getEvent(eventData []byte) *eventService.Event {
event := eventService.Event{}
err := proto.Unmarshal(eventData, &event)
if err != nil {
log.Println(fmt.Sprintf("An error %v occurred while un marshalling data from kafka.", err))
}
return &event
}
func (kc *KafkaConsumer) Consume(wg *sync.WaitGroup) {
signal.Notify(kc.sigChan, syscall.SIGINT, syscall.SIGTERM)
wg.Add(1)
go func() {
run := true
defer wg.Done()
for run == true {
select {
case sig := <-kc.sigChan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
case ev := <-kc.consumer.Events():
switch e := ev.(type) {
case kafka.AssignedPartitions:
_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", e)
_ = kc.consumer.Assign(e.Partitions)
case kafka.RevokedPartitions:
_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", e)
_ = kc.consumer.Unassign()
case *kafka.Message:
domainEvent := kc.getEvent(e.Value)
wg.Add(1)
go func(event *eventService.Event) {
defer wg.Done()
if eventConsumer := kc.consumerMapping[domainEvent.EntityType]; eventConsumer != nil {
eventConsumer(*domainEvent)
} else {
log.Println(fmt.Sprintf("Event consumer not found for %v event type", domainEvent.EntityType))
}
}(domainEvent)
case kafka.PartitionEOF:
fmt.Printf("%% Reached %v\n", e)
case kafka.Error:
// Errors should generally be considered as informational, the client will try to automatically recover
_, _ = fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
}
}
}
}()
}
func (kc *KafkaConsumer) Close() {
err := kc.consumer.Close()
if err != nil {
log.Println(fmt.Sprintf("An error %v occurred while closing kafka consumer.", err))
}
}
关于go - 正确关闭执行无限循环的Go例程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61210857/
我有 3 个列表项,每 3 秒向上旋转一次。我正在使用 transformY 属性来做这件事。问题是,当它到达最后一个元素时,它会循环返回,从而产生重新开始的效果。 如何通过在最后一项之后继续向上旋转
我如何制作一个处理旋转的无限/重复世界,就像在这个游戏中一样: http://bloodfromastone.co.uk/retaliation.html 我通过具有这样的层次结构对我的旋转移动世界进
这个问题已经有答案了: Using explicitly numbered repetition instead of question mark, star and plus (4 个回答) 已关闭
程序说明: I have this program of mine which is intended to read every word from a file (large one) and t
while 循环应该比较这两个对象的 ibsn。正在比较的对象: list[0] = new ReadingMatter ("Words and Stuff", "9-082-1090-1");
已关闭。这个问题是 not reproducible or was caused by typos 。目前不接受答案。 这个问题是由拼写错误或无法再重现的问题引起的。虽然类似的问题可能是 on-top
我完全被屏蔽了。我尝试修改 C 中的“警报”信号,以便在秒数到期时读取一个简单的变量。我的代码如下: 在主要部分: int semnal; signal(SIGALRM, alarmHandle
我正在接受多行信息(字符串,直到我稍后解析它们)。例如: 1 5 0 2 9 6 2 9 1 我编写这段代码来分隔行,因为我将不得不以某种方式操作每一行。 Scanner scan = new Sca
我不熟悉 jQuery,并且我有多余的 jQuery 调用,我想将它们放入循环中。 $('.class1').on('click', function () { ... $('.class2').on
我有一个树结构,其中每个节点都有 5 个子节点,并且不允许超过 5 个。我希望以广度优先搜索的方式遍历这棵树。 现在我想使用广度优先搜索方式从选定的父节点计算空节点。 例如 如果给定的父节点为 1,则
目标/动机 我想写一个服务,它应该一直运行。但是当服务已经运行时,应该不可能再次启动该服务。 用例 用户 X 打开页面 myService.php 并通过单击页面上的按钮启动服务。之后关闭浏览器。一段
我正在尝试编译 shogun 工具箱,但遇到了这个错误 C:/shogun-3.0.0/shogun-3.0.0/src/shogun/../shogun/mathematics/Math.h
需要学校的 JavaScript 作业帮助,但不知道该怎么做,希望得到一些提示? 我们应该创建一个 6 面掷骰子程序,用户可以选择应该掷多少个骰子,最少 1 个和最多 5 个骰子。 所用骰子数量的总和
我在无限 ScrollView 中有 5 张图片。 因此,为了使 scrollView 无限/循环,我将图像定位如下: 5 1 2 3 4 5 1含义:最后一张图片第一张图片第二张图片.....最后一
我正在使用 ExTwitter库,并希望能够偶尔终止对流式 API 的调用以更改参数。 我当前的代码看起来像这样: for tweet #finished end 关于elixir - 如何中断(无
我想每 3 秒更改一次 div 的背景。这需要循环,因此一旦最后一个背景图像显示,它就会循环回到第一个背景图像,依此类推。我在这样做时遇到了麻烦。 我之前发过一篇文章,内容非常模糊,没有得到帮助。
我在做this教程,无法让我的页面正确加载。我不断在控制台中收到错误:[$rootScope:infdig]。 我对 Angular 很陌生,但从我读到的内容来看,我在某个地方有一个无限循环。我预计它
所以我试图创建一个无限的 asyncIterator/生成器。该代码应该为“for wait of”循环生成“Hello”和“Hi”,然后永远等待下一个值。问题是它不等待第三个值,也不在循环后打印 2
下图显示了我如何在 HTML5/JS 中制作无限背景滚动。我的连续背景由 X block Canvas 组成。我将在到达下一个 Canvas 之前立即渲染它,并释放上一个 Canvas。这里的问题是动
作为一个业余项目,我正在研究一些自制的素数生成问题,尝试编写一些不同的实现作为自学 C 和 C++ 的方法。当然,生成低素数的最快方法是已经拥有它们,所以我想着手建立一个硬盘素数列表数据文件。我想编写
我是一名优秀的程序员,十分优秀!