- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我目前正在从事物联网工作,在我当前的项目中,我创建了一个 Azure 云服务项目,其中我创建了辅助角色,在辅助角色中我编写了以下代码行。
public class WorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
private static string connectionString;
private static string eventHubName;
public static ServiceClient iotHubServiceClient { get; private set; }
public static EventHubClient eventHubClient { get; private set; }
public override void Run()
{
Trace.TraceInformation("EventsForwarding Run()...\n");
try
{
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// For information on handling configuration changes
// see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
bool result = base.OnStart();
Trace.TraceInformation("EventsForwarding OnStart()...\n");
connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
eventHubName = ConfigurationManager.AppSettings["Microsoft.ServiceBus.EventHubName"];
string storageAccountName = ConfigurationManager.AppSettings["AzureStorage.AccountName"];
string storageAccountKey = ConfigurationManager.AppSettings["AzureStorage.Key"];
string storageAccountString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
storageAccountName, storageAccountKey);
string iotHubConnectionString = ConfigurationManager.AppSettings["AzureIoTHub.ConnectionString"];
iotHubServiceClient = ServiceClient.CreateFromConnectionString(iotHubConnectionString);
eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
var defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup();
string eventProcessorHostName = "SensorEventProcessor";
EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, defaultConsumerGroup.GroupName, connectionString, storageAccountString);
eventProcessorHost.RegisterEventProcessorAsync<SensorEventProcessor>().Wait();
Trace.TraceInformation("Receiving events...\n");
return result;
}
public override void OnStop()
{
Trace.TraceInformation("EventsForwarding is OnStop()...");
this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();
base.OnStop();
Trace.TraceInformation("EventsForwarding has stopped");
}
private async Task RunAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
//Trace.TraceInformation("EventsToCommmandsService running...\n");
await Task.Delay(1000);
}
}
}
接下来,我在 SensorEventProcessor 中编写了以下代码行,用于从事件中心接收消息并将这些消息发送到 IoT 中心。
class SensorEventProcessor : IEventProcessor
{
Stopwatch checkpointStopWatch;
PartitionContext partitionContext;
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
Trace.TraceInformation(string.Format("EventProcessor Shuting Down. Partition '{0}', Reason: '{1}'.", this.partitionContext.Lease.PartitionId, reason.ToString()));
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
public Task OpenAsync(PartitionContext context)
{
Trace.TraceInformation(string.Format("Initializing EventProcessor: Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset));
this.partitionContext = context;
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
Trace.TraceInformation("\n");
Trace.TraceInformation("........ProcessEventsAsync........");
//string commandParameterNew = "{\"Name\":\"AlarmThreshold\",\"Parameters\":{\"SensorId\":\"" + "Hello World" + "\"}}";
//await WorkerRole.iotHubServiceClient.SendAsync("astranidevice", new Microsoft.Azure.Devices.Message(Encoding.UTF8.GetBytes(commandParameterNew)));
foreach (EventData eventData in messages)
{
try
{
string jsonString = Encoding.UTF8.GetString(eventData.GetBytes());
Trace.TraceInformation(string.Format("Message received at '{0}'. Partition: '{1}'",
eventData.EnqueuedTimeUtc.ToLocalTime(), this.partitionContext.Lease.PartitionId));
Trace.TraceInformation(string.Format("-->Raw Data: '{0}'", jsonString));
SimpleTemperatureAlertData newSensorEvent = this.DeserializeEventData(jsonString);
Trace.TraceInformation(string.Format("-->Serialized Data: '{0}', '{1}', '{2}', '{3}', '{4}'",
newSensorEvent.Time, newSensorEvent.RoomTemp, newSensorEvent.RoomPressure, newSensorEvent.RoomAlt, newSensorEvent.DeviceId));
// Issuing alarm to device.
string commandParameterNew = "{\"Name\":\"AlarmThreshold\",\"Parameters\":{\"SensorId\":\"" + "Hello World" + "\"}}";
Trace.TraceInformation("Issuing alarm to device: '{0}', from sensor: '{1}'", newSensorEvent.DeviceId, newSensorEvent.RoomTemp);
Trace.TraceInformation("New Command Parameter: '{0}'", commandParameterNew);
await WorkerRole.iotHubServiceClient.SendAsync(newSensorEvent.DeviceId, new Microsoft.Azure.Devices.Message(Encoding.UTF8.GetBytes(commandParameterNew)));
}
catch (Exception ex)
{
Trace.TraceInformation("Error in ProssEventsAsync -- {0}\n", ex.Message);
}
}
await context.CheckpointAsync();
}
private SimpleTemperatureAlertData DeserializeEventData(string eventDataString)
{
return JsonConvert.DeserializeObject<SimpleTemperatureAlertData>(eventDataString);
}
}
当我调试代码时,ProcessEventsAsync(PartitionContext context, IEnumerable messages) 方法永远不会调用,只会进入 OpenAsync() 方法,然后停止调试。
请告诉我我在项目中哪里出错了,并告诉我何时调用 ProcessEventsAsync() 方法。
问候,
普拉迪普
最佳答案
当 EventHub 中存在任何未处理的消息时,将调用 IEventProcessor.ProcessEventsAsync。
事件中心包含多个分区。分区是事件的有序序列。在分区内,每个事件都包含一个偏移量。消费者 (IEventProcessor) 使用此偏移量来显示给定分区的事件序列中的位置。当 IEventProcessor 连接 (EventProcessorHost.RegisterEventProcessorAsync) 时,它会将此偏移量传递到事件中心以指定开始读取的位置。当有未处理的消息(具有较高偏移量的事件)时,它们将被传递到 IEventProcessor。检查点用于持久保存已处理消息的偏移量 (PartitionContext.CheckpointAsync)。
您可以找到有关EventHub内部的详细信息:Azure Event Hubs overview
您是否向 EventHub 发送了任何消息 (EventHubClient.SendAsync(EventData))?
关于azure - 当 ProcessEventsAsync(PartitionContext context, ienumerable<EventData> messages) 方法将被触发时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37075130/
我正在尝试测试 IEventProcessor 的实现。但是,我遇到了困难,因为不可能实际模拟 PartitionContext 类 ( https://msdn.microsoft.com/en-u
我目前正在从事物联网工作,在我当前的项目中,我创建了一个 Azure 云服务项目,其中我创建了辅助角色,在辅助角色中我编写了以下代码行。 public class WorkerRole : Role
我是一名优秀的程序员,十分优秀!