gpt4 book ai didi

c# - 带有 Reactive Extension 和事务 ODP.NET 的 Oracle AQ 回调

转载 作者:太空宇宙 更新时间:2023-11-03 12:34:19 25 4
gpt4 key购买 nike

我已经实现了从 Oracle AQ 中取出消息并将其作为 IObservable 公开给系统。工作流程如下:-

  1. 应用程序从 Oracle 收到有关新消息的回调事件。
  2. 应用程序将消息从队列中取出并将其添加到 IObservable(消息作为事务的一部分从队列中取出,该事务在消息从队列中取出后立即提交)。

我意识到一个潜在的问题,那就是当消息出列时,事务会立即提交,而不是等待它被应用程序成功使用。下面是我正在使用的代码,但需要建议在应用程序成功使用事务后在哪里/如何提交事务。目前,它在私有(private) Dequeue 方法中启动并提交/回滚事务。

public sealed class Queue<T> : IQueue<T> where T : IQueueDataType
{
private readonly OracleConnection _connection;

private readonly string _consumerName;

private readonly IQueueSetting _queueSetting;

private readonly IDbConnectionFactory _dbConnectionFactory;

private OracleAQQueue _queue;

private IObservable<T> _messages;

private bool _isDisposed;

public Queue(IDbConnectionFactory dbConnectionFactory, IDalSettings dalSettings, IQueueSetting queueSetting)
{
_dbConnectionFactory = dbConnectionFactory;
_connection = dbConnectionFactory.Create() as OracleConnection;
_consumerName = dalSettings.Consumer;
_queueSetting = queueSetting;

}

public void Connect()
{
_connection.Open();
_queue = new OracleAQQueue(_queueSetting.QueueName, _connection)
{
DequeueOptions = { Wait = 10, Visibility = OracleAQVisibilityMode.Immediate , ConsumerName = _consumerName, NavigationMode = OracleAQNavigationMode.FirstMessage, DequeueMode = OracleAQDequeueMode.Remove},
UdtTypeName = _queueSetting.QueueDataTypeName,
MessageType = OracleAQMessageType.Udt
};

_queue.NotificationConsumers = new[] { _consumerName };

_messages = Observable
.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs>(
h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
try
{
Log.Info("Msg received", "Queue", _queueSetting.QueueName);

OracleAQMessage msg = Dequeue();

Log.Info("Msg received id " + msg.MessageId, "Queue", _queueSetting.QueueName);
return (T)msg.Payload;
}
catch (Exception e)
{

}
}).Publish().RefCount();

}

private OracleAQMessage Dequeue()
{
using (var connection = _dbConnectionFactory.Create() as OracleConnection)
{
try
{
connection.Open();
using (OracleTransaction transaction = connection.BeginTransaction())
{
try
{
OracleAQMessage msg = _queue.Dequeue();
**transaction.Commit();**
return msg;
}
catch (Exception e)
{
**transaction.Rollback();**
throw;
}
}
}
catch (Exception e)
{
Log.Error(string.Format("Error occurred while connecting to database to dequeue new message. Error : {0}", e),
"Dequeue", GetType().FullName);
throw;
}
finally
{
connection.Close();
}
}
}

public IObservable<T> GetMessages()
{
return _messages;
}

public void Dispose()
{
if (!_isDisposed)
{
if (_queue != null)
{
_queue.Dispose();
}

_connection.Dispose();
_isDisposed = true;
}
}
}

如果不使用 IObservable,我只公开一个事件提交和回滚事务将非常容易,但我喜欢我可以用 IObservable 做的事情,即我可以运行Linq 但不知道如何提交事务。

最佳答案

我认为这里没有简单的解决方法。如果我理解正确的话:

  1. Oracle 推送了一个事件,
  2. 您想通过 IObservable 公开事件流,
  3. 应用程序“处理”它,
  4. 如果处理成功则提交,否则回滚。

问题是 IObservable是一种单向机制。一旦你发布了一条消息(在我们的例子中,你从这个 Oracle 队列中得到了一些东西),目的不是跟踪它,而是稍后决定是否提交/回滚。所以你的选择几乎是将你的应用程序逻辑填充到某种形式的处理程序中:

Func<OracleMessage, bool> isMessageCommitable; //...application handling logic here

var appHandledMessages = oracleSourceMessages
.Select(m => Tuple.Create(m, isMessageCommitable(m)))
.Publish()
.RefCount();

appHandledMessages
.Where(t => t.Item2)
.Subscribe(t => Commit(t.Item1));

appHandledMessages
.Where(t => !t.Item2)
.Subscribe(t => Rollback(t.Item1));

...或设置IObservable s 指向另一个方向,它将从应用程序推回队列,哪些消息应该被提交/回滚。您可能需要两个,一个用于提交,一个用于回滚,这些可能应该传递到 Queue<T> 的构造函数中。 .

祝你好运。

关于c# - 带有 Reactive Extension 和事务 ODP.NET 的 Oracle AQ 回调,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41565039/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com