- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我在理解 C# Task、async 和 await 模式时遇到了问题。
Windows 服务,.NET v4.5.2 服务器端。
我有一个 Windows 服务接受各种来源的传入记录,通过自托管 Web API 从外部临时到达。我想对这些记录进行批处理,然后将它们转发到另一个服务。如果批处理记录的数量超过阈值,则应立即分派(dispatch)该批处理。此外,如果时间间隔已经过去,那么也应该按原样发送批处理。这意味着记录的保存时间永远不会超过 N 秒。
我正在努力将其纳入基于任务的异步模式。
在过去的日子里,我会创建一个线程、一个 ManualResetEvent 和一个 System.Threading.Timer。 Thread 将循环等待重置事件。定时器将在触发时设置事件,当批处理大小超过阈值时执行聚合的代码也是如此。在 Wait 之后,Thread 将停止 Timer,进行分派(dispatch)(一个 HTTP Post),重置 Timer 并清除 ManualResetEvent,循环返回并等待。
但是,我看到人们说这是“糟糕的”,因为 Wait 只会阻塞有值(value)的线程资源,而 async/await 是我的 Elixir 。
首先,他们是对的吗?我的方法是否过时且效率低下,或者我可以 JFDI 吗?
我找到了示例 here用于批处理和 here对于间隔的任务,但不是两者的组合。
这个要求实际上与 async/await 兼容吗?
最佳答案
实际上,你几乎做对了,他们也有部分是对的。
你应该知道的是,你应该避免空闲线程,长时间等待事件或等待 I/O 完成(等待很少争用的锁和快速语句 block 或带有比较和交换的旋转循环 < em>通常 OK)。
他们中的大多数人不知道任务并不神奇,例如, Task.Delay
uses a Timer
(更确切地说,一个 System.Threading.Timer
)和 waiting on a non-complete task ends up using a ManualResetEventSlim
(对 ManualResetEvent
的改进,因为它不会创建 Win32 事件,除非明确要求,例如 ((IAsyncResult)task).AsyncWaitHandle
)。
是的,您的要求可以通过 async/await 或一般任务来实现。
Runnable example at .NET Fiddle :
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
public class Record
{
private int n;
public Record(int n)
{
this.n = n;
}
public int N { get { return n; } }
}
public class RecordReceiver
{
// Arbitrary constants
// You should fetch value from configuration and define sensible defaults
private static readonly int threshold = 5;
// I chose a low value so the example wouldn't timeout in .NET Fiddle
private static readonly TimeSpan timeout = TimeSpan.FromMilliseconds(100);
// I'll use a Stopwatch to trace execution times
private readonly Stopwatch sw = Stopwatch.StartNew();
// Using a separate private object for locking
private readonly object lockObj = new object();
// The list of accumulated records to execute in a batch
private List<Record> records = new List<Record>();
// The most recent TCS to signal completion when:
// - the list count reached the threshold
// - enough time has passed
private TaskCompletionSource<IEnumerable<Record>> batchTcs;
// A CTS to cancel the timer-based task when the threshold is reached
// Not strictly necessary, but it reduces resource usage
private CancellationTokenSource delayCts;
// The task that will be completed when a batch of records has been dispatched
private Task dispatchTask;
// This method doesn't use async/await,
// because we're not doing an async flow here.
public Task ReceiveAsync(Record record)
{
Console.WriteLine("Received record {0} ({1})", record.N, sw.ElapsedMilliseconds);
lock (lockObj)
{
// When the list of records is empty, set up the next task
//
// TaskCompletionSource is just what we need, we'll complete a task
// not when we've finished some computation, but when we reach some criteria
//
// This is the main reason this method doesn't use async/await
if (records.Count == 0)
{
// I want the dispatch task to run on the thread pool
// In .NET 4.6, there's TaskCreationOptions.RunContinuationsAsynchronously
// .NET 4.6
//batchTcs = new TaskCompletionSource<IEnumerable<Record>>(TaskCreationOptions.RunContinuationsAsynchronously);
//dispatchTask = DispatchRecordsAsync(batchTcs.Task);
// Previously, we have to set up a continuation task using the default task scheduler
// .NET 4.5.2
batchTcs = new TaskCompletionSource<IEnumerable<Record>>();
var asyncContinuationsTask = batchTcs.Task
.ContinueWith(bt => bt.Result, TaskScheduler.Default);
dispatchTask = DispatchRecordsAsync(asyncContinuationsTask);
// Create a cancellation token source to be able to cancel the timer
//
// To be used when we reach the threshold, to release timer resources
delayCts = new CancellationTokenSource();
Task.Delay(timeout, delayCts.Token)
.ContinueWith(
dt =>
{
// When we hit the timer, take the lock and set the batch
// task as complete, moving the current records to its result
lock (lockObj)
{
// Avoid dispatching an empty list of records
//
// Also avoid a race condition by checking the cancellation token
//
// The race would be for the actual timer function to start before
// we had a chance to cancel it
if ((records.Count > 0) && !delayCts.IsCancellationRequested)
{
batchTcs.TrySetResult(new List<Record>(records));
records.Clear();
}
}
},
// Since our continuation function is fast, we want it to run
// ASAP on the same thread where the actual timer function runs
//
// Note: this is just a hint, but I trust it'll be favored most of the time
TaskContinuationOptions.ExecuteSynchronously);
// Remember that we want our batch task to have continuations
// running outside the timer thread, since dispatching records
// is probably too much work for a timer thread.
}
// Actually store the new record somewhere
records.Add(record);
// When we reach the threshold, set the batch task as complete,
// moving the current records to its result
//
// Also, cancel the timer task
if (records.Count >= threshold)
{
batchTcs.TrySetResult(new List<Record>(records));
delayCts.Cancel();
records.Clear();
}
// Return the last saved dispatch continuation task
//
// It'll start after either the timer or the threshold,
// but more importantly, it'll complete after it dispatches all records
return dispatchTask;
}
}
// This method uses async/await, since we want to use the async flow
internal async Task DispatchRecordsAsync(Task<IEnumerable<Record>> batchTask)
{
// We expect it to return a task right here, since the batch task hasn't had
// a chance to complete when the first record arrives
//
// Task.ConfigureAwait(false) allows us to run synchronously and on the same thread
// as the completer, but again, this is just a hint
//
// Remember we've set our task to run completions on the thread pool?
//
// With .NET 4.6, completing a TaskCompletionSource created with
// TaskCreationOptions.RunContinuationsAsynchronously will start scheduling
// continuations either on their captured SynchronizationContext or TaskScheduler,
// or forced to use TaskScheduler.Default
//
// Before .NET 4.6, completing a TaskCompletionSource could mean
// that continuations ran withing the completer, especially when
// Task.ConfigureAwait(false) was used on an async awaiter, or when
// Task.ContinueWith(..., TaskContinuationOptions.ExecuteSynchronously) was used
// to set up a continuation
//
// That's why, before .NET 4.6, we need to actually run a task for that effect,
// and we used Task.ContinueWith without TaskContinuationOptions.ExecuteSynchronously
// and with TaskScheduler.Default, to ensure it gets scheduled
//
// So, why am I using Task.ConfigureAwait(false) here anyway?
// Because it'll make a difference if this method is run from within
// a Windows Forms or WPF thread, or any thread with a SynchronizationContext
// or TaskScheduler that schedules tasks on a dedicated thread
var batchedRecords = await batchTask.ConfigureAwait(false);
// Async methods are transformed into state machines,
// much like iterator methods, but with async specifics
//
// What await actually does is:
// - check if the awaitable is complete
// - if so, continue executing
// Note: if every awaited awaitable is complete along an async method,
// the method will complete synchronously
// This is only expectable with tasks that have already completed
// or I/O that is always ready, e.g. MemoryStream
// - if not, return a task and schedule a continuation for just after the await expression
// Note: the continuation will resume the state machine on the next state
// Note: the returned task will complete on return or on exception,
// but that is something the compiled state machine will handle
foreach (var record in batchedRecords)
{
Console.WriteLine("Dispatched record {0} ({1})", record.N, sw.ElapsedMilliseconds);
// I used Task.Yield as a replacement for actual work
//
// It'll force the async state machine to always return here
// and shedule a continuation that reenters the async state machine right afterwards
//
// This is not something you usually want on production code,
// so please replace this with the actual dispatch
await Task.Yield();
}
}
}
public class Program
{
public static void Main()
{
// Our main entry point is synchronous, so we run an async entry point and wait on it
//
// The difference between MainAsync().Result and MainAsync().GetAwaiter().GetResult()
// is in the way exceptions are thrown:
// - the former aggregates exceptions, throwing an AggregateException
// - the latter doesn't aggregate exceptions if it doesn't have to, throwing the actual exception
//
// Since I'm not combining tasks (e.g. Task.WhenAll), I'm not expecting multiple exceptions
//
// If my main method returned int, I could return the task's result
// and I'd make MainAsync return Task<int> instead of just Task
MainAsync().GetAwaiter().GetResult();
}
// Async entry point
public static async Task MainAsync()
{
var receiver = new RecordReceiver();
// I'll provide a few records:
// - a delay big enough between the 1st and the 2nd such that the 1st will be dispatched
// - 8 records in a row, such that 5 of them will be dispatched, and 3 of them will wait
// - again, a delay big enough that will provoke the last 3 records to be dispatched
// - and a final record, which will wait to be dispatched
//
// We await for Task.Delay between providing records,
// but we'll await for the records in the end only
//
// That is, we'll not await each record before the next,
// as that would mean each record would only be dispatched after at least the timeout
var t1 = receiver.ReceiveAsync(new Record(1));
await Task.Delay(TimeSpan.FromMilliseconds(300));
var t2 = receiver.ReceiveAsync(new Record(2));
var t3 = receiver.ReceiveAsync(new Record(3));
var t4 = receiver.ReceiveAsync(new Record(4));
var t5 = receiver.ReceiveAsync(new Record(5));
var t6 = receiver.ReceiveAsync(new Record(6));
var t7 = receiver.ReceiveAsync(new Record(7));
var t8 = receiver.ReceiveAsync(new Record(8));
var t9 = receiver.ReceiveAsync(new Record(9));
await Task.Delay(TimeSpan.FromMilliseconds(300));
var t10 = receiver.ReceiveAsync(new Record(10));
// I probably should have used a list of records, but this is just an example
await Task.WhenAll(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10);
}
}
你可以让它更有趣,比如返回一个不同的任务,比如 Task<RecordDispatchReport>
, 来自 ReceiveAsync
由DispatchRecords
的处理部分完成, 使用 TaskCompletionSource
对于每条记录。
关于C# 异步聚合和分派(dispatch),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37081088/
在使用 dispatch 更新 useReducer 状态后,我需要能够立即执行操作。但是 dispatch 运行异步所以当我运行我的下一段代码时,它在 dispatch 应该更新它之前使用旧状态。
我是 Redux 新手。我必须调度一个操作来更新应用程序的状态,然后使用更新状态来调用我的后端。我使用 thunkMiddleware。 const mapDispatchToProps = disp
我是 Redux 新手。我必须调度一个操作来更新应用程序的状态,然后使用更新状态来调用我的后端。我使用 thunkMiddleware。 const mapDispatchToProps = disp
当我单击 InspectorOption 组件之一时,我的 redux 记录器显示已分派(dispatch)操作并且状态按预期更新。 我的 InspectorSelect 和子 InspectorOp
我有一个模块,其中包含一组函数,实现为带有辅助函数的分派(dispatch)哈希: my $functions = { 'f1' => sub { my %args = @_;
Apple 的 GCD 文档说明如下: GCD provides and manages FIFO queues to which your application can submit tasks
所以我正在获取此 UIImage 数据并将其转换为 base64 中的字符串。问题是它在转换时卡在 UI 线程上,我不确定为什么。 - (void)processImage:(UIImage*)ima
我有一个从后台线程调用的函数 func getValue() -> Bool。这是有意的,也是必须的。现在,getValue() 需要在主线程上执行一些操作,在这种情况下它需要访问 UIApplica
我有一个带有表单的组件,可以将项目添加到列表中。成功将项目添加到列表后,我想使用 form.resetForm(); ,但我想不出一个简单的方法来知道该操作何时成功。我希望我可以订阅 Action 调
我正在努力在 Laravel 6.x 中使用 cursor() 方法获取 3M+ 记录时降低内存。 我有一个 artisan 命令运行以下代码: Product::cursor()->each(fun
好的,所以我一直在尝试通过并发编辑设置电子表格应用程序。我走了laravel回声,redis,套接字路由。 (任何有关仅使用推动器的建议都会被驳回)。现在大多数情况下,我已经开始工作了,我可以从修补匠
假设我有这个布局 span 我将无处不在的点击转换为自定义事件,并使用委托(delegate)的非捕获处理程序将其分派(dispatch)到其原始目标: document.ad
页面加载后,我将在我的 index.js 中调度一个操作 store.dispatch(getWeatherReports()); 来访问天气 API。此操作通过 redux 过程,最终将返回的数据添
我有一个抽象父类 Parent 和六个子类 ChildA though ChildF。 另一个类 Other 有六个(静态)重载方法 olmeth(),六个子类中的每一个。 我怎么写: Parent
这里描述了类似的问题:GWT IllegalArgumentException: encodedRequest cannot be empty 我的GWT应用程序部署在Tomcat6中,该Tomcat
我正在尝试通过 iOS 上的 GCD 将一些代码分派(dispatch)到主队列,但即使是最简单的测试也总是失败。最后归结为: static const int TICK_INTERVAL = 1;
在某些情况下,覆盖扩展中的方法签名似乎会产生不可预知的结果。以下示例演示了具有相似模式的两个不同结果。 class A: UIViewController { func doThing() {
这个问题在这里已经有了答案: Is self retained within this Objective-C block? (1 个回答) 8年前关闭。 假设我有一个简单的电话 dispatch_a
React 中的上下文和 reducers 非常新。我目前正在尝试使用 Context 从折线图上的事件中获取日期字符串。我使用的折线图来自 react-chartjs-2。 我的上下文已设置并提供如
我有一个项目,其中nodejs服务器通过socket.io将推送事件传递到react仪表板,我正在使用Redux。当收到新数据时,会触发一个操作来更新所有相关组件,尽管我不确定我这样做的方式是否正确。
我是一名优秀的程序员,十分优秀!