- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我想征求意见,了解如何使用 TPL 数据流设计最佳架构。我还没有编写代码,所以没有我可以发布的示例代码。我也不是在寻找代码(除非自愿),但非常感谢设计方面的帮助:
要求如下:
我有 3 个核心数据 block ,它们以特定方式相互依赖。 Datablock1 是生产 Foo1 类型对象的生产者。 Datablock2 应该订阅 Foo1 对象(来自 Datablock1)并且可能(不是在每个 Foo1 上,受特定函数约束)生成 Foo2 对象,它存储在输出队列中供其他数据 block 使用。 Datablock3 还使用 Foo1 对象(来自 Datablock1)并可能生成 Foo3 对象,Datablock2 使用这些对象并将其转换为 Foo2 对象。
总而言之,这里是数据 block 以及它们各自生产和消费的内容:
一个额外的要求是相同的 Foo1 在 Datablock2 和 Datablock3 中几乎同时被处理。如果 Foo1 对象首先被 Datablock2 使用,然后一旦 Datablock2 完成其工作,将完全相同的 Foo1 对象发布到 Datablock3 以使其完成工作,那就没问题了。来自 Datablock2 的 Foo2 对象可以来自对 Foo1 对象或 Foo3 对象的操作。
我希望这是有道理的,如果仍然不清楚,我很乐意解释更多。
我的第一个想法是为 3 个数据 block 中的每一个创建 TPL 数据流 block ,并让它们处理不同对象类型的传入流。另一个想法是拆分数据 block ,让每个数据 block 只处理一种单一对象类型的流。您有什么建议或是否有更好的解决方案?
Svick 已经在 Datablock1 上提供了帮助并且它已经可以运行了,我只是想知道如何将我当前的环境(如上所述)转换为 TPL 数据流。
非常感谢任何想法或指示。
最佳答案
让我们将这个问题分成三个部分并分别解决。
第一个是如何有条件地生产元素。我认为最好的选择是使用 TransformManyBlock
并让您的函数返回一个包含一个或零个项目的集合。
另一种选择是 link the two blocks conditionally ,所以 null
s 被忽略并返回 null
当你不想生产任何东西时。但如果你这样做,你还必须将源链接到 NullTarget
,所以 null
s 不会留在其输出缓冲区中。
第二个问题是如何将 Foo1s 发送到 block #2 和 block #3。我可以在这里看到两种方式:
BroadcastBlock
链接到两个目标 block (#2 和#3)。小心这个,因为BroadcastBlock
没有输出队列,所以如果一个目标 block 推迟了一个项目,就意味着它不会处理它。因此,您不应设置 BoundedCapacity
在这种情况下, block #2 和#3。如果您不这样做,它们将永远不会推迟,并且所有消息都将由两个 block 处理。Post()
(或更好,SendAsync()
)阻止#3。我不确定“大约同时”到底是什么意思,但总的来说,TPL Dataflow 不对独立 block 的处理顺序做出任何保证。您可以使用 a custom TaskScheduler
更改不同 block 的优先级,但我不确定这在这里是否有用。
最后也是最复杂的问题是如何在单个 block 中处理不同类型的项目。有几种方法可以做到这一点,但我不确定哪种方法最适合您:
TransformBlock<Foo1, Foo2>
和一个TransformBlock<Foo3, Foo2>
.然后您可以将它们都链接到一个 BufferBlock<Foo2>
.BatchedJoinBlock<Foo1, Foo3>
, 与 batchSize
的 1。这意味着结果 Tuple<IList<Foo1>, IList<Foo3>>
将始终包含其中一个 Foo1
或一个 Foo3
.BatchedJoinBlock
来增强以前的解决方案到 TransformBlock
产生更合适的类型。那可能是 Tuple<Foo1, Foo3>
(其中一项将始终为 null
),或类似于 F# Choice<Foo1, Foo3>
的内容,这确保只设置两者之一。ISourceBlock<Foo2>
并且还有两个属性:Target1
类型 ITarget<Foo1>
和 Target2
类型 ITarget<Foo3>
,就像内置的连接 block 。使用选项 #1 和 #3,您还可以将 block 封装到单个自定义 block 中,从外部看起来像 #4 中的 block ,因此更容易重用。
关于c# - 最佳 TPL 数据流设计?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11242144/
是否可以插入到初始表,然后使用插入的 ID 插入到主表中,该主表在一个数据流的列之间具有外键约束? 我是集成服务的新手,不知道这些功能 场景: 表 A - ID - DESC 表 B - ID - A
在 Azure 数据流中,在聚合转换中是否可以在分组依据中动态包含列?我在分组依据中可能需要 8 列,具体取决于它们的值,即如果值为 1,则包含在分组依据中。 简化为 2 列: Column1
我想要实现的是在azure数据流中包含错误处理,如果在传输行时发生错误,它不应该失败,它会处理其他行并将发生错误的行的ID保存在文本文件或日志中 示例: 假设我们有 10 行要沉入表中,不知何故我们在
我的数据流作业将源和接收器作为突触数据库。 我在从突触数据库提取数据时有一个源查询,其中包含数据流中的联接和转换。 众所周知,底层的数据流将启动 databricks 集群来执行数据流代码。 我的问题
这是关于非常常见的传感器数据处理问题。 为了同步和合并来自不同来源的传感器数据,我想用 Java 实现它,而不需要太复杂的第三个库或框架。 假设我定义了一个对象 (O),它由 4 个属性 (A1,..
我开始从事一个项目,我需要使用 PowerTrack/GNIP 流式传输 Twitter 数据,老实说,我在网络方面非常非常缺乏经验,而且我完全不了解网络方面的知识到数据流 (HTTP),它们如何工作
我有一个后端要用 Python 实现,它应该将数据流式传输到 JavaScript 正在创建表示的 Web 浏览器(例如,不断更新变量或绘制到 )。 该数据将以高达 100 Hz 的速率更新(最坏情
我构建了一个简单的 MERN 应用程序,用户可以在其中对电话号码进行评分。用户只需填写电话号码,选择评级(1 - 5 星评级)、城市和短文本。该应用程序具有带过滤和排序选项的搜索功能。这一切都足够好
我在 TPL 数据流上使用顺序管道构建,它由 3 个块组成: B1 - 准备消息 B2 - 将消息发布到远程服务 B3 - 保存结果 问题是如何在发生服务关闭等错误时关闭管道。管道必须以受控方式关闭,
我在 ADF 数据流中有一个数据集(ADLS Gen2 中存在的 csv 文件)。我第一次尝试进行数据预览时,原始文件中的所有列都正确显示。然后,我从 csv 文件中删除了第一列并刷新了“数据预览”选
我正在使用 ADF v2 DataFlow ativity 将数据从 Blob 存储中的 csv 文件加载到 Azure SQL 数据库中的表中。在数据流(源 - Blob 存储)中,在源选项中,有一
我有很多带有嵌套列表的 json 文件需要展平。问题是它们是不同的,我不想为它们每一个创建一个分支。如何通过输入参数动态执行具有“展开依据”和“输入列”字段的展平事件? 谢谢! 最佳答案 对于展开方式
我一直在尝试使用 Azure 数据工厂的数据流在文件的小数列中进行数据类型检查,但它没有按预期工作。我的问题如下: 我想检查数字 121012132.12 是否为小数,因此我使用数据流的派生列并编写表
我们使用 Azure 数据流在 Azure SQL 数据仓库中生成数据表的历史记录。在数据流中,我们在所有列上使用 md5 或 sha1 函数来生成唯一的行指纹来检测记录中的更改,或识别已删除/新记录
我们使用 Azure 数据流在 Azure SQL 数据仓库中生成数据表的历史记录。在数据流中,我们在所有列上使用 md5 或 sha1 函数来生成唯一的行指纹来检测记录中的更改,或识别已删除/新记录
我之前使用 bz2 来尝试解压缩输入。我想要解码的输入已经是压缩格式,因此我决定将格式输入到交互式 Python 控制台中: >>> import bz2 >>> bz2.decompress(inp
在测试 WPF 项目中,我尝试使用 TPL 数据流来枚举给定父目录的所有子目录,并创建具有特定文件扩展名的文件列表,例如“.xlsx”。我使用 2 个 block ,第一个 dirToFilesBlo
问题:为什么使用 WriteOnceBlock (或 BufferBlock )用于从另一个 BufferBlock 取回答案(类似回调) (取回答案发生在发布的 Action 中)导致死锁(在此代码
此代码永远不会到达最后一行,因为完成不会从 saveBlock 传播到 sendBlock。我做错了什么? var readGenerateBlock = new TransformBlock(n =
好吧,我知道我的问题需要更多的指导,而不是技术细节,但我希望 SO 成员不会介意 TPL 数据流的新手提出一些非常基础的问题。 我有一个简单的 Windows 窗体应用程序,它负责从我系统上的 Exc
我是一名优秀的程序员,十分优秀!