- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在使用 KafkaConsumer 0.10 Java api。我想从特定分区和特定偏移量中使用。我查了一下,发现有一个seek方法,但是它抛出了一个异常。有人有类似的用例或解决方案吗?
代码:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);
异常
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
at xx.xxx.xxx.Test.main(Test.java:182)
最佳答案
在您可以seek()
之前,您首先需要subscribe()
一个主题或 assign()
将主题划分给消费者。还要记住,subscribe()
和 assign()
是惰性的——因此,您还需要对 poll() 进行“虚拟调用”
,然后才能使用 seek()
。
Note: as of Kafka 2.0, the new
poll(Duration timeout)
is async and it's not guaranteed that you have a complete assignment whenpoll
returns. Thus, you might need to check your assignment before usingseek()
and alsopoll
again to refresh the assignment. (Cf. KIP-266 for details)
如果你使用subscribe()
,你使用组管理:因此,你可以使用相同的group.id
启动多个消费者,并且主题的所有分区都将被自动平均分配给组内的所有消费者(每个分区将分配给组中的单个消费者)。
如果要读取特定分区,需要通过assign()
手动赋值。这使您可以执行任何您想要的任务。
顺便说一句:KafkaConsumer
有一个非常长的详细类 JavaDoc,包括示例。值得一读。
关于java - KafkaConsumer 0.10 Java API 错误信息 : No current assignment for partition,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41008610/
我有什么: 简单的服务器,配备一个具有 8 个逻辑内核的至强处理器、16 GB 内存、2 个 7200rpm 驱动器的 mdadm raid1。 PostgreSQL 需要处理大量数据。每天导入多达
当消息排入分区队列时,服务总线会检查分区键是否存在。如果找到,它将根据分区键选择片段。 但是当该片段已满时会发生什么,该片段中没有剩余空间。服务总线是否给出错误/消息被丢弃或任何其他片段将用于存储该消
我想知道将集合拆分为子集的有效方法是什么? Iterable> partitions = Iterables.partition(numbers, 10); 或 List> partitions =
有人可以告诉我 DATETIME 列上 HASH PARITION 与 RANGE PARTITION 的优缺点吗?假设我们有一个包含 2000 万条记录的 POS 表,并且想要根据交易日期的年份创建
我们有一个 cosmos-db 容器,其中包含大约 1M 条记录,其中包含有关客户的信息。 documentDb 的分区键是 customerId,它保存客户的唯一 GUID 引用。我已阅读parti
因此,我在写入数据库的步骤中有 2 个分区。我想记录每个分区写入的行数,得到总和,打印到日志中; 我正在考虑在编写器中使用static变量,并使用Step Context/Job Context在St
Bigquery 文档说可以更新分区表的分区时间到期。而我只能为摄取时间分区表执行此操作。我尝试了以下方法: bq query --use_legacy_sql=false ' CREATE
这是一个两部分的问题: 1) 是否可以根据数据的 ROWID 或其他标识符使用 select 语句检索数据所在分区的名称? 例如。 SELECT DATA_ID, CATEGORY, VALUE, *
注意:这几乎是 this question 的副本区别在于,在这种情况下,源表是日期分区的,而目标表尚不存在。此外,该问题的公认解决方案在这种情况下不起作用。 我试图将一天的数据从一个日期分区表复制到
我已经搜索了很多,但找不到有关以下场景的任何信息。 考虑一个包含超过 500,000 行、约 20 列和约 5 列上的 INDEX 的 InnoDB 表。 当该表处于以下情况时,执行“ALTER TA
如何将分区表(在 Oracle 10g 数据库中)更改为不仅用于分区而且还用于表本身的新表空间?我的意思是,我可以毫无问题地进行以下操作, --sql 改变表 abc 移动分区 abc01 表空间 n
我们正在尝试基于 BigQuery 在云中构建(或者更好地说重建)我们的 DWH。我们决定对原始数据使用“按日期字段分区”表(如“created_date”字段),而不是摄取时间分区,因为通过此功能,
给定一个使用分区的 Spring Batch 作业:
“每个分区中可以有许多键(及其相关值),但任何给定键的记录都在一个分区中。”这是一本著名的hadoop教科书的一行。我没有理解它的第二部分的全部含义,即“但是任何给定键的记录都在一个分区中。”这是否意
Let's say I have an Athena table mytable partitioned by columns A, B, and C.假设我有一个由列A、B和C分区的Athen
我正在寻找一些文档来了解 hive.exec.max.dynamic.partitions 和 hive.exec.max.dynamic.partitions.pernode 之间的区别。 我们什么
我看过一些关于创建分区的表的很好的解释,这些分区是 CLUSTERED BY 和 SORTED BY。这与创建带分区的表,然后使用 CLUSTER BY 填充表(例如使用 INSERT OVERWRI
使用摄取时间分区表,可以免费查询每个分区的行数。字节计费为 0。 SELECT DATE(_PARTITIONTME) AS dd, COUNT(*) FROM ds.ingestion_time_p
此处提供示例项目: https://github.com/codependent/micronaut-aws-lambda-proxy-graal 我在 Amazon AWS 上部署了一个 Micro
是否可以在不指定分区键的情况下通过其 ID 检索文档? 我的理解来自阅读 documentation是当未指定分区键时查询将在所有分区中扇出: The following query does not
我是一名优秀的程序员,十分优秀!