- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
我们会看到zk的数据中有一个节点/log_dir_event_notification/
,这是一个序列号持久节点
这个节点在kafka中承担的作用是: 当某个Broker上的LogDir
出现异常时(比如磁盘损坏,文件读写失败,等等异常): 向zk中谢增一个子节点/log_dir_event_notification/log_dir_event_序列号
;Controller监听到这个节点的变更之后,会向Brokers们发送LeaderAndIsrRequest
请求; 然后做一些副本脱机的善后操作
这里说的dirLog是 server.properties中配置的log.dir 例如
首先我们找到有使用这个节点的源码;
kafka启动之初有调用
ReplicaManager.startup()
def startup(): Unit = {
// 省略...
//当inter-broker protocol (IBP) < 1.0的时候,如果存在logDir的一些异常则直接让整个Broker启动失败;
val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
logDirFailureHandler.start()
}
private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
override def doWork(): Unit = {
//从队列 offlineLogDirQueue 取数据
val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
if (haltBrokerOnDirFailure) {
fatal(s"Halting broker because dir $newOfflineLogDir is offline")
Exit.halt(1)
}
handleLogDirFailure(newOfflineLogDir)
}
}
// logDir should be an absolute path
// sendZkNotification is needed for unit test
def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true): Unit = {
// 省略...
logManager.handleLogDirFailure(dir)
if (sendZkNotification)
zkClient.propagateLogDirEvent(localBrokerId)
warn(s"Stopped serving replicas in dir $dir")
}
代码比较长,就直接概况一下好了:
主要是当读取或操作LogDir的时候出现了异常就会执行到这里,有可能是磁盘脱机了,或者文件突然没有读取写入权限等等之类的一些IOException异常;那么 Broker就需要做一些处理;如下
inter.broker.protocol.version
协议版本 < 1.0
的时候 时候直接退出;那个时候还不支持单Broker上存在多个logDir;log_dir
都脱机(或者异常了), 那么久可以直接shutdown这台机器了log_dir
还有在线的, 那么继续做一些其他的清理操作;/log_dir_event_notification/log_dir_event_
+序列号;数据是 BrokerID;例如:plaintext
``
| |
{"version":1,"broker":20003,"event":1}
|
PS: log_dir 是可以在一台Broker配置多个路径的 ,用逗号隔开
比如说在 给文件加锁的时候lockLogDirs,磁盘损坏了就抛出异常IOException
/**
* Lock all the given directories
*/
private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
dirs.flatMap { dir =>
try {
val lock = new FileLock(new File(dir, LockFile))
if (!lock.tryLock())
throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent +
". A Kafka instance in another process or thread is using this directory.")
Some(lock)
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while locking directory $dir", e)
None
}
}
}
def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = {
error(msg, e)
if (offlineLogDirs.putIfAbsent(logDir, logDir) == null)
offlineLogDirQueue.add(logDir)
}
offlineLogDirQueue添加了一个异常队列之后就回到上面的副本异常处理代码了, 上面可是一致在queue.take()的
KafkaController.processLogDirEventNotification
private def processLogDirEventNotification(): Unit = {
if (!isActive) return
val sequenceNumbers = zkClient.getAllLogDirEventNotifications
try {
val brokerIds = zkClient.getBrokerIdsFromLogDirEvents(sequenceNumbers)
//尝试将这台Broker上的所有副本 走一下状态流转 到 OnlineReplica
onBrokerLogDirFailure(brokerIds)
} finally {
// delete processed children
zkClient.deleteLogDirEventNotifications(sequenceNumbers, controllerContext.epochZkVersion)
}
}
主要将从zk节点 /log_dir_event_notification/log_dir_event_序列号 中获取到的数据的Broker上的所有副本进行一个副本状态流转 ->OnlineReplica ;关于状态机的流转请看 【kafka源码】Controller中的状态机
LeaderAndIsrRequest
请求,让brokers们去查询他们的副本的状态,如果副本logDir已经离线则返回KAFKA_STORAGE_ERROR
异常;【kafka源码】/log_dir_event_notification的LogDir脱机事件通知
我有内存泄漏,由于没有正确关闭连接。这是由于使用全局函数访问数据库(使用不同的 sql 字符串),但我传回了一个 sqldatareader。 我无法在方法中关闭它,也无法关闭与数据库的连接,因为它会
我正在尝试在脱机模式下使用yarn,因为我正在使用的生成服务器无法访问yarn注册表或github.com。 我在脱机模式下找到如何使用yarn的过程中找到了这个article,该效果很好,直到添加了
重置工作区 很多时候不小心将面板弄乱了,那么怎么办呢? ,我么可以先自己搞一个喜欢的工作区,然后另存为当前工作区 下图是我调整好的工作区 如果不小心搞乱了,那么别关软件而是,重置工作区为初始状态 如果
我正在使用 USB 转 RS232 电缆在两台 Linux 机器之间进行通信。在连接了 USB 端的机器上,我运行: dmesg | grep tty 并得到如下输出: console [tty0]
Azure 应用服务部署任务在“其他部署选项:使应用程序脱机”下有一个用于使应用程序脱机的复选框。如果我检查是否足以让APP离线?如何自动执行添加 app_offline.htm 的过程? 使应用程序
Azure 应用服务部署任务在“其他部署选项:使应用程序脱机”下有一个用于使应用程序脱机的复选框。如果我检查是否足以让APP离线?如何自动执行添加 app_offline.htm 的过程? 使应用程序
我是一名优秀的程序员,十分优秀!