- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
假设我有一个包含多个分区的主题。我在其中写入 K/V 数据,并希望通过按键在 Tumbling Windows 中聚合所述数据。
假设我已经启动了与我有分区一样多的工作实例,并且每个工作实例都在单独的机器上运行。
我将如何确保生成的聚合包含每个键的所有值? IE 我不希望每个 worker 实例都有一些值的子集。
这是 StateStore 的用途吗? Kafka 是自己管理这个还是我需要想出一个方法?
最佳答案
How would I go about insuring that the resultant aggregations include all values for each key? IE I don't want each worker instance to have some subset of the values.
通常,Kafka Streams 确保同一个键的所有值将由同一个(且只有一个)流任务处理,这也意味着只有一个应用程序实例(您描述为“工作实例”)将处理该键的值。请注意,一个应用实例可能会运行 1 个以上的流任务,但这些任务是隔离的。
这种行为是通过数据的分区实现的,Kafka Streams 确保一个分区总是由同一个流任务处理。键/值的逻辑链接是,在 Kafka 和 Kafka Streams 中,一个键总是被发送到同一个分区(这里有一个陷阱,但我不确定是否有必要详细了解范围这个问题),因此一个特定的分区——在可能的许多分区中——包含同一个键的所有值。
在某些情况下,例如加入两个流 A
和 B
时,您必须确保聚合将对相同的键进行操作,以确保来自两个流的数据流位于同一个流任务中——同样,这都是为了确保相关的输入流分区,从而匹配键(来自 A
和 B
,分别)在同一个流任务中可用。您在这里使用的典型方法是 selectKey()
。一旦完成,Kafka Streams 确保,为了连接两个流 A 和 B 以及创建连接的输出流,相同键的所有值将由相同的流任务处理,从而由相同的应用程序实例处理。
例子:
A
具有键 userId
和值 { georegion }
。B
具有键 georegion
,值为 { continent, description }
。仅当两个流使用相同的 key 时,加入两个流才有效(从 Kafka 0.10.0 开始)。在此示例中,这意味着您必须重新键入(并因此重新分区)流 A
,以便生成的键从 userId
更改为 georegion
。否则,从 Kafka 0.10 开始,您无法连接 A
和 B
,因为数据没有位于负责实际执行连接的流任务中。
在此示例中,您可以通过以下方式对流 A
进行重新加密/重新分区:
// Kafka 0.10.0.x (latest stable release as of Sep 2016)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId)).through("rekeyed-topic")
// Upcoming versions of Kafka (not released yet)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId))
只有在 Kafka 0.10.0 中才需要 through()
调用来实际触发重新分区,而更高版本的 Kafka 会自动为您完成这些(即将推出的功能已经完成并可用)在 Kafka trunk
中)。
Is this something that a StateStore would be used for? Does Kafka manage this on its own or do I need to come up with a method?
一般来说,不会。上述行为是通过分区实现的,而不是通过状态存储。
有时,由于您为流定义的操作而涉及状态存储,这可能解释了您问这个问题的原因。例如,窗口操作需要管理状态,因此将在幕后创建状态存储。但是您的实际问题——“确保生成的聚合包括每个键的所有值”——与状态存储无关,它与分区行为有关。
关于java - Kafka KTable——跨机器共享聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39256483/
我有一个 Cassandra 集群,里面有 4 个表和数据。 我想使用聚合函数(sum,max ...)发出请求,但我在这里读到这是不可能的: http://www.datastax.com/docu
我有以下两张表 Table: items ID | TITLE 249 | One 250 | Two 251 | Three 我投票给这些: Table: votes VID | IID | u
这个问题在这里已经有了答案: Update MongoDB field using value of another field (12 个答案) 关闭 3 年前。 我想根据另一个“源”集合的文档中
我的收藏包含以下文件。我想使用聚合来计算里面有多少客户,但我遇到了一些问题。我可以获得总行数,但不能获得总(唯一)客户。 [{ _id: "n001", channel: "Kalip
我有下表 Id Letter 1001 A 1001 H 1001 H 1001 H 1001 B 1001 H 1001 H 1001
得到一列的表 ABC。 “创建”的日期列。所以样本值就像; created 2009-06-18 13:56:00 2009-06-18 12:56:00 2009-06-17 14:02:0
我有一个带有数组字段的集合: {[ name:String buyPrice:Int sellPrice:Int ]} 我试图找到最低和最高买入/卖出价格。在某些条目中,买入或卖出价格为零
我有以下问题: 在我的 mongo db 中,我有以下结构: { "instanceId": "12", "eventId": "0-1b", "activityType":
下面给出的是我要在其上触发聚合查询的 Elasticsearch 文档。 { "id": 1, "attributes": [ { "fieldId": 1,
我正在使用 Django 的 aggregate query expression总计一些值。最终值是一个除法表达式,有时可能以零作为分母。如果是这种情况,我需要一种方法来逃避,以便它只返回 0。 我
我正在学习核心数据,特别是聚合。 当前我想要做的事情:计算表中在某些条件上具有逆关系的多对关系的记录数。 目前我正在这样做: NSExpression *ex = [NSExpression expr
我需要有关 Delphi 中的 ClientDatasets 的一些帮助。 我想要实现的是一个显示客户的网格,其中一列显示每个客户的订单数量。我将 ClientDataset 放在表单上并从 Delp
我的集合有 10M 个文档,并且有一个名为 movieId 的字段;该文档具有以下结构: { "_id" : ObjectId("589bed43e3d78e89bfd9b779"), "us
这个问题已经有答案了: What is the difference between association, aggregation and composition? (21 个回答) 已关闭 9
我在 elasticsearch 中有一些类似于这些示例的文档: { "id": ">", "list": [ "a", "b", "c" ] } { "id"
我正在做一些聚合。但是结果完全不是我所期望的,似乎它们没有聚合索引中与我的查询匹配的所有文档,在这种情况下 - 它有什么好处? 例如,首先我做这个查询: {"index":"datalayer","t
假设我在 ES 中有这些数据。 | KEY | value | |:-----------|------------:| | A |
可能在我的文档中,我有一个被分析的文本字段。我只是在ElasticSearch AggregationAPI中迷路了。我需要2种不同情况的支持: 情况A)结果是带有计数标记(条款)的篮子下降。 情况B
我正在为网上商店构建多面过滤功能,如下所示: Filter on Brand: [ ] LG (10) [ ] Apple (5) [ ] HTC (3) Filter on OS: [ ] Andr
我有一个父/子关系并且正在搜索 child 。 是否可以在父属性上创建聚合? 例如parent 是 POST,children 是 COMMENT。如果父项具有“类别”属性,是否可以搜索 COMMEN
我是一名优秀的程序员,十分优秀!