gpt4 book ai didi

Redis - 简单队列读取器/写入器的正确方法 - StackExchange.Redis

转载 作者:可可西里 更新时间:2023-11-01 11:25:10 25 4
gpt4 key购买 nike

我希望使用 StackExchange.Redis 实现一个简单的分布式工作队列系统。

我理解没有 BLPOP 等的原因,但就目前而言,我正在使用的界面是基于重复的 TryRead 调用和超时。

我对下面的内容犹豫不决,因为我在处理程序中取消订阅,并设置了一个标志来取消超时。有没有可能遗漏什么?是否有不同的方法来实现这一目标?

    public string TryRead(string queueName, TimeSpan timeout)
{
string result = null;

var chanName = $"qnot_{queueName}";
var done = new ManualResetEvent(false);

void Handler(RedisChannel chan, RedisValue val)
{
_sub.Unsubscribe(chanName, Handler);
result = _database.ListRightPop($"qdata_{queueName}");
done.Set();
}

_sub.Subscribe(chanName, Handler);
done.WaitOne(timeout);

return result;
}

public void Write(string queueName, string text)
{
_database.ListLeftPush($"qdata_{queueName}", text);
_sub.Publish($"qnot_{queueName}", "");
}

如果队列中已有项(并且没有添加任何新项),上述版本将始终超时并返回 null。下面的版本现在首先检查现有数据,这是有效的。但它有一个错误,一个竞争条件:如果第一个读取检查返回否定,然后推送一些东西并发送通知,然后我们订阅并等待超时。

    public string TryRead(string queueName, TimeSpan timeout)
{
var dataName = $"qdata_{queueName}";

var result = (string)_database.ListRightPop(dataName);
if (result != null)
{
return result;
}

var chanName = $"qnot_{queueName}";
var done = new ManualResetEvent(false);

void Handler(RedisChannel chan, RedisValue val)
{
_sub.Unsubscribe(chanName, Handler);
result = _database.ListRightPop(dataName);
done.Set();
}

_sub.Subscribe(chanName, Handler);
done.WaitOne(timeout);

return result;
}

我可以在一个循环中执行 RPOP,但这看起来绝对糟糕。还有其他人做过类似的事情吗?

最佳答案

我最终得到了这个,它有效,但我仍然欢迎其他可行方法的答案:

    public string TryRead(string queueName, TimeSpan timeout)
{
var timer = Stopwatch.StartNew();
var dataName = $"{_keyPrefix}qdata_{queueName}";
var chanName = $"{_keyPrefix}qnot_{queueName}";
var done = new AutoResetEvent(false);
string result;

// subscribe - sets the 'done' flag when a new item is pushed
void Handler(RedisChannel chan, RedisValue val)
{
done.Set();
}

_sub.Subscribe(chanName, Handler);

do
{
// try to read right away (before waiting), in case there was data already there
result = _database.ListRightPop(dataName);
if (result != null)
{
continue;
}

// there wasn't an item right away, so wait for the timeout to expire
// or the subscription to be fired. if it fired, try the read again
var remainingTime = timeout - timer.Elapsed;
if (remainingTime.TotalMilliseconds <= 1.0)
{
break;
}
if (done.WaitOne(remainingTime))
{
result = _database.ListRightPop(dataName);
}
} while (result == null && timer.Elapsed < timeout);

_sub.Unsubscribe(chanName, Handler);

return result;
}

编辑:更新了 w/AutoResetEvent 并从处理程序中删除了 Unsubscribe。请注意发现这一点的人,这对我来说似乎可以作为单个阻塞读取的直接替代,但它不会成为推荐的方法。我之所以使用它,是因为我希望与其他队列实现保持一致,并且正在处理这个特定的 TryRead 签名。

关于Redis - 简单队列读取器/写入器的正确方法 - StackExchange.Redis,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46431146/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com