- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试构建一个服务,为许多异步客户端提供队列以发出请求并等待响应。我需要能够通过每 Y 个持续时间的 X 个请求来限制队列处理。例如:每秒 50 个 Web 请求。它用于第 3 方 REST 服务,我每秒只能发出 X 个请求。
发现了许多 SO 问题,它引导我走上了使用 TPL 数据流的道路,我使用了 TranformBlock 来提供我的自定义节流,然后使用 X 数量的 ActionBlocks 来并行完成任务。 Action 的实现似乎有点笨拙,所以想知道是否有更好的方法让我将 Tasks 传递到管道中,以便在完成后通知调用者。
我想知道是否有更好或更优化/更简单的方法来做我想做的事?我的实现有什么明显的问题吗?我知道它缺少取消和异常处理,我接下来会这样做,但非常欢迎您提出意见。
我已经 Extended Stephen Cleary's example for my Dataflow pipeline并使用
svick's concept of a time throttled TransformBlock .我想知道是否可以使用纯 SemaphoreSlim design 轻松实现我构建的内容,这是基于时间的最大操作节流,我认为这会使事情复杂化。
这是最新的实现。 FIFO 队列异步队列,我可以在其中传递自定义操作。
public class ThrottledProducerConsumer<T>
{
private class TimerState<T1>
{
public SemaphoreSlim Sem;
public T1 Value;
}
private BufferBlock<T> _queue;
private IPropagatorBlock<T, T> _throttleBlock;
private List<Task> _consumers;
private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval, Int32 MaxPerInterval)
{
SemaphoreSlim _sem = new SemaphoreSlim(MaxPerInterval);
return new TransformBlock<T1, T1>(async (x) =>
{
var sw = new Stopwatch();
sw.Start();
//Console.WriteLine($"Current count: {_sem.CurrentCount}");
await _sem.WaitAsync();
sw.Stop();
var now = DateTime.UtcNow;
var releaseTime = now.Add(Interval) - now;
//-- Using timer as opposed to Task.Delay as I do not want to await or wait for it to complete
var tm = new Timer((s) => {
var state = (TimerState<T1>)s;
//Console.WriteLine($"RELEASE: {state.Value} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem");
state.Sem.Release();
}, new TimerState<T1> { Sem = _sem, Value = x }, (int)Interval.TotalMilliseconds,
-1);
/*
Task.Delay(delay).ContinueWith((t)=>
{
Console.WriteLine($"RELEASE(FAKE): {x} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem");
//_sem.Release();
});
*/
//Console.WriteLine($"{x} was tramsformed in {sw.ElapsedMilliseconds}ms. Will release {now.Add(Interval):mm:ss:ff}");
return x;
},
//new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
//
new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 10 });
}
public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval, Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1)
{
var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, };
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
//-- Create the Queue
_queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax, });
//-- Create and link the throttle block
_throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval);
_queue.LinkTo(_throttleBlock, linkOptions);
//-- Create and link the consumer(s) to the throttle block
var consumerAction = (ConsumerAction != null) ? ConsumerAction : new Action<T>(ConsumeItem);
_consumers = new List<Task>();
for (int i = 0; i < MaxConsumers; i++)
{
var consumer = new ActionBlock<T>(consumerAction, consumerOptions);
_throttleBlock.LinkTo(consumer, linkOptions);
_consumers.Add(consumer.Completion);
}
//-- TODO: Add some cancellation tokens to shut this thing down
}
/// <summary>
/// Default Consumer Action, just prints to console
/// </summary>
/// <param name="ItemToConsume"></param>
private void ConsumeItem(T ItemToConsume)
{
Console.WriteLine($"Consumed {ItemToConsume} at {DateTime.UtcNow}");
}
public async Task EnqueueAsync(T ItemToEnqueue)
{
await this._queue.SendAsync(ItemToEnqueue);
}
public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue)
{
foreach (var item in ItemsToEnqueue)
{
await this._queue.SendAsync(item);
}
}
public async Task CompleteAsync()
{
this._queue.Complete();
await Task.WhenAll(_consumers);
Console.WriteLine($"All consumers completed {DateTime.UtcNow}");
}
}
测试方法
public class WorkItem<T>
{
public TaskCompletionSource<T> tcs;
//public T respone;
public string url;
public WorkItem(string Url)
{
tcs = new TaskCompletionSource<T>();
url = Url;
}
public override string ToString()
{
return $"{url}";
}
}
public static void TestQueue()
{
Console.WriteLine("Created the queue");
var defaultAction = new Action<WorkItem<String>>(async i => {
var taskItem = ((WorkItem<String>)i);
Console.WriteLine($"Consuming: {taskItem.url} {DateTime.UtcNow:mm:ss:ff}");
//-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url);
await Task.Delay(5000);
taskItem.tcs.SetResult($"{taskItem.url}");
//Console.WriteLine($"Consumed: {taskItem.url} {DateTime.UtcNow}");
});
var queue = new ThrottledProducerConsumer<WorkItem<String>>(TimeSpan.FromMilliseconds(2000), 5, 2, defaultAction);
var results = new List<Task>();
foreach (var no in Enumerable.Range(0, 20))
{
var workItem = new WorkItem<String>($"http://someurl{no}.com");
results.Add(queue.EnqueueAsync(workItem));
results.Add(workItem.tcs.Task);
results.Add(workItem.tcs.Task.ContinueWith(response =>
{
Console.WriteLine($"Received: {response.Result} {DateTime.UtcNow:mm:ss:ff}");
}));
}
Task.WhenAll(results).Wait();
Console.WriteLine("All Work Items Have Been Processed");
}
最佳答案
自问以来,我创建了一个基于 TPL 数据流的 ThrottledConsumerProducer 类。它经过了数天的测试,其中包括按顺序排队和完成的并发生产者,大约 281k 没有任何问题,但是我有一些我没有发现的错误。
这些类是通用的,因此如果其他人需要类似的东西,它可能对他们有用。我没有写取消或错误处理,但我认为我应该将其标记为已回答以继续进行。我很乐意看到一些替代方案和反馈,而不是将我的标记为已接受的答案。感谢阅读。
注意:我从原来的实现中删除了 Timer,因为它做了奇怪的事情导致信号量释放超过最大值,我假设它是动态上下文错误,它发生在我开始时运行并发请求。我使用 Task.Delay 来安排释放信号量锁来解决这个问题。
节流生产者消费者
public class ThrottledProducerConsumer<T>
{
private BufferBlock<T> _queue;
private IPropagatorBlock<T, T> _throttleBlock;
private List<Task> _consumers;
private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval,
Int32 MaxPerInterval, Int32 BlockBoundedMax = 2, Int32 BlockMaxDegreeOfParallelism = 2)
{
SemaphoreSlim _sem = new SemaphoreSlim(MaxPerInterval, MaxPerInterval);
return new TransformBlock<T1, T1>(async (x) =>
{
//Log($"Transform blk: {x} {DateTime.UtcNow:mm:ss:ff} Semaphore Count: {_sem.CurrentCount}");
var sw = new Stopwatch();
sw.Start();
//Console.WriteLine($"Current count: {_sem.CurrentCount}");
await _sem.WaitAsync();
sw.Stop();
var delayTask = Task.Delay(Interval).ContinueWith((t) =>
{
//Log($"Pre-RELEASE: {x} {DateTime.UtcNow:mm:ss:ff} Semaphore Count {_sem.CurrentCount}");
_sem.Release();
//Log($"PostRELEASE: {x} {DateTime.UtcNow:mm:ss:ff} Semaphoere Count {_sem.CurrentCount}");
});
//},TaskScheduler.FromCurrentSynchronizationContext());
//Log($"Transformed: {x} in queue {sw.ElapsedMilliseconds}ms. {DateTime.Now:mm:ss:ff} will release {DateTime.Now.Add(Interval):mm:ss:ff} Semaphoere Count {_sem.CurrentCount}");
return x;
},
//-- Might be better to keep Bounded Capacity in sync with the semaphore
new ExecutionDataflowBlockOptions { BoundedCapacity = BlockBoundedMax,
MaxDegreeOfParallelism = BlockMaxDegreeOfParallelism });
}
public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval,
Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1,
Int32 MaxThrottleBuffer = 20, Int32 MaxDegreeOfParallelism = 10)
{
//-- Probably best to link MaxPerInterval and MaxThrottleBuffer
// and MaxConsumers with MaxDegreeOfParallelism
var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, };
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
//-- Create the Queue
_queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax, });
//-- Create and link the throttle block
_throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval);
_queue.LinkTo(_throttleBlock, linkOptions);
//-- Create and link the consumer(s) to the throttle block
var consumerAction = (ConsumerAction != null) ? ConsumerAction : new Action<T>(ConsumeItem);
_consumers = new List<Task>();
for (int i = 0; i < MaxConsumers; i++)
{
var consumer = new ActionBlock<T>(consumerAction, consumerOptions);
_throttleBlock.LinkTo(consumer, linkOptions);
_consumers.Add(consumer.Completion);
}
//-- TODO: Add some cancellation tokens to shut this thing down
}
/// <summary>
/// Default Consumer Action, just prints to console
/// </summary>
/// <param name="ItemToConsume"></param>
private void ConsumeItem(T ItemToConsume)
{
Log($"Consumed {ItemToConsume} at {DateTime.UtcNow}");
}
public async Task EnqueueAsync(T ItemToEnqueue)
{
await this._queue.SendAsync(ItemToEnqueue);
}
public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue)
{
foreach (var item in ItemsToEnqueue)
{
await this._queue.SendAsync(item);
}
}
public async Task CompleteAsync()
{
this._queue.Complete();
await Task.WhenAll(_consumers);
Console.WriteLine($"All consumers completed {DateTime.UtcNow}");
}
private static void Log(String messageToLog)
{
System.Diagnostics.Trace.WriteLine(messageToLog);
Console.WriteLine(messageToLog);
}
}
- 用法示例-
通用工作项
public class WorkItem<Toutput,Tinput>
{
private TaskCompletionSource<Toutput> _tcs;
public Task<Toutput> Task { get { return _tcs.Task; } }
public Tinput InputData { get; private set; }
public Toutput OutputData { get; private set; }
public WorkItem(Tinput inputData)
{
_tcs = new TaskCompletionSource<Toutput>();
InputData = inputData;
}
public void Complete(Toutput result)
{
_tcs.SetResult(result);
}
public void Failed(Exception ex)
{
_tcs.SetException(ex);
}
public override string ToString()
{
return InputData.ToString();
}
}
创建在管道中执行的操作 block
private Action<WorkItem<Location,PointToLocation>> CreateProcessingAction()
{
return new Action<WorkItem<Location,PointToLocation>>(async i => {
var sw = new Stopwatch();
sw.Start();
var taskItem = ((WorkItem<Location,PointToLocation>)i);
var inputData = taskItem.InputData;
//Log($"Consuming: {inputData.Latitude},{inputData.Longitude} {DateTime.UtcNow:mm:ss:ff}");
//-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url);
await Task.Delay(500);
sw.Stop();
Location outData = new Location()
{
Latitude = inputData.Latitude,
Longitude = inputData.Longitude,
StreetAddress = $"Consumed: {inputData.Latitude},{inputData.Longitude} Duration(ms): {sw.ElapsedMilliseconds}"
};
taskItem.Complete(outData);
//Console.WriteLine($"Consumed: {taskItem.url} {DateTime.UtcNow}");
});
}
测试方法您需要为 PointToLocation 和 Location 提供自己的实现。只是一个示例,说明如何将它用于您自己的类。
int startRange = 0;
int nextRange = 1000;
ThrottledProducerConsumer<WorkItem<Location,PointToLocation>> tpc;
private void cmdTestPipeline_Click(object sender, EventArgs e)
{
Log($"Pipeline test started {DateTime.Now:HH:mm:ss:ff}");
if(tpc == null)
{
tpc = new ThrottledProducerConsumer<WorkItem<Location, PointToLocation>>(
//1010, 2, 20000,
TimeSpan.FromMilliseconds(1010), 45, 100000,
CreateProcessingAction(),
2,45,10);
}
var workItems = new List<WorkItem<Models.Location, PointToLocation>>();
foreach (var i in Enumerable.Range(startRange, nextRange))
{
var ptToLoc = new PointToLocation() { Latitude = i + 101, Longitude = i + 100 };
var wrkItem = new WorkItem<Location, PointToLocation>(ptToLoc);
workItems.Add(wrkItem);
wrkItem.Task.ContinueWith(t =>
{
var loc = t.Result;
string line = $"[Simulated:{DateTime.Now:HH:mm:ss:ff}] - {loc.StreetAddress}";
//txtResponse.Text = String.Concat(txtResponse.Text, line, System.Environment.NewLine);
//var lines = txtResponse.Text.Split(new string[] { System.Environment.NewLine},
// StringSplitOptions.RemoveEmptyEntries).LongCount();
//lblLines.Text = lines.ToString();
//Log(line);
});
//}, TaskScheduler.FromCurrentSynchronizationContext());
}
startRange += nextRange;
tpc.EnqueueItemsAsync(workItems);
Log($"Pipeline test completed {DateTime.Now:HH:mm:ss:ff}");
}
关于c# - 具有节流持续时间和批量消费的异步生产者/消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39865282/
这是代码片段。 请说出这种用小内存存储大数据的算法是什么。 public static void main(String[] args) { long longValue = 21474836
所以我使用 imap 从 gmail 和 outlook 接收电子邮件。 Gmail 像这样编码 =?UTF-8?B?UmU6IM69zq3OvyDOtc68zrHOuc67IG5ldyBlbWFpb
很久以前就学会了 C 代码;想用 Scheme 尝试一些新的和不同的东西。我正在尝试制作一个接受两个参数并返回两者中较大者的过程,例如 (define (larger x y) (if (> x
Azure 恢复服务保管库有两个备份配置选项 - LRS 与 GRS 这是一个有关 Azure 恢复服务保管库的问题。 当其驻留区域发生故障时,如何处理启用异地冗余的恢复服务保管库?如果未为恢复服务启
说,我有以下实体: @Entity public class A { @Id @GeneratedValue private Long id; @Embedded private
我有下一个问题。 我有下一个标准: criteria.add(Restrictions.in("entity.otherEntity", getOtherEntitiesList())); 如果我的
如果这是任何类型的重复,我会提前申请,但我找不到任何可以解决我的具体问题的内容。 这是我的程序: import java.util.Random; public class CarnivalGame{
我目前正在使用golang创建一个聚合管道,在其中使用“$ or”运算符查询文档。 结果是一堆需要分组的未分组文档,这样我就可以进入下一阶段,找到两个数据集之间的交集。 然后将其用于在单独的集合中进行
是否可以在正则表达式中创建 OR 条件。 我正在尝试查找包含此类模式的文件名列表的匹配项 第一个案例 xxxxx-hello.file 或者案例二 xxxx-hello-unasigned.file
该程序只是在用户输入行数时创建菱形的形状,因此它有 6 个 for 循环; 3 个循环创建第一个三角形,3 个循环创建另一个三角形,通过这 2 个三角形和 6 个循环,我们得到了一个菱形,这是整个程序
我有一个像这样的查询字符串 www.google.com?Department=Education & Finance&Department=Health 我有这些 li 标签,它们的查询字符串是这样
我有一个带有静态构造函数的类,我用它来读取 app.config 值。如何使用不同的配置值对类进行单元测试。我正在考虑在不同的应用程序域中运行每个测试,这样我就可以为每个测试执行静态构造函数 - 但我
我正在寻找一个可以容纳多个键的容器,如果我为其中一个键值输入保留值(例如 0),它会被视为“或”搜索。 map, int > myContainer; myContainer.insert(make_
我正在为 Web 应用程序创建数据库,并正在寻找一些建议来对可能具有多种类型的单个实体进行建模,每种类型具有不同的属性。 作为示例,假设我想为“数据源”对象创建一个关系模型。所有数据源都会有一些共享属
(1) =>CREATE TABLE T1(id BIGSERIAL PRIMARY KEY, name TEXT); CREATE TABLE (2) =>INSERT INTO T1 (name)
我不确定在使用别名时如何解决不明确的列引用。 假设有两个表,a 和 b,它们都有一个 name 列。如果我加入这两个表并为结果添加别名,我不知道如何为这两个表引用 name 列。我已经尝试了一些变体,
我的查询是: select * from table where id IN (1,5,4,3,2) 我想要的与这个顺序完全相同,不是从1...5,而是从1,5,4,3,2。我怎样才能做到这一点? 最
我正在使用 C# 代码执行动态生成的 MySQL 查询。抛出异常: CREATE TABLE dump ("@employee_OID" VARCHAR(50)); "{"You have an er
我有日期 2016-03-30T23:59:59.000000+0000。我可以知道它的格式是什么吗?因为如果我使用 yyyy-MM-dd'T'HH:mm:ss.SSS,它会抛出异常 最佳答案 Sim
我有一个示例模式,它的 SQL Fiddle 如下: http://sqlfiddle.com/#!2/6816b/2 这个 fiddle 只是根据 where 子句中的条件查询示例数据库,如下所示:
我是一名优秀的程序员,十分优秀!