gpt4 book ai didi

c# - 通过工作线程的异步接口(interface)使用 WCF 服务,如何确保从客户端发送事件 "in order"

转载 作者:太空宇宙 更新时间:2023-11-03 22:18:28 29 4
gpt4 key购买 nike

我正在编写一个 Silverlight 类库来抽象 WCF 服务的接口(interface)。 WCF 服务提供集中式日志记录服务。 Silverlight 类库为日志记录提供了一个简化的类似 log4net 的接口(interface)(logger.Info、logger.Warn 等)。我计划从类库中提供选项,以便记录的消息可以在客户端上累积并以“突发”的形式发送到 WCF 日志记录服务,而不是在每条消息发生时发送。一般来说,这运行良好。类库确实会累积消息,并将消息集合发送到 WCF 日志记录服务,在那里它们由底层日志记录框架记录。

我当前的问题是消息(来自具有单个线程的单个客户端 - 所有日志记录代码都在按钮单击事件中)在日志记录服务中交错。我意识到至少部分原因可能是由于 WCF 日志记录服务的实例化 (PerCall) 或同步。然而,我的消息似乎也是如此快速地连续发生,以至于在异步调用上留下的消息“爆发”实际上是以与生成它们不同的顺序“离开”客户端的。

我尝试按照 here 的描述设置生产者消费者队列稍微(或者应该用空引号“轻微”更改)Work 方法阻塞(WaitOne)直到异步调用返回(即直到异步回调执行)。这个想法是,当一个消息突发发送到 WCF 日志服务时,队列应该等到该突发处理完毕后再发送下一个突发。

也许我试图做的事情不可行,或者我试图解决错误的问题,(或者我只是不知道我在做什么!)。

无论如何,这是我的生产者/消费者队列代码:

  internal class ProducerConsumerQueue : IDisposable
{
EventWaitHandle wh = new AutoResetEvent(false);
Thread worker;
readonly object locker = new object();
Queue<ObservableCollection<LoggingService.LogEvent>> logEventQueue = new Queue<ObservableCollection<LoggingService.LogEvent>>();

LoggingService.ILoggingService loggingService;

internal ProducerConsumerQueue(LoggingService.ILoggingService loggingService)
{
this.loggingService = loggingService;
worker = new Thread(Work);
worker.Start();
}

internal void EnqueueLogEvents(ObservableCollection<LoggingService.LogEvent> logEvents)
{
//Queue the next burst of messages
lock(locker)
{
logEventQueue.Enqueue(logEvents);
//Is this Set conflicting with the WaitOne on the async call in Work?
wh.Set();
}
}

private void Work()
{
while(true)
{
ObservableCollection<LoggingService.LogEvent> events = null;

lock(locker)
{
if (logEventQueue.Count > 0)
{
events = logEventQueue.Dequeue();
if (events == null || events.Count == 0) return;
}
}

if (events != null && events.Count > 0)
{
System.Diagnostics.Debug.WriteLine("1. Work - Sending {0} events", events.Count);

//
// This seems to be the key...
// Send one burst of messages via an async call and wait until the async call completes.
//
loggingService.BeginLogEvents(events, ar =>
{
try
{
loggingService.EndLogEvents(ar);
System.Diagnostics.Debug.WriteLine("3. Work - Back");
wh.Set();
}
catch (Exception ex)
{
}
}, null);

System.Diagnostics.Debug.WriteLine("2. Work - Waiting");

wh.WaitOne();

System.Diagnostics.Debug.WriteLine("4. Work - Finished");
}
else
{
wh.WaitOne();
}
}
}

#region IDisposable Members

public void Dispose()
{
EnqueueLogEvents(null);
worker.Join();
wh.Close();
}

#endregion
}

在我的测试中,它基本上是这样调用的:
//Inside of LogManager, get the LoggingService and set up the queue.
ILoggingService loggingService = GetTheLoggingService();
ProducerConsumerQueue loggingQueue = new ProducerConsumerQueue(loggingService);

//Inside of client code, get a logger and log with it
ILog logger = LogManager.GetLogger("test");

for (int i = 0; i < 100; i++)
{
logger.InfoFormat("logging message [{0}]", i);
}

在内部,logger/LogManager 在将该组消息添加到队列之前累积一定数量的日志消息(比如 25)。像这样的东西:
internal void AddNewMessage(string message)
{
lock(logMessages)
{
logMessages.Add(message);
if (logMessages.Count >= 25)
{
ObservableCollection<LogMessage> messages = new ObservableCollection<LogMessage>(logMessages);
logMessages.Clear();
loggingQueue.EnqueueLogEvents(messages);
}
}
}

因此,在这种情况下,我希望有 4 个突发,每个突发 25 条消息。根据我的 ProducerConsumerQueue 代码中的 Debug 语句(可能不是调试这个的最佳方式?),我希望看到这样的东西:
  • 工作 - 发送 25 个事件
  • 工作 - 等待
  • 工作 - 返回
  • 工作 - 完成

  • 重复4次。

    相反,我看到的是这样的:

    *1.工作 - 发送 25 个事件

    *2.工作 - 等待

    *4.工作 - 完成

    *1.工作 - 发送 25 个事件

    *2.工作 - 等待

    *3.工作 - 返回

    *4.工作 - 完成

    *1.工作 - 发送 25 个事件

    *2.工作 - 等待

    *3.工作 - 返回

    *4.工作 - 完成

    *1.工作 - 发送 25 个事件

    *2.工作 - 等待

    *3.工作 - 返回

    *3.工作 - 返回

    *4.工作 - 完成

    (添加前导 * 以便这些行不会被 SO 自动编号)

    我想我会预料到,队列将允许添加多个消息突发,但它会在处理下一个突发之前完全处理一个突发(等待 acync 调用完成)。它似乎没有这样做。它似乎不能可靠地等待异步调用的完成。我确实有一个电话给 SetEnqueueLogEvents ,也许这是取消 WaitOne来自 Work方法?

    所以,我有几个问题:
    1.我对我想要完成的事情的解释是否有意义(我的解释是否清楚,这不是一个好主意吗)?
  • 我正在尝试(从客户端传输来自单个线程的消息,按照它们发生的顺序,一次完全处理一组消息)是个好主意吗?
  • 我很亲近吗?
  • 可以做到吗?
  • 应该做吗?

  • 谢谢你的帮助!

    [编辑]
    经过更多调查并感谢布赖恩的建议,我们能够使这项工作正常进行。我已经复制了修改后的代码。关键是我们现在严格为 ProducerConsumerQueue 函数使用“wh”等待句柄。我们现在不是使用 wh 来等待异步调用完成,而是等待由 BeginLogEvents 调用返回的 res.AsyncWaitHandle。
      internal class LoggingQueue : IDisposable
    {
    EventWaitHandle wh = new AutoResetEvent(false);
    Thread worker;
    readonly object locker = new object();
    bool working = false;

    Queue<ObservableCollection<LoggingService.LogEvent>> logEventQueue = new Queue<ObservableCollection<LoggingService.LogEvent>>();

    LoggingService.ILoggingService loggingService;

    internal LoggingQueue(LoggingService.ILoggingService loggingService)
    {
    this.loggingService = loggingService;
    worker = new Thread(Work);
    worker.Start();
    }

    internal void EnqueueLogEvents(ObservableCollection<LoggingService.LogEvent> logEvents)
    {
    lock (locker)
    {
    logEventQueue.Enqueue(logEvents);

    //System.Diagnostics.Debug.WriteLine("EnqueueLogEvents calling Set");

    wh.Set();
    }
    }

    private void Work()
    {
    while (true)
    {
    ObservableCollection<LoggingService.LogEvent> events = null;

    lock (locker)
    {
    if (logEventQueue.Count > 0)
    {
    events = logEventQueue.Dequeue();
    if (events == null || events.Count == 0) return;
    }
    }

    if (events != null && events.Count > 0)
    {
    //System.Diagnostics.Debug.WriteLine("1. Work - Sending {0} events", events.Count);

    IAsyncResult res = loggingService.BeginLogEvents(events, ar =>
    {
    try
    {
    loggingService.EndLogEvents(ar);
    //System.Diagnostics.Debug.WriteLine("3. Work - Back");
    }
    catch (Exception ex)
    {
    }
    }, null);

    //System.Diagnostics.Debug.WriteLine("2. Work - Waiting");

    // Block until async call returns. We are doing this so that we can be sure that all logging messages
    // are sent FROM the client in the order they were generated. ALSO, we don't want interleave blocks of logging
    // messages from the same client by sending a new block of messages before the previous block has been
    // completely processed.

    res.AsyncWaitHandle.WaitOne();

    //System.Diagnostics.Debug.WriteLine("4. Work - Finished");
    }
    else
    {
    wh.WaitOne();
    }
    }
    }

    #region IDisposable Members

    public void Dispose()
    {
    EnqueueLogEvents(null);
    worker.Join();
    wh.Close();
    }

    #endregion
    }

    正如我在最初的问题以及对 Jon 和 Brian 的评论中提到的,我仍然不知道做所有这些工作是否是一个好主意,但至少代码做了我想要它做的事情。这意味着我至少可以选择以这种方式或其他方式(例如事后恢复秩序)而不是没有选择。

    最佳答案

    我可以建议所有这些协调都有一个简单的替代方案吗?有一个使用廉价单调递增 ID 的序列(例如使用 Interlocked.Increment() ),这样无论客户端或服务器发生什么顺序,您都可以稍后重新生成原始顺序。

    这应该让您高效灵活,异步发送您想要的任何内容,而无需等待确认,但不会丢失排序。

    显然,这意味着 ID(或可能是保证唯一的时间戳字段)需要成为 WCF 服务的一部分,但如果您控制两端,那应该相当简单。

    关于c# - 通过工作线程的异步接口(interface)使用 WCF 服务,如何确保从客户端发送事件 "in order",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4136808/

    29 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com