- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在工作角色方面遇到问题,该角色似乎成功启动并注册了 EventProcessor 类实现(EventProcessorA 和 EventProcessorB),但它们都没有拾取任何事件。我的意思是,IEventProcessor.ProcessEventsAsync 方法根本不会被命中。
每个 EventProcessor 类都为其设置了自己的事件中心。
我的日志显示,为 EventProcessor 类调用了构造函数和 OpenAsync 方法。事实上,它们被调用了 4 次,如下所示。但在此之后,就不再发生任何其他事件。我猜 4 次是因为有四个分区。
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - Open Async
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
Worker 角色的 RunAsync 方法中也没有为 EventProcessorOptions 提供偏移量,因此所有事件都应该涌入。
此外,在 Azure 门户中,当我启动事件时,我会看到正在发生的事件。
注册EventProcessor的Worker Role代码:
public class WorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
private EventProcessorHost eventProcessorHostA;
private EventProcessorHost eventProcessorHostB;
public override void Run()
{
Trace.TraceInformation("ReportWorkerRole is running");
try
{
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// For information on handling configuration changes
// see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
bool result = base.OnStart();
Trace.TraceInformation("ReportWorkerRole has been started");
//EventHub Processing
try
{
string eventHubNameA = CloudConfigurationManager.GetSetting("EventHubNameA");
string eventHubNameB = CloudConfigurationManager.GetSetting("EventHubNameB");
string eventHubConnectionString = CloudConfigurationManager.GetSetting("EventHubConnectionString");
string storageAccountName = CloudConfigurationManager.GetSetting("AzureStorageAccount");
string storageAccountKey = CloudConfigurationManager.GetSetting("AzureStorageAccountKey");
string storageConnectionString = CloudConfigurationManager.GetSetting("AzureStorageAccountConnectionString");
string eventProcessorHostNameA = Guid.NewGuid().ToString();
eventProcessorHostA = new EventProcessorHost(eventProcessorHostNameA, eventHubNameA, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
string eventProcessorHostNameB = Guid.NewGuid().ToString();
eventProcessorHostB = new EventProcessorHost(eventProcessorHostNameB, eventHubNameB, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
}
catch (Exception ex)
{
//Logging omitted
}
return result;
}
public override void OnStop()
{
Trace.TraceInformation("ReportWorkerRole is stopping");
this.eventProcessorHostA.UnregisterEventProcessorAsync().Wait();
this.eventProcessorHostB.UnregisterEventProcessorAsync().Wait();
this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();
base.OnStop();
Trace.TraceInformation("ReportWorkerRole has stopped");
}
private async Task RunAsync(CancellationToken cancellationToken)
{
var options = new EventProcessorOptions()
{
MaxBatchSize = 100,
PrefetchCount = 10,
ReceiveTimeOut = TimeSpan.FromSeconds(20),
//InitialOffsetProvider = (partitionId) => DateTime.Now
};
options.ExceptionReceived += (sender, e) =>
{
//Logging omitted
};
//Tried both using await and wait
eventProcessorHostA.RegisterEventProcessorAsync<SimpleEventProcessorA>(options).Wait();
eventProcessorHostB.RegisterEventProcessorAsync<SimpleEventProcessorB>(options).Wait();
//await eventProcessorHostA.RegisterEventProcessorAsync<SimpleEventProcessorA>(options);
//await eventProcessorHostB.RegisterEventProcessorAsync<SimpleEventProcessorB>(options);
// TODO: Replace the following with your own logic.
while (!cancellationToken.IsCancellationRequested)
{
Trace.TraceInformation("Working");
await Task.Delay(1000);
}
}
}
事件处理器 A(与 B 相同的配置):
class SimpleEventProcessorA : IEventProcessor
{
Stopwatch checkpointStopWatch;
//Non-relevant variables omitted
public SimpleEventProcessorA()
{
try
{
//Initializing variables using CloudConfigurationManager
//Logging omitted
}
catch (Exception ex)
{
//Logging omitted
}
}
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
//Logging omitted
Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
try
{
//Logging omitted
Console.WriteLine("Initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
catch (Exception ex)
{
//Logging omitted
return Task.FromResult<object>(null);
}
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
//Logging omitted
foreach (EventData eventData in messages)
{
try
{
//Logging omitted
Console.WriteLine(string.Format("Message received. Partition: '{0}', Data: '{1}'",
context.Lease.PartitionId, data));
}
catch (Exception ex)
{
//Logging omitted
throw;
}
}
//Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
{
await context.CheckpointAsync();
this.checkpointStopWatch.Restart();
}
}
}
非常感谢任何帮助,谢谢!
更新
看起来一切都很好...这是我在将内容推送到事件中心时使用的连接字符串。
这是我的事件中心连接字符串:
Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[old-eventhub-name];SharedAccessKey=[mykey]
entityPath
设置不正确。它使用的是我设置的较旧的事件中心名称。它应该是为 eventHubNameA 或 eventHubNameB 设置的值。
最佳答案
回答问题以便其他人可以从中受益。尽管答案在“更新”部分下的问题中有详细说明,但我将在这里重申:
entityPath
设置不正确。它使用的是我设置的较旧的事件中心名称。它应该是为 eventHubNameA 或 eventHubNameB 设置的值。
而不是Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[old-eventhub-name];SharedAccessKey=[mykey]
应该是Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[eventHubNameA];SharedAccessKey=[mykey]
关于Azure 云服务 - EventProcessor IEventProcessor.ProcessEventsAsync 未命中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41398939/
关于 APC 操作码缓存,什么是“命中与未命中”?我已经安装了 APC 并且它运行良好,但我有“一些”失误,我想知道这是否“不好”。此外,我正在运行 Openx,因此,我很快就会填满“缓存完整计数”。
我试过这个: def test_send_confirm_hit(monkeypatch): hit = False def called(): global hit
是否可以将找到的单词插入到替换中? $(function() { content = 'hallo mein name ist peter und ich komme aus berlin.
我有一个允许用户将文件上传到文件夹的网站。首先,我检查文件是否存在,如果存在,然后检查复选框的值以确定用户是否要覆盖现有文件。如果点击上传并且未选中该框,我会执行一个带有消息和后退按钮的 die()
我有多个不同的进程通过 IPC 进行通信,当使用 gdb 调试单个进程时,每当遇到断点时,我都会尝试向其他进程发送消息。有没有一种方法可以自动在遇到断点时自动调用一个函数/一段代码(NotifyAll
目前,通过管道传输到 jq 的 cat 命令帮助我解析工作目录中的多个 JSON 文件,并根据正则表达式模式匹配文件中所有可用的电子邮件 ID。但是,我很想识别正则表达式模式被命中/匹配的文件名 ca
我们希望将 podname 解析为 IP,以在 akka 集群中配置种子节点。这是通过在 Kubernetes 中使用 headless (headless)服务和有状态集的概念来实现的。但是,如何在
Maven 项目具有以下文件夹结构: src/main/java src/main/resources src/test/java src/test/resources 如果我们导航到 Maven 项
我只使用 c 几个星期,所以很可能会出现我忽略的明显错误。我看过其他线程,但我不明白我正在读的很多内容。该程序假设有一个无限大的牌组。 已知问题: clearBuffer 当前未使用,我正在尝试不同的
我已将我的 AdMob 代码实现到我的 XML 文件中,如下所示: 在我的 Activity 的 onCreate 方法中: // load ads
我的作业是通过示例程序确定给定跟踪文件的缓存读/写/未命中/命中次数。 举例来说,这是示例跟踪输出的前 10 行。 0x37c852: W 0xbfd4b18c 0x37cfe0: W 0xbfd
https://plnkr.co/edit/2h6fV5yTjeUqLP3SvbvO?p=preview 预期 登录后应用程序重定向到 $state container,其中包含 dashboard
ElasticSearch 独立于 from 和 size 参数,基于查询的所有命中构建聚合结果。在大多数情况下,这是我们想要的,但我有一个特殊情况,我需要将聚合限制为前 N 个命中。 limits
我使用 Intel PCM 进行细粒度的 CPU 测量。在我的代码中,我试图测量缓存效率。 基本上,我首先将一个小数组放入 L1 缓存(通过多次遍历),然后启动计时器,再遍历数组一次(希望使用缓存),
我在为 javascript 滑动元素定义点击区域时遇到问题。 参见示例: http://www.warface.co.uk/clients/warface.co.uk/ 请滑过右侧的灰色框以显示按钮
我正在尝试在 foldersystem 中使用 os.walk() 找到几个 'my_file.bat',如果文件名匹配它应该用 subprocess.call() 或 .run() 调用。问题是 o
我有一个端点,我可以在其中请求我使用 Siesta 查询的多条数据(例如 https://example.com/things?ids=1,2,3) .如果我只缓存了一些 things ,我试图弄清楚
这是我的代码: public static function test(){ try{ $apiContext = ApiContext::create(
我使用 PHP 在需要时传递登录表单,代码如下: $htmlForm = ''.''; switch(LOGIN_METHOD) { case 'both': $htmlFor
我正在使用 nginx-lua带有 redis 的模块提供 ember-app 的静态文件. index文件内容存储在redis作为 value由 nginx 正确提供服务当(根)domain/IP被
我是一名优秀的程序员,十分优秀!