- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在 StackOverflow 上阅读了很多关于使用 TaskCompletionSource
包装基于回调的 API 和基于 Task
的 API 的文章和问题,我正在尝试在与 Solace PubSub+ 消息代理通信时使用这种技术。
我最初的观察是这种技术似乎转移了并发的责任。例如,Solace 代理库有一个 Send()
方法,它可能会阻塞,然后我们在网络通信完成后得到一个回调,以指示“真正的”成功或失败。所以这个Send()
方法可以被非常快速的调用,并且vendor库在内部限制了并发。
当你围绕它放置一个任务时,你似乎要么序列化操作(foreach message await SendWrapperAsync(message)
),要么通过决定启动多少任务来自己接管并发责任(例如,使用 TPL 数据流)。
无论如何,我决定用保证器包装 Send
调用,保证器将永远重试,直到回调指示成功,并负责并发。这是一个“有保证”的消息传递系统。失败不是一种选择。这要求担保人可以施加背压,但这不在这个问题的范围内。我在下面的示例代码中对此有一些评论。
它的意思是我的热路径(包含发送 + 回调)由于重试逻辑而“特别热”。因此这里有很多 TaskCompletionSource
创建。
供应商自己的文档建议尽可能重用他们的 Message
对象,而不是为每个 Send
重新创建它们。为此,我决定使用 Channel
作为环形缓冲区。但这让我想知道 - 是否有一些替代 TaskCompletionSource
方法的方法 - 也许其他一些对象也可以缓存在环形缓冲区中并重复使用,从而实现相同的结果?
我意识到这可能是对微优化的过分热心尝试,老实说,我正在探索 C# 的几个方面,这些方面超出了我的薪水等级(我是 SQL 专家,真的),所以我可能会遗漏一些东西明显的。如果答案是“你实际上不需要这种优化”,那我就不会放心了。如果答案是“这确实是唯一明智的方法”,我的好奇心就可以得到满足。
这是一个功能齐全的控制台应用程序,它模拟了 MockBroker
对象中 Solace 库的行为,以及我对它进行包装的尝试。我的热路径是 Guarantor
类中的 SendOneAsync
方法。代码对于 SO 来说可能有点太长,但它是我可以创建的最小演示,它捕获了所有重要元素。
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
internal class Message { public bool sent; public int payload; public object correlator; }
// simulate third party library behaviour
internal class MockBroker
{
public bool TrySend(Message m, Action<Message> callback)
{
if (r.NextDouble() < 0.5) return false; // simulate chance of immediate failure / "would block" response
Task.Run(() => { Thread.Sleep(100); m.sent = r.NextDouble() < 0.5; callback(m); }); // simulate network call
return true;
}
private Random r = new();
}
// Turns MockBroker into a "guaranteed" sender with an async concurrency limit
internal class Guarantor
{
public Guarantor(int maxConcurrency)
{
_broker = new MockBroker();
// avoid message allocations in SendOneAsync
_ringBuffer = Channel.CreateBounded<Message>(maxConcurrency);
for (int i = 0; i < maxConcurrency; i++) _ringBuffer.Writer.TryWrite(new Message());
}
// real code pushing into a T.T.T.DataFlow block with bounded capacity and parallelism
// execution options both equal to maxConcurrency here, providing concurrency and backpressure
public async Task Post(int payload) => await SendOneAsync(payload);
private async Task SendOneAsync(int payload)
{
Message msg = await _ringBuffer.Reader.ReadAsync();
msg.payload = payload;
// send must eventually succeed
while (true)
{
// *** can this allocation be avoided? ***
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
msg.correlator = tcs;
// class method in real code, inlined here to make the logic more apparent
Action<Message> callback = (msg) => (msg.correlator as TaskCompletionSource<bool>).SetResult(msg.sent);
if (_broker.TrySend(msg, callback) && await tcs.Task) break;
else
{
// simple demo retry logic
Console.WriteLine($"retrying {msg.payload}");
await Task.Delay(500);
}
}
// real code raising an event here to indicate successful delivery
await _ringBuffer.Writer.WriteAsync(msg);
Console.WriteLine(payload);
}
private Channel<Message> _ringBuffer;
private MockBroker _broker;
}
internal class Program
{
private static async Task Main(string[] args)
{
// at most 10 concurrent sends
Guarantor g = new(10);
// hacky simulation since in this demo there's nothing generating continuous events,
// no DataFlowBlock providing concurrency (it will be limited by the Channel instead),
// and nobody to notify when messages are successfully sent
List<Task> sends = new(100);
for (int i = 0; i < 100; i++) sends.Add(g.Post(i));
await Task.WhenAll(sends);
}
}
最佳答案
是的,你可以避免分配TaskCompletionSource
实例,通过使用轻量级 ValueTask
s 而不是 Task
秒。首先,您需要一个可以实现 IValueTaskSource<T>
的可重用对象接口(interface),以及 Message
似乎是完美的候选人。要实现此接口(interface),您可以使用 ManualResetValueTaskSourceCore<T>
结构。这是一个可变结构,因此不应将其声明为readonly
。
。您只需要将接口(interface)方法委托(delegate)给这个名称很长的结构的相应方法:
using System.Threading.Tasks.Sources;
internal class Message : IValueTaskSource<bool>
{
public bool sent; public int payload; public object correlator;
private ManualResetValueTaskSourceCore<bool> _source; // Mutable struct, not readonly
public void Reset() => _source.Reset();
public short Version => _source.Version;
public void SetResult(bool result) => _source.SetResult(result);
ValueTaskSourceStatus IValueTaskSource<bool>.GetStatus(short token)
=> _source.GetStatus(token);
void IValueTaskSource<bool>.OnCompleted(Action<object> continuation,
object state, short token, ValueTaskSourceOnCompletedFlags flags)
=> _source.OnCompleted(continuation, state, token, flags);
bool IValueTaskSource<bool>.GetResult(short token) => _source.GetResult(token);
}
三个成员GetStatus
, OnCompleted
和 GetResult
是实现接口(interface)所必需的。其他三个成员( Reset
、 Version
和 SetResult
)将用于创建和控制 ValueTask<bool>
现在让我们包装 TrySend
MockBroker
的方法异步方法中的类 TrySendAsync
, 返回 ValueTask<bool>
static class MockBrokerExtensions
{
public static ValueTask<bool> TrySendAsync(this MockBroker source, Message message)
{
message.Reset();
bool result = source.TrySend(message, m => m.SetResult(m.sent));
if (!result) message.SetResult(false);
return new ValueTask<bool>(message, message.Version);
}
}
message.Reset();
重置 IValueTaskSource<bool>
, 并声明前面的异步操作已经完成。 IValueTaskSource<T>
一次只支持一个异步操作,生成的 ValueTask<T>
只能等待一次,在下一次Reset()
之后就不能再等待了.这就是您为避免分配对象而必须付出的代价:您必须遵循更严格的规则。如果你试图违反规则(有意或无意),ManualResetValueTaskSourceCore<T>
将开始 throw InvalidOperationException
到处都是。
现在让我们使用 TrySendAsync
扩展方法:
while (true)
{
if (await _broker.TrySendAsync(msg)) break;
// simple demo retry logic
Console.WriteLine($"retrying {msg.payload}");
await Task.Delay(500);
}
您可以在 Console
中打印GC.GetTotalAllocatedBytes(true)
整个操作前后,看区别。确保在 Release模式下运行应用程序,以查看真实情况。您可能会发现差异并不那么令人印象深刻,因为 TaskCompletionSource
的大小与 Task.Delay
分配的字节相比,实例非常小,以及所有 string
s 是为在 Console
中编写内容而生成的.
关于c# - 在热路径上使用异步 API 包装基于回调的 API 时避免分配并保持并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69147931/
我有一个应用程序,它会抛出 GKSession 并在各种条件下(连接超时、 session 失败等)创建一个新的 GKSession。不过,我遇到了内存泄漏问题,并且有时会在重新连接几次循环后崩溃。
比如我在宿主代码中有一个浮点指针 float *p 是否可以确定他指向的内存类型(设备/主机)? 最佳答案 在 UVA system 中, 运行时 API 函数 cudaPointerGetAttri
我已将项目转换为 .Net 4.0 并且以下代码不起作用: typeof(RuntimeTypeHandle).GetMethod("Allocate", BindingFlags.Instance
当我声明 char ch = 'ab' 时,ch 只包含 'b',为什么它不存储 'a'? #include int main() { char ch = 'ab'; printf("%c"
我对 Disk Sector 和 Block 有疑问。扇区是一个单位,通常为 512 字节或 1k、2k、4k 等取决于硬件。文件系统 block 大小是一组扇区大小。 假设我正在存储一个 5KB 的
假设我有 8 个人和5000 个苹果。 我想将所有苹果分发给所有 8 个人,这样我就没有苹果了。 但每个人都应该得到不同数量 将它们全部分发出去的最佳方式是什么? 我是这样开始的: let peopl
我正在构建的网站顶部有一个搜索栏。与 Trello 或 Gmail 类似,我希望当用户按下“/”键时,他们的焦点就会转到该搜索框。 我的 JavaScript 看起来像这样: document.onk
我有一小段代码: if (PZ_APP.dom.isAnyDomElement($textInputs)){ $textInputs.on("focus", function(){
我观察到以下行为。 接受了两个属性变量。 @property (nonatomic, retain) NSString *stringOne; @property (nonatomic, assign
我正在解决这样的问题 - 实现一个计算由以下内容组成的表达式的函数以下操作数:“(”、“)”、“+”、“-”、“*”、“/”。中的每个数字表达式可能很大(与由字符串表示的一样大)1000 位)。 “/
我有一组主机和一组任务。 每个主机都有 cpu、mem 和任务容量,每个任务都有 cpu、mem 要求。 每个主机都属于一个延迟类别,并且可以与具有特定延迟值的其他主机通信。 每个任务可能需要以等于或
该程序的作用:从文件中读取一个包含 nrRows 行和 nrColomns 列的矩阵(二维数组)。矩阵的所有元素都是 [0,100) 之间的整数。程序必须重新排列矩阵内的所有元素,使每个元素等于其所在
世界!我有个问题。今天我尝试创建一个代码,它可以找到加泰罗尼亚语号码。但是在我的程序中可以是长数字。我找到了分子和分母。但我不能分割长数字!此外,只有标准库必须在此程序中使用。请帮帮我。这是我的代码
我确定我遗漏了一些明显的东西,但我想在 Objective C 中创建一个 NSInteger 指针的实例。 -(NSInteger*) getIntegerPointer{ NSInteger
这个问题在这里已经有了答案: Difference between self.ivar and ivar? (4 个答案) 关闭 9 年前。
我如何将 v[i] 分配给一系列整数(v 的类型是 vector )而无需最初填充 最佳答案 你的意思是将 std::vector 初始化为一系列整数? int i[] = {1, 2, 3, 4,
我想寻求分配方面的帮助....我把这个作业带到了学校......我必须编写程序来加载一个 G 矩阵和第二个 G 矩阵,并搜索第二个 G 矩阵以获取存在数第一个 G 矩阵的......但是,当我尝试运行
我必须管理资源。它基本上是一个唯一的编号,用于标识交换机中的第 2 层连接。可以有 16k 个这样的连接,因此每次用户希望配置连接时,他/她都需要分配一个唯一索引。同样,当用户希望删除连接时,资源(号
是否有任何通用的命名约定来区分已分配和未分配的字符串?我正在寻找的是希望类似于 us/s 来自 Making Wrong Code Look Wrong ,但我宁愿使用常见的东西也不愿自己动手。 最佳
我需要读取一个 .txt 文件并将文件中的每个单词分配到一个结构中,该结构从结构 vector 指向。我将在下面更好地解释。 感谢您的帮助。 我的程序只分配文件的第一个字... 我知道问题出在函数 i
我是一名优秀的程序员,十分优秀!