gpt4 book ai didi

go - 使用 BLPOP 处理 Redis 队列会导致单元测试中出现竞争条件?

转载 作者:行者123 更新时间:2023-12-01 21:10:53 24 4
gpt4 key购买 nike

我正在尝试实现一个先进先出的任​​务队列,如 Chapter 6.4.1 of the Redis e-book 中所述。在围棋。出于测试目的,我传入了 CommandExecutor 'worker'函数的接口(interface),如下所示:

package service

import (
"context"

"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

const commandsQueue = "queuedCommands:"

var pool = redis.Pool{
MaxIdle: 50,
MaxActive: 1000,
Dial: func() (redis.Conn, error) {
conn, err := redis.Dial("tcp", ":6379")
if err != nil {
logrus.WithError(err).Fatal("initialize Redis pool")
}
return conn, err
},
}

// CommandExecutor executes a command
type CommandExecutor interface {
Execute(string) error
}

func processQueue(ctx context.Context, done chan<- struct{}, executor CommandExecutor) error {
rc := pool.Get()
defer rc.Close()

for {
select {
case <-ctx.Done():
done <- struct{}{}
return nil
default:
// If the commands queue does not exist, BLPOP blocks until another client
// performs an LPUSH or RPUSH against it. The timeout argument of zero is
// used to block indefinitely.
reply, err := redis.Strings(rc.Do("BLPOP", commandsQueue, 0))
if err != nil {
logrus.WithError(err).Errorf("BLPOP %s %d", commandsQueue, 0)
return errors.Wrapf(err, "BLPOP %s %d", commandsQueue, 0)
}

if len(reply) < 2 {
logrus.Errorf("Expected a reply of length 2, got one of length %d", len(reply))
return errors.Errorf("Expected a reply of length 2, got one of length %d", len(reply))
}

// BLPOP returns a two-element multi-bulk with the first element being the
// name of the key where an element was popped and the second element
// being the value of the popped element (cf. https://redis.io/commands/blpop#return-value)
if err := executor.Execute(reply[1]); err != nil {
return errors.Wrapf(err, "execute scheduled command: %s", reply[0])
}
done <- struct{}{}
}
}
}

我制作了一个小型示例存储库, https://github.com/kurtpeek/process-queue ,使用此代码以及单元测试的尝试。对于单元测试,我有两个相同的测试(名称不同):

package service

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestProcessQueue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

executor := &CommandExecutorMock{
ExecuteFunc: func(string) error {
return nil
},
}

done := make(chan struct{})
go processQueue(ctx, done, executor)

rc := pool.Get()
defer rc.Close()

_, err := rc.Do("RPUSH", commandsQueue, "foobar")
require.NoError(t, err)

<-done

assert.Exactly(t, 1, len(executor.ExecuteCalls()))
assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}

func TestProcessQueue2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

executor := &CommandExecutorMock{
ExecuteFunc: func(string) error {
return nil
},
}

done := make(chan struct{})
go processQueue(ctx, done, executor)

rc := pool.Get()
defer rc.Close()

_, err := rc.Do("RPUSH", commandsQueue, "foobar")
require.NoError(t, err)

<-done

assert.Exactly(t, 1, len(executor.ExecuteCalls()))
assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}
CommandExecutorMock使用 moq 生成.如果我单独运行每个测试,它们会通过:
~/g/s/g/k/process-queue> go test ./... -v -run TestProcessQueue2
=== RUN TestProcessQueue2
--- PASS: TestProcessQueue2 (0.00s)
PASS
ok github.com/kurtpeek/process-queue/service 0.243s

但是,如果我运行所有测试,第二个会超时:
~/g/s/g/k/process-queue> 
go test ./... -v -timeout 10s
=== RUN TestProcessQueue
--- PASS: TestProcessQueue (0.00s)
=== RUN TestProcessQueue2
panic: test timed out after 10s

看来第二个测试运行的时候,第一个测试启动的goroutine还在运行, BLPOP从队列中读取命令,以便 <-done无限期地在第二个测试 block 中行。尽管调用了 cancel()在第一个测试的父上下文上。

我如何“隔离”这些测试,以便它们在一起运行时都通过? (我尝试将 -p 1 标志传递给 go test 但无济于事)。

最佳答案

This is despite calling cancel() on the parent context of the first test.



写信给 done 之间有一段时间。并调用 cancel() ,这意味着第一个测试可能(并且确实)进入第二个 for/select迭代而不是在 <-ctx.Done() 上退出.更具体地说,测试代码在取消之前包含 2 个断言:
    assert.Exactly(t, 1, len(executor.ExecuteCalls()))
assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)

只有这样 defer cancel()开始,这似乎为时已晚,无法取消第一次执行例程的上下文。

如果你搬家 cancel()在阅读 done 之前打电话,测试通过:

func TestProcessQueue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

executor := &CommandExecutorMock{
ExecuteFunc: func(string) error {
return nil
},
}

done := make(chan struct{})
go processQueue(ctx, done, executor)

rc := pool.Get()
defer rc.Close()

_, err := rc.Do("RPUSH", commandsQueue, "foobar")
require.NoError(t, err)

cancel() // note this change right here
<-done

assert.Exactly(t, 1, len(executor.ExecuteCalls()))
assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}

关于go - 使用 BLPOP 处理 Redis 队列会导致单元测试中出现竞争条件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60944772/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com