- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
使用 KTable 时,当实例/消费者数量等于分区数量时,Kafka 流不允许实例从特定主题的多个分区中读取数据。我尝试使用 GlobalKTable 来实现这一点,这样做的问题是数据将被覆盖,而且聚合也无法应用于它。
假设我有一个名为“data_in”的主题,有 3 个分区(P1、P2、P3)。当我运行 Kafka 流应用程序的 3 个实例(I1、I2、I3)时,我希望每个实例从“data_in”的所有分区读取数据。我的意思是,I1 可以从 P1、P2 和 P3 读取,I2 可以从 P1、P2 和 P3 读取,I2 等等。
编辑:请记住,生产者可以将两个相似的 ID 发布到“data_in”中的两个不同分区中。因此,当运行两个不同的实例时,GlobalKtable将被覆盖。
请问如何实现这一点?这是我的代码的一部分
private KTable<String, theDataList> globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream<String, Data> KGS = trashStream.groupByKey();
Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized);
}
最佳答案
将输入主题“data_in”的分区数更改为 1 个分区,或者使用 GlobalKtable
从主题中的所有分区获取数据,然后您可以将其加入流中。这样,您的应用程序实例不再需要位于不同的消费者组中。
代码如下所示:
private GlobalKTable<String, theDataList> globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream<String, Data> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));
thrashStream.to("new_data_in"); // by sending to an other topic you're forcing a repartition on that topic
KStream<String, Data> newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream<String, Data> KGS = newTrashStream.groupByKey();
Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized)
.to("agg_data_in");
return getBuilder().globalTable("agg_data_in");
}
编辑:我编辑了上面的代码以强制对名为“new_data_in”的主题进行重新分区。
关于java - 卡夫卡流: Read from ALL partitions in every instance of an application,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53719700/
我有什么: 简单的服务器,配备一个具有 8 个逻辑内核的至强处理器、16 GB 内存、2 个 7200rpm 驱动器的 mdadm raid1。 PostgreSQL 需要处理大量数据。每天导入多达
当消息排入分区队列时,服务总线会检查分区键是否存在。如果找到,它将根据分区键选择片段。 但是当该片段已满时会发生什么,该片段中没有剩余空间。服务总线是否给出错误/消息被丢弃或任何其他片段将用于存储该消
我想知道将集合拆分为子集的有效方法是什么? Iterable> partitions = Iterables.partition(numbers, 10); 或 List> partitions =
有人可以告诉我 DATETIME 列上 HASH PARITION 与 RANGE PARTITION 的优缺点吗?假设我们有一个包含 2000 万条记录的 POS 表,并且想要根据交易日期的年份创建
我们有一个 cosmos-db 容器,其中包含大约 1M 条记录,其中包含有关客户的信息。 documentDb 的分区键是 customerId,它保存客户的唯一 GUID 引用。我已阅读parti
因此,我在写入数据库的步骤中有 2 个分区。我想记录每个分区写入的行数,得到总和,打印到日志中; 我正在考虑在编写器中使用static变量,并使用Step Context/Job Context在St
Bigquery 文档说可以更新分区表的分区时间到期。而我只能为摄取时间分区表执行此操作。我尝试了以下方法: bq query --use_legacy_sql=false ' CREATE
这是一个两部分的问题: 1) 是否可以根据数据的 ROWID 或其他标识符使用 select 语句检索数据所在分区的名称? 例如。 SELECT DATA_ID, CATEGORY, VALUE, *
注意:这几乎是 this question 的副本区别在于,在这种情况下,源表是日期分区的,而目标表尚不存在。此外,该问题的公认解决方案在这种情况下不起作用。 我试图将一天的数据从一个日期分区表复制到
我已经搜索了很多,但找不到有关以下场景的任何信息。 考虑一个包含超过 500,000 行、约 20 列和约 5 列上的 INDEX 的 InnoDB 表。 当该表处于以下情况时,执行“ALTER TA
如何将分区表(在 Oracle 10g 数据库中)更改为不仅用于分区而且还用于表本身的新表空间?我的意思是,我可以毫无问题地进行以下操作, --sql 改变表 abc 移动分区 abc01 表空间 n
我们正在尝试基于 BigQuery 在云中构建(或者更好地说重建)我们的 DWH。我们决定对原始数据使用“按日期字段分区”表(如“created_date”字段),而不是摄取时间分区,因为通过此功能,
给定一个使用分区的 Spring Batch 作业:
“每个分区中可以有许多键(及其相关值),但任何给定键的记录都在一个分区中。”这是一本著名的hadoop教科书的一行。我没有理解它的第二部分的全部含义,即“但是任何给定键的记录都在一个分区中。”这是否意
Let's say I have an Athena table mytable partitioned by columns A, B, and C.假设我有一个由列A、B和C分区的Athen
我正在寻找一些文档来了解 hive.exec.max.dynamic.partitions 和 hive.exec.max.dynamic.partitions.pernode 之间的区别。 我们什么
我看过一些关于创建分区的表的很好的解释,这些分区是 CLUSTERED BY 和 SORTED BY。这与创建带分区的表,然后使用 CLUSTER BY 填充表(例如使用 INSERT OVERWRI
使用摄取时间分区表,可以免费查询每个分区的行数。字节计费为 0。 SELECT DATE(_PARTITIONTME) AS dd, COUNT(*) FROM ds.ingestion_time_p
此处提供示例项目: https://github.com/codependent/micronaut-aws-lambda-proxy-graal 我在 Amazon AWS 上部署了一个 Micro
是否可以在不指定分区键的情况下通过其 ID 检索文档? 我的理解来自阅读 documentation是当未指定分区键时查询将在所有分区中扇出: The following query does not
我是一名优秀的程序员,十分优秀!