gpt4 book ai didi

redis - 我可以在 Redis 中使用一个订阅者模式订阅多个主机(和集群)吗?

转载 作者:IT王子 更新时间:2023-10-29 06:16:27 26 4
gpt4 key购买 nike

我正在实现一项功能,将 redis 发布的消息集成到我创建的项目的 mongodb 中,并在测试环境中完美运行。

但我担心生产环境:我有 3 台主服务器,另外还有 12 个从节点集群。如果它们都把消息发布到符合同一 channel 模式的频道,我可以在一个地方订阅所有这些消息吗?

最佳答案

是的,使用 StackExchange.Redis 进行配置就可以做到。我已经完成了整体结构,如下所示:

 /// <summary>
/// Subscribes to the Redis channel named by <c>RedisConfigurationManager.ActiveChannelName</c>
/// and merges every received message into MongoDB through <see cref="IMongoDbRepository"/>.
/// </summary>
public class RedisSubscriber : IRedisSubscriber
{
    private readonly RedisConfigurationManager _config;
    private readonly IMongoDbRepository _mongoDbRepository;
    private readonly ILogger<RedisSubscriber> _logger;
    private readonly IConnectionMultiplexer _connectionMultiplexer;

    /// <summary>
    /// Creates the subscriber and resolves its configuration and repository from
    /// <paramref name="serviceLocator"/>.
    /// </summary>
    /// <param name="serviceLocator">Provider used to resolve <c>RedisConfigurationManager</c> and <c>IMongoDbRepository</c>.</param>
    /// <param name="logger">Logger used for progress and error messages.</param>
    /// <param name="conn">Redis connection the subscriber is obtained from.</param>
    /// <exception cref="ArgumentNullException">An argument is null.</exception>
    /// <exception cref="InvalidOperationException">A required service is not registered in <paramref name="serviceLocator"/>.</exception>
    public RedisSubscriber(IServiceProvider serviceLocator, ILogger<RedisSubscriber> logger, IConnectionMultiplexer conn)
    {
        if (serviceLocator == null)
        {
            throw new ArgumentNullException(nameof(serviceLocator));
        }
        if (logger == null)
        {
            throw new ArgumentNullException(nameof(logger));
        }
        if (conn == null)
        {
            throw new ArgumentNullException(nameof(conn));
        }

        // GetService returns null for an unregistered service; fail here instead of with a
        // NullReferenceException on the first subscription or the first received message.
        _config = (RedisConfigurationManager)serviceLocator.GetService(typeof(RedisConfigurationManager));
        if (_config == null)
        {
            throw new InvalidOperationException("RedisConfigurationManager is not registered in the service provider.");
        }

        _mongoDbRepository = (IMongoDbRepository)serviceLocator.GetService(typeof(IMongoDbRepository));
        if (_mongoDbRepository == null)
        {
            throw new InvalidOperationException("IMongoDbRepository is not registered in the service provider.");
        }

        _connectionMultiplexer = conn;
        _logger = logger;
    }

    /// <summary>
    /// Subscribes to the configured channel. A failure to subscribe is logged as an error
    /// and is not rethrown, so the caller is not interrupted.
    /// </summary>
    public void SubScribeChannel()
    {
        _logger.LogInformation("!SubScribeChannel started!!");

        string channelName = _config.ActiveChannelName;
        var pubSub = _connectionMultiplexer.GetSubscriber();
        try
        {
            // NOTE(review): the handler is an async lambda; if Subscribe takes a void-returning
            // delegate, an exception escaping it would be unobserved. MessageActionAsync
            // catches everything itself, which keeps this safe.
            pubSub.Subscribe(channelName, async (channel, message) => await MessageActionAsync(message, channel));
        }
        catch (Exception ex)
        {
            // Pass the exception object so the stack trace is recorded, not just the message.
            _logger.LogError(ex, "!error: {ErrorMessage}", ex.Message);
        }
        Debug.WriteLine("EOF");
    }

    /// <summary>
    /// Handles one received message: transforms it, deserializes it into documents and merges
    /// them into MongoDB. Never throws; every failure is logged as an error.
    /// </summary>
    /// <param name="message">Raw payload received from Redis.</param>
    /// <param name="channel">Channel the message arrived on; also used as the partition key.</param>
    private async Task MessageActionAsync(RedisValue message, string channel)
    {
        try
        {
            // NOTE(review): this instance is never used (the calls below are static). It is kept
            // because the constructor may have side effects that cannot be seen from here.
            Transformer t = new Transformer(_logger);
            _logger.LogInformation(
                "!SubScribeChannel message received on message!! channel: {Channel}, message: {Message}",
                channel,
                (string)message);
            string transformedMessage = Transformer.TransformJsonStringData2Message(message);
            List<Document> documents = Transformer.Deserialize<List<Document>>(transformedMessage);

            if (documents == null)
            {
                // Nothing to merge; avoid a NullReferenceException in the merge loop.
                _logger.LogWarning("!SubScribeChannel message produced no documents, channel: {Channel}", channel);
                return;
            }

            await MergeToMongoDb(documents, channel);

            // Reached only when the merge completed without throwing.
            _logger.LogInformation("!Merged");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "!error: {ErrorMessage}", ex.Message);
        }
    }

    /// <summary>
    /// Inserts or updates one <c>TurSysPartitionedDocument</c> per entry of
    /// <paramref name="documents"/>. Exceptions propagate to the caller, which logs them.
    /// </summary>
    /// <param name="documents">Documents to merge; null entries are skipped.</param>
    /// <param name="channelName">Stored as the partition key of every merged document.</param>
    private async Task MergeToMongoDb(IList<Document> documents, string channelName)
    {
        foreach (Document doc in documents)
        {
            if (doc == null)
            {
                continue;
            }

            // Convert Document to TurSysPartitionedDocument via a JSON round trip.
            TurSysPartitionedDocument td = JsonConvert.DeserializeObject<TurSysPartitionedDocument>(JsonConvert.SerializeObject(doc));
            td.DepartureDate = td.DepartureDate.ToLocalTime();
            td.PartitionKey = channelName;

            // Look for a stored document with the same product, provider, cabin class, route and
            // departure date whose Adult value is at least the incoming one.
            TurSysPartitionedDocument isExist = await _mongoDbRepository.GetOneAsync<TurSysPartitionedDocument>(k =>
                k.ProductCode == td.ProductCode &&
                k.ProviderCode == td.ProviderCode &&
                k.CabinClassName == td.CabinClassName &&
                k.OriginAirport == td.OriginAirport &&
                k.DestinationAirport == td.DestinationAirport &&
                k.Adult >= td.Adult &&
                k.DepartureDate == td.DepartureDate,
                td.PartitionKey);

            if (isExist != null)
            {
                // NOTE(review): SearchCount is read, incremented and written back in separate
                // calls; concurrent messages for the same document may lose an increment.
                isExist.SearchCount++;
                await _mongoDbRepository.UpdateOneAsync(isExist, k => k.Adult, td.Adult);
                await _mongoDbRepository.UpdateOneAsync(isExist, k => k.SearchCount, isExist.SearchCount);
            }
            else
            {
                td.SearchCount = 1;
                await _mongoDbRepository.AddOneAsync(td);
            }
        }
    }
}

关于redis - 我可以在 Redis 中使用一个订阅者模式订阅多个主机(和集群)吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55283431/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com