- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
消费者的配置如下。有时对于唯一的id,根本不创建消费者组。我正在尝试根据应用程序名称使用消息。即使消费者组的脚本也没有在列表中显示该特定消费者组。例如,我在日志中收到的给定 application8 的组 ID 根本没有创建,如下所示。
2019-11-14 14:09:27,719 信息 - 卡夫卡版本:2.3.12019-11-14 14:09:27,719 信息 - 卡夫卡提交 ID:18a913733fb71c012019-11-14 14:09:27,719 信息 - 卡夫卡 startTimeMs: 15737207677182019-11-14 14:09:27,720 INFO - [Consumer clientId=consumer-1, groupId=Application8] 订阅主题:config2019-11-14 14:09:27,955 信息 - [消费者 clientId=consumer-1,groupId=Application8] 集群 ID:h1TJ0oMkQYqO0z8ftlIzpA
public static void KafkaServerStart() throws IOException {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.0.134:9092");
props.put("group.id", "Application8");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
props.put("enable.auto.commit", "true");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "9000");
props.put("auto.offset.reset","latest");
consumer = new KafkaConsumer<String, byte[]>(props);
consumer.subscribe(Collections.singletonList("config"), new RebalanceConfigListener());
final Thread mainThread = Thread.currentThread();
// Registering a shutdown hook so we can exit cleanly
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
// KafkaConsumers.consumer.commitSync(KafkaConsumers.currentOffsets);
// Note that shutdownhook runs in a separate thread, so the only thing we can
// safely do to a consumer is wake it up
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
boolean commit = false;
for (ConsumerRecord<String, byte[]> record : records) {
/**
* Code for committing the offset on every iteration. Start.
*/
if (!commit)
commit = true;
/**
* Code for committing the offset on every iteration. End.
*/
// LiveDa.processData(record.key(), record.value(), record.offset(),
// record.partition());
Reinit.reInitMethod(new String(record.value()));
/*
* System.out.println("Key of the data " + record.key() + " ,values " + new
* String(record.value()) + " ,offset is " + record.offset() +
* " ,Partition ID " + record.partition());
*/
/**
* Code for committing the offset on every iteration. Start.
*/
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, "no metadata"));
/**
* Code for committing the offset on every iteration. End.
*/
}
/**
* Code for committing the offset on every iteration. Start.
*/
if (commit)
consumer.commitAsync(currentOffsets, null);
/**
* Code for committing the offset on every iteration. End.
*/
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// write logic on shutdown.
System.out.println("EXITING KAFKA");
/**
* Code for committing the offset on every iteration. Start.
*/
consumer.commitSync(currentOffsets);
/**
* Code for committing the offset on every iteration. End.
*/
consumer.close();
}
}
public static void main(String[] args) {
try {
KafkaConfigConsumer.KafkaServerStart();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
KafkaConfigConsumer.KafkaServerStart();
} catch (IOException e) {
SystemLogger.error(e);
}
}
最佳答案
我解决了 __consumer_offset 主题的问题,其中一个 kafka 节点已关闭,并且与该节点关联的分区是无领导的,因此在重置主题后,问题得到了解决。
关于java - 有时,确实会创建唯一 id 的消费者组,并且消费者会在没有分区的情况下卡住,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58852546/
关闭。这个问题需要更多 focused .它目前不接受答案。 想改进这个问题?更新问题,使其仅关注一个问题 editing this post . 4年前关闭。 Improve this questi
我已经完成了注册页面,并且运行顺利。 现在我需要弄清楚登录部分。我想要它,所以一旦用户登录,它就会将他们带到私有(private)页面,只有登录的用户才能看到。 它不需要针对每个用户进行个性化设置,只
出于个人好奇心,我目前正在学习区 block 链的工作原理。我正在学习这门类(class),现在我已经使用网络套接字设置了点对点连接。区 block 链应用程序的多个实例现在可以使用这些套接字运行并相
我读过: The blockchain database isn’t stored in any single location, meaning the records it keeps are t
Closed. This question needs to be more focused。它当前不接受答案。 想要改善这个问题吗?更新问题,使它仅关注editing this post的一个问题。
如果我在区块链中进行交易,是否只有在将交易添加到区块链后才会进行比特币转账?如果是这样,挖掘区块可能需要时间,并且无法进行紧急付款。那么这不是区块链的劣势吗? 最佳答案 如果您不重视能够在没有第三方(
Closed. This question needs to be more focused。它当前不接受答案。 想改善这个问题吗?更新问题,使其仅通过editing this post专注于一个问题
根据我的理解,我读到的关于区 block 链的所有内容都表明,即使在私有(private)区 block 链上,每个参与者都可以查看所有交易。我看到它提到区 block 链的一个用例可能是共享医疗数据
服务器正在发送消息时,如何阻止连接到服务器的一个IP地址。我的发送消息选项程序如下所示。 private void buttonSendMsg_Click(对象发送者,EventArgs e) {
iam正在hadoop apache 2.7.1上工作 和iam添加大小不超过100 Kb的文件 所以如果我将块大小配置为1 mb或默认值是 128兆字节 不会影响我的文件,因为它们只会保存在一个块中
我有一个docker-compose文件here。我可以连接到7051并注册我的chaincode客户端,但是当我尝试连接到localhost:7050时,我得到一个错误,该错误在使用curl测试时如
从数据类型来看,区 block 链是单链表吗?因为每个 block 都使用哈希引用前一个 block 。 或者它是某种树? 最佳答案 区 block 链表示为单链表的方式。每个 block 都有前一个
我无法理解给定代码片段的 hashcode() 部分。 我尝试过搜索它,但我无法弄清楚。 this.hash = Arrays.hashCode(new Integer[]{data.has
已关闭。这个问题是 not about programming or software development 。目前不接受答案。 这个问题似乎不是关于 a specific programming
我正在通过一些在线示例学习区 block 链。我有这个高级代码,我用以前的哈希创建一个新 block ,然后向它添加一个事务,然后生成 block 的困难哈希(有 8 个前导零) Block blo
我们有一个包含一些数字商品的网站。从那里购买的用户需要用 BTC 购买一些信用。在他购买信用卡后,脚本必须将他用 BTC 购买的货币 (USD) 数量加载到他的账户中。 所以这里我们有 HTML 表单
我目前正在使用 enumerateObjectsUsingBlock block 在 subview 下进行枚举,我怎样才能确定 block 的完成? 下面是区 block 内容 [self.view
我通常将显示 block 放在链接上,以使按钮的所有 div 都处于事件状态,而不仅仅是文本。但在这种情况下,我需要在 ul li 中使用 display:inline-block 我认为这会禁用其他
我正在尝试创建付款账单并通过电报机器人发送给我的客户:我正在使用区 block 链 API V2-https://blockchain.info/api/api 接收。我的代码是: xpub='***
有个面试题:区 block 链和不可变链表有什么区别? 我回答他们是相同的技术,然后没有通过测试。请纠正我的错误。 最佳答案 链表中的每一项通常通过指针或内存地址指向链表中的下一项。 区 block
我是一名优秀的程序员,十分优秀!