- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我一直致力于让长时间运行的消息与 Azure 传输上的 NServiceBus 一起工作。基于 this document ,我想我可以在一个单独的线程中触发这个长进程,将事件处理程序任务标记为完成,然后监听自定义的 OperationStarted 或 OperationComplete 事件。我注意到在大多数情况下我的处理程序没有收到 OperationComplete 事件。事实上,唯一一次收到它是在我发布 OperationStarted 事件后立即发布它的时候。两者之间的任何实际处理都会以某种方式阻止接收到完成事件。这是我的代码:
用于长时间运行消息的抽象类
public abstract class LongRunningOperationHandler<TMessage> : IHandleMessages<TMessage> where TMessage : class
{
protected ILog _logger => LogManager.GetLogger<LongRunningOperationHandler<TMessage>>();
public Task Handle(TMessage message, IMessageHandlerContext context)
{
var opStarted = new OperationStarted
{
OperationID = Guid.NewGuid(),
OperationType = typeof(TMessage).FullName
};
var errors = new List<string>();
// Fire off the long running task in a separate thread
Task.Run(() =>
{
try
{
_logger.Info($"Operation Started: {JsonConvert.SerializeObject(opStarted)}");
context.Publish(opStarted);
ProcessMessage(message, context);
}
catch (Exception ex)
{
errors.Add(ex.Message);
}
finally
{
var opComplete = new OperationComplete
{
OperationType = typeof(TMessage).FullName,
OperationID = opStarted.OperationID,
Errors = errors
};
context.Publish(opComplete);
_logger.Info($"Operation Complete: {JsonConvert.SerializeObject(opComplete)}");
}
});
return Task.CompletedTask;
}
protected abstract void ProcessMessage(TMessage message, IMessageHandlerContext context);
}
测试实现
public class TestLongRunningOpHandler : LongRunningOperationHandler<TestCommand>
{
protected override void ProcessMessage(TestCommand message, IMessageHandlerContext context)
{
// If I remove this, or lessen it to something like 200 milliseconds, the
// OperationComplete event gets handled
Thread.Sleep(1000);
}
}
操作事件
public sealed class OperationComplete : IEvent
{
public Guid OperationID { get; set; }
public string OperationType { get; set; }
public bool Success => !Errors?.Any() ?? true;
public List<string> Errors { get; set; } = new List<string>();
public DateTimeOffset CompletedOn { get; set; } = DateTimeOffset.UtcNow;
}
public sealed class OperationStarted : IEvent
{
public Guid OperationID { get; set; }
public string OperationType { get; set; }
public DateTimeOffset StartedOn { get; set; } = DateTimeOffset.UtcNow;
}
处理程序
public class OperationHandler : IHandleMessages<OperationStarted>
, IHandleMessages<OperationComplete>
{
static ILog logger = LogManager.GetLogger<OperationHandler>();
public Task Handle(OperationStarted message, IMessageHandlerContext context)
{
return PrintJsonMessage(message);
}
public Task Handle(OperationComplete message, IMessageHandlerContext context)
{
// This is not hit if ProcessMessage takes too long
return PrintJsonMessage(message);
}
private Task PrintJsonMessage<T>(T message) where T : class
{
var msgObj = new
{
Message = typeof(T).Name,
Data = message
};
logger.Info(JsonConvert.SerializeObject(msgObj, Formatting.Indented));
return Task.CompletedTask;
}
}
我确定 context.Publish()
调用被触发了,因为 _logger.Info()
调用正在向我的测试控制台打印消息。我还验证了它们遇到了断点。在我的测试中,运行时间超过 500 毫秒的任何内容都会阻止处理 OperationComplete 事件。
如果有人可以就在 ProcessMessage 实现中经过任何大量时间后为什么 OperationComplete 事件没有触发处理程序提出建议,我将非常感激听到他们的意见。谢谢!
-- 更新--万一其他人遇到这个问题并对我最终做了什么感到好奇:
在 an exchange 之后与 NServiceBus 的开发人员一起,我决定使用实现 IHandleTimeouts 接口(interface)的 watchdog saga 来定期检查作业是否完成。我正在使用在作业完成时更新的 saga 数据来确定是否在超时处理程序中触发 OperationComplete
事件。这提出了另一个问题:使用内存中持久性时,saga 数据是 not persisted跨线程,即使它被每个线程锁定。为了解决这个问题,我专门为长时间运行的内存数据持久化创建了一个接口(interface)。该接口(interface)作为单例注入(inject)到 saga 中,因此用于跨线程读取/写入 saga 数据以进行长时间运行的操作。
我知道不建议使用内存中持久性,但根据我的需要,配置另一种类型的持久性(如 Azure 表)太过分了;我只是希望 OperationComplete
事件在正常情况下触发。如果在运行作业期间发生重启,我不需要保留 saga 数据。作业无论如何都会被缩短,如果作业运行时间超过设定的最长时间,saga 超时将处理触发 OperationComplete
事件并出现错误。
最佳答案
原因是如果 ProcessMessage
足够快,您可能会在它失效之前获取当前 context
,例如被处置。
通过从 Handle
成功返回,您告诉 NServiceBus:“我已处理完这条消息”,因此它可以使用 context
做它想做的事好吧,比如使它无效。在后台处理器中,您需要一个端点实例,而不是消息上下文。
当新任务开始运行时,您不知道 Handle
是否已返回,因此您应该只考虑消息已被使用因此无法恢复。如果在您的单独任务中发生错误,您将无法重试。
避免没有持久性的长时间运行的进程。您提到的示例有一个服务器,用于存储消息中的工作项,以及一个为工作项轮询此存储的进程。也许不理想,以防您横向扩展处理器,但它不会丢失消息。
为避免持续轮询,将服务器和处理器合并,在启动时无条件轮询一次,并在Handle
中安排轮询任务。注意此任务仅在没有其他轮询任务正在运行时进行轮询,否则它可能会变得比持续轮询更糟糕。您可以使用信号量来控制它。
要横向扩展,您必须拥有更多服务器。您需要测量 N 个处理器轮询的成本是否大于以循环方式发送到 N 个服务器的成本,对于某些 N,要知道哪种方法实际上执行得更好。在实践中,轮询对于低 N 就足够了。
为多个处理器修改示例可能需要较少的部署和配置工作,您只需添加或使用处理器,而添加或删除服务器需要在指向它们的所有地方(例如配置文件)更改它们的点。
另一种方法是将漫长的过程分解为多个步骤。 NServiceBus 有 sagas。这是一种通常为已知或有限数量的步骤实现的方法。对于未知数量的步骤,它仍然是可行的,尽管有些人可能认为这是对 sagas 看似预期目的的滥用。
关于c# - NServiceBus 事件在单独的线程中发布时丢失,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49038136/
显然是在学习java。我能够让游戏正常运行。但我需要这样做,以便如果用户放置 R/P/S 以外的东西,那么它默认为 Rock。我不需要循环。如果我放石头的话,游戏就完美了。如果我放置 RPS 以外的任
我刚刚开始使用 python,需要一些帮助!我当时在做剪刀石头布游戏,我想在人或计算机赢得 3 场胜利后添加一个重启选项。 我已经四处寻找一些答案,但从我看到的所有其他代码来看,似乎超出了我的范围,或
我正在用 C# 制作石头、剪刀、布游戏,目前在有人输入非 R、S 或 P 的输入时尝试显示消息时遇到问题。例如,我正在尝试获取默认值在 switch 语句中工作,但我没有运气。这就是我目前所拥有的。如
我正在用 C# 制作石头、剪刀、布游戏,目前在有人输入非 R、S 或 P 的输入时尝试显示消息时遇到问题。例如,我正在尝试获取默认值在 switch 语句中工作,但我没有运气。这就是我目前所拥有的。如
我的类(class)被分配了一个石头剪刀布游戏。以下是作业说明: 学习目标: 练习枚举的使用 创建一个包含构造函数、字段和方法的枚举 设计并实现您自己的 GUI 创建一个可运行的 jar 描述:编写一
我正在用Python创建一个石头剪刀布游戏。我的方法如下。然而,我不想打印玩家 1 和玩家 2,而是想让它们成为真实的名字。因此,可以说 John Wins 或 Joe Wins。如何实现每次打印玩家
我正在尝试完成类里面的石头、剪刀、布作业。 我收到“UnboundLocalError:赋值前引用的局部变量“绑定(bind)”” 错误。 有人可以告诉我为什么会收到此错误吗? import rand
我在 C++ 上学习石头剪刀布游戏时有一个游戏实例类。我想创建整数常量,它表示游戏中允许的符号数。对于经典的 rsp 游戏,它是 3(石头、剪刀和布),但是有一些有趣的 rcs 游戏扩展带有额外的符号
我正在上 Codecademy 类(class)(找到 here ),但一直告诉我“当输入是纸和石头时,您的代码返回‘石头获胜’而不是‘纸获胜’”,为什么?应该是正确的。既然它在谈论“石头获胜”,那么
我正在可汗学院编写一个剪刀石头布游戏,这样我就可以看到视觉效果,但是 var Compare = function(choice1, choice2) 无法正常工作。 html 它工作得很好。插入了我
对于我的类(class)项目,我们要制作一个“石头剪刀布”游戏,使用一个函数来显示菜单和验证输入,以及一个函数来确定谁赢了。当我绕过第一个函数时,我的代码将编译并正常运行。但是,当我同时使用这两个函数
我目前受困于我的 RPS 程序,因为它无法正确存储用户丢失的次数或与计算机的关联。例如,当我运行程序并输入“q”退出时,我得到以下输出: Enter R, P, S, or Q (for quit)
已关闭。此问题不符合Stack Overflow guidelines 。目前不接受答案。 已关闭 7 年前。 此问题是由拼写错误或无法再重现的问题引起的。虽然类似的问题可能是 on-topic在这里
我一直在尝试解决石头剪刀布项目,但我不知道如何做出 if/else 语句。我这样做了很多次,最后我认为我已经接近解决问题了,但问题是,每次运行程序时我都会得到错误的输出。 例如,我使用“Paper”,
以下是我在 BlueJ 中编写的石头剪刀布游戏的代码。当我编译并且用户输入输入时,计算机立即从playerWins()打印大量输出。当用户输入“退出”时游戏结束。有人可以帮助我,这样我的屏幕就不会被淹
如果之前曾问过此问题的某个版本,我们深表歉意。我查了一下,但找不到真正能解决我的问题的东西。 我今天开始学习Python,并尝试构建一个简单的石头剪刀布游戏。 我有以下代码,它运行得很好: impor
我今天一直在开发这个应用程序,但我似乎无法弄清楚代码有什么问题。 当我检查时,没有语法错误。但是当我尝试运行它并调试它时,它说 "Source not found" on "ActivityThrea
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 8 年前。 Improve th
我有 JS 中的基本“石头、剪刀、布”游戏的代码。它可以与提示一起使用,但我希望能够通过按钮做出选择。我想使用“getElementById”和“addEventListener(“click”)”。
有没有一种方法可以用大于号和小于号来做剪刀石头布游戏? 我知道执行 RPS 有不同的方法,但我想具体了解大于和小于。 这是我的代码: "Rock" > "Scissors" and 'Rock' "
我是一名优秀的程序员,十分优秀!