gpt4 book ai didi

c# - wcf msmq 监听器服务删除消息但不处理

转载 作者:太空宇宙 更新时间:2023-11-03 16:06:11 24 4
gpt4 key购买 nike

我有一个在 MSMQ 中创建私有(private)消息的 WCF 服务。消息创建良好,我可以看到我的客户端应用程序正在创建的消息。

我还有一个似乎运行良好的 MSMQ 监听器服务。当我启动该服务时,它似乎成功地“处理”了队列中的消息并将它们删除。但是,我的监听器服务的实现似乎没有得到执行。

我是 MSMQ 的新手,我不知道为什么消息会从队列中删除,也不知道为什么我的监听器方法中的代码没有被执行。

下面是我的服务类...

[ServiceContract(Namespace = ServiceConstants.NAMESPACE, Name = "IOrderService")]
public interface IOrderQueueProcessingService
{

[OperationContract(IsOneWay = true, Action = "*")]
void ProcessOrderQueue(MsmqMessage<string> message);

}

public abstract class OrderQueueProcessingServiceBase : ServiceBase, IOrderQueueProcessingService
{
#region CONSTRUCTORS

protected OrderQueueProcessingServiceBase() { }

protected OrderQueueProcessingServiceBase(List<EventRecord> existingList) : base(existingList) { }

#endregion //CONSTRUCTORS

#region IOrderQueueProcessingService Members


[OperationBehavior(TransactionScopeRequired = false, TransactionAutoComplete = true)]
public virtual void ProcessOrderQueue(MsmqMessage<string> message)
{
throw new NotImplementedException();
}

#endregion
}

public class OrderQueueProcessingService : OrderQueueProcessingServiceBase
{
#region Constructors


public OrderQueueProcessingService() {}

public OrderQueueProcessingService(List<EventRecord> existingList) : base(existingList) { }


#endregion

/// <summary>
/// Processes any Orders in the Orders Queue
/// </summary>
/// <param name="message"></param>
public override void ProcessOrderQueue(MsmqMessage<string> message)
{

var q = new MessageQueue(@".\private$\msmqdemo/submitorderservice.svc");
q.Send("hey");
/*
using (new Tracer("OrderQueueProcessingService"))
{
// add data context to work with.
using (var unitOfWork = new TLFDataContext())
{

var newOrderLines = new List<OrderLineDataContract>
{
new OrderLineDataContract
{
C = "test",
IC = "msw",
Qty = 1,
T = "volume" ,
ED = DateTime.UtcNow.AddDays(5)
}
};

var newOrder = new OrderDataContract
{
LIs = newOrderLines.AsEnumerable(),
PId = 9323,
POId = 8686,
S = "new"

};
var orderService = new OrderService();
var createdOrder = orderService.CreateOrder(null, null, newOrder);
//unitOfWork.SubmitUnitOfWork();

//return null;
}
}*/


}

}

我注释掉了我最终尝试执行的代码,并将其替换为简单的 MSMQ 消息发送,以进行测试。这似乎应该可以正常工作。任何帮助将不胜感激。

下面配置设置...

<service name="ServiceImplementation.OrderQueueProcessingService">
<host>
<baseAddresses>
<add baseAddress="https://localhost:9000/OrderQueueProcessingService.svc" />
</baseAddresses>
</host>
<endpoint address="net.msmq://localhost/private/testingqueue/OrderQueueProcessingService.svc" binding="netMsmqBinding" bindingConfiguration="MsmqBindingNonTransactionalNoSecurity" contract="IOrderQueueProcessingService" />
</service>

最佳答案

我能够弄清楚我的问题。我没有对 QueueOnPeekComplted 事件进行任何事件处理。我将其添加到 Initialize 方法中,并且能够成功处理我的消息。我还添加了对无法处理的消息的处理。下面是我的 OrderQueueProcessingService 的新实现。

public class OrderQueueProcessingService : OrderQueueProcessingServiceBase, IDisposable
{
#region Constructors

public OrderQueueProcessingService()
{
Initialize(ConfigurationManager.AppSettings["OrderQueueProcessingQueueName"]);
}

public OrderQueueProcessingService(List<EventRecord> existingList) : base(existingList) {}

#endregion

#region Properties

private MessageQueue Queue { get; set; }

#endregion

#region IDisposable Members

public new void Dispose()
{
if (Queue == null) return;

//unsubscribe and dispose
Queue.PeekCompleted -= QueueOnPeekCompleted;
Queue.Dispose();
}

#endregion

private void Initialize(string queueName)
{
Queue = new MessageQueue(queueName, false, true, QueueAccessMode.Receive)
{
Formatter = new XmlMessageFormatter(new[] {typeof (OrderQueueDataContract)})
};

//setup events and start.
Queue.PeekCompleted += QueueOnPeekCompleted;
Queue.BeginPeek();
}

private static void MoveMessageToDeadLetter(IDisposable message)
{
var q = new MessageQueue(ConfigurationManager.AppSettings["OrderProcessingDLQ"], QueueAccessMode.Send)
{
Formatter = new XmlMessageFormatter(new[] {typeof (OrderQueueDataContract)})
};
q.Send(message, MessageQueueTransactionType.Single);
q.Dispose();
}

/// <summary>
/// Processes the specified Order message
/// </summary>
/// <param name="orderMessage"></param>
public override void ProcessOrderQueue(OrderQueueDataContract orderMessage)
{
using (var unitOfWork = new MyDataContext())
{
switch (orderMessage.M.ToLower())
{
case "create":
DataAccessLayer.CreateOrder(unitOfWork, orderMessage.O.TranslateToBe());
break;
default:
break;
}
}
}

private void QueueOnPeekCompleted(object sender, PeekCompletedEventArgs peekCompletedEventArgs)
{
var asyncQueue = (MessageQueue) sender;

using (var transaction = new MessageQueueTransaction())
{
transaction.Begin();
try
{
using (var message = asyncQueue.ReceiveById(peekCompletedEventArgs.Message.Id, TimeSpan.FromSeconds(30), transaction))
{
if (message != null) ProcessOrderQueue((OrderQueueDataContract) message.Body);
}
}
catch (InvalidOperationException ex)
{
transaction.Abort();
}
catch (Exception ex)
{
transaction.Abort();
}

if (transaction.Status != MessageQueueTransactionStatus.Aborted) transaction.Commit();
else
{
using (var message = asyncQueue.ReceiveById(peekCompletedEventArgs.Message.Id, TimeSpan.FromSeconds(30), transaction))
{
if (message != null)
{
MoveMessageToDeadLetter(message);
message.Dispose();
}
}

EventLog.WriteEntry("OrderQueueProcessingService", "Could not process message: " + peekCompletedEventArgs.Message.Id);
}
transaction.Dispose();
}

asyncQueue.EndPeek(peekCompletedEventArgs.AsyncResult);
asyncQueue.BeginPeek();
}
}

有人发现此实现有任何问题吗?我不得不摆弄它,但它已经通过了单元测试和过程测试。

我注意到的一件事是,当我的服务第一次启动时,它会处理队列中的所有消息;这可能会阻碍我其他服务的启动。当我在测试时在我的控制台应用程序中启动它们时,这可能只是一个问题。为了安全起见,我最后启动了这项服务。

关于c# - wcf msmq 监听器服务删除消息但不处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19257119/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com