- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
在我们的系统中,我们有多个数据生成器在共享文件系统中创建文件内容,并在文件名中指示 DataSourceId
。需要有一个公平的调度机制来读取所有源生成的文件,解析、扁平化和丰富(使用引用数据)文件中的数据记录,批处理丰富的记录并写入数据库。
我使用IPartitionedTridentSpout
。拓扑如下所示:
TransactionalTridentEsrSpout spout
= new TransactionalTridentEsrSpout(NUM_OF_PARTITIONS);
TridentTopology topology = new TridentTopology();
topology.newStream("FileHandlerSpout", spout)
.each(new Fields("filename", "esr"), new Utils.PrintFilter())
.parallelismHint(NUM_OF_PARTITIONS)
.shuffle()
.each(new Fields("filename", "record"), new RecordFlattenerAndEnricher(), new elds("record-enriched"))
.each(new Fields("filename", "record-enriched"), new Utils.PrintFilter())
.project(new Fields(record-enriched")) // pass only required
.parallelismHint(PARALLELISM_HINT_FOR_ESR_FLATTENER_ENRICHER)
.shuffle()
.aggregate(new Fields("record-enriched"), new BlockWriterToDb(), new Fields("something"))
.each(new Fields("something"), new Utils.PrintFilter())
.parallelismHint(PARALLELISM_HINT_FOR_GP_WRITER);
由于数据文件很大(通常是100万条记录),所以我小批量读取10K记录。对于 Coordinator
生成的每个 transactionId
,我的 Emitter
会发出其分区中当前/下一个文件的接下来 10K 记录。最终的 BlockWriter 会将丰富的记录聚合到缓冲区中,并在“完成”方法调用时将缓冲区写入数据库。
拓扑工作正常,但我有以下问题:
ParttionedTridentSpout
的 parallelismHint
会影响 Emitters
的数量,它被设置为分区的数量。接下来两层(FlattenerAndEnricher 和 BlockWriterToDb)的 parallelismHint 需要设置为更高的值,因为我们还有很多工作要做那里。由于这里不需要 groupBy
,因此我在所有阶段之间使用 shuffle()
。当特定的下游 Bolt 失效时,Trident 将使用适当的旧元数据调用 Emitter
,要求其重新发射。但由于发生了洗牌,作为一个发射器发射的一部分的特定记录将落在多个下游 bolt 中。那么,Trident 如何调用适当的发射器进行重新发射,以便重新发射完全相同的记录。即使 Trident 调用适当的发射器,发射器也会重新发射整个 10K 批处理,其中一些记录仅失败。 Storm 是如何处理整个序列的,以及我们如何设计这里的应用程序逻辑来处理恰好一次语义的容错。
最佳答案
使用Trident时,整个批处理成功,或者整个批处理失败。当批处理失败时,spout 应该(自动)重播整个批处理,并且您将无法在发出时在其记录中进行挑选。
要获得一次性语义,您的下游逻辑/数据库更新应该忽略重播项目(跟踪成功更新项目的批处理 ID),或者是幂等的。
关于java - Storm 三叉戟 : How to use IPartitionedTridentSpout?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21630993/
在我们的系统中,我们有多个数据生成器在共享文件系统中创建文件内容,并在文件名中指示 DataSourceId。需要有一个公平的调度机制来读取所有源生成的文件,解析、扁平化和丰富(使用引用数据)文件中的
我是一名优秀的程序员,十分优秀!