- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
本文已收录到 AndroidFamily ,技术和职场问题,请关注公众号 [彭旭锐] 提问.
大家好,我是小彭.
在上一篇文章里 ,我们聊到了 Square 开源的 I/O 框架 Okio 的三个优势:精简且全面的 API、基于共享的缓冲区设计以及超时机制。前两个优势已经分析过了,今天我们来分析 Okio 的超时检测机制.
本文源码基于 Okio v3.2.0.
思维导图:
超时机制是一项通用的系统设计,能够避免系统长时间阻塞在某些任务上。例如网络请求在超时时间内没有响应,客户端就会提前中断请求,并提示用户某些功能不可用.
先思考一个问题,相比于传统 IO 的超时有什么优势呢?我认为主要体现在 2 个方面:
Java 原生 IO 操作是否支持超时,完全取决于底层的系统调用是否支持。例如,网络 Socket 支持通过 setSoTimeout API 设置单次 IO 操作的超时时间,而文件 IO 操作就不支持,使用原生文件 IO 就无法实现超时.
而 Okio 是统一在应用层实现超时检测,不管系统调用是否支持超时,都能提供统一的超时检测机制.
Java 原生 IO 操作只能实现对单次 IO 操作的超时检测,无法实现对包含多次 IO 操作的复合任务超时检测。例如,OkHttp 支持配置单次 connect、read 或 write 操作的超时检测,还支持对一次完整 Call 请求的超时检测,有时候单个操作没有超时,但串联起来的完整 call 却超时了.
而 Okio 超时机制和 IO 操作没有强耦合,不仅支持对 IO 操作的超时检测,还支持非 IO 操作的超时检测,所以这种复合任务的超时检测也是可以实现的.
Timeout 类是 Okio 超时机制的核心类,Okio 对 Source 输入流和 Sink 输出流都提供了超时机制,我们在构造 InputStreamSource 和 OutputStreamSink 这些流的实现类时,都需要携带 Timeout 对象:
Source.kt 。
interface Source : Closeable {
// 返回超时控制对象
fun timeout(): Timeout
...
}
Sink.kt 。
actual interface Sink : Closeable, Flushable {
// 返回超时控制对象
actual fun timeout(): Timeout
...
}
Timeout 类提供了两种配置超时时间的方式(如果两种方式同时存在的话,Timeout 会优先采用更早的截止时间):
最终触发超时的截止时间是任务的 startTime + timeoutNanos ; 。
Timeout.kt 。
// hasDeadline 这个属性显得没必要
private var hasDeadline = false // 是否设置了截止时间点
private var deadlineNanoTime = 0L // 截止时间点(单位纳秒)
private var timeoutNanos = 0L // 处理单次任务的超时时间(单位纳秒)
创建 Source 和 Sink 对象时,都需要携带 Timeout 对象:
JvmOkio.kt 。
// ----------------------------------------------------------------------------
// 输入流
// ----------------------------------------------------------------------------
fun InputStream.source(): Source = InputStreamSource(this, Timeout() /*Timeout 对象*/)
// 文件输入流
fun File.source(): Source = InputStreamSource(inputStream(), Timeout.NONE)
// Socket 输入流
fun Socket.source(): Source {
val timeout = SocketAsyncTimeout(this)
val source = InputStreamSource(getInputStream(), timeout /*携带 Timeout 对象*/)
// 包装为异步超时
return timeout.source(source)
}
// ----------------------------------------------------------------------------
// 输出流
// ----------------------------------------------------------------------------
fun OutputStream.sink(): Sink = OutputStreamSink(this, Timeout() /*Timeout 对象*/)
// 文件输出流
fun File.sink(append: Boolean = false): Sink = FileOutputStream(this, append).sink()
// Socket 输出流
fun Socket.sink(): Sink {
val timeout = SocketAsyncTimeout(this)
val sink = OutputStreamSink(getOutputStream(), timeout /*携带 Timeout 对象*/)
// 包装为异步超时
return timeout.sink(sink)
}
在 Timeout 类的基础上,Okio 提供了 2 种超时机制:
Okio 框架 。
Timeout 同步超时依赖于 Timeout#throwIfReached() 方法.
同步超时在每次执行任务之前,都需要先调用 Timeout#throwIfReached() 检查当前时间是否到达超时截止时间。如果超时则会直接抛出超时异常,不会再执行任务.
JvmOkio.kt 。
private class InputStreamSource(
// 输入流
private val input: InputStream,
// 超时控制
private val timeout: Timeout
) : Source {
override fun read(sink: Buffer, byteCount: Long): Long {
// 1、参数校验
if (byteCount == 0L) return 0
require(byteCount >= 0) { "byteCount < 0: $byteCount" }
// 2、检查超时时间
timeout.throwIfReached()
// 3、执行输入任务(已简化)
val bytesRead = input.read(...)
return bytesRead.toLong()
}
...
}
private class OutputStreamSink(
// 输出流
private val out: OutputStream,
// 超时控制
private val timeout: Timeout
) : Sink {
override fun write(source: Buffer, byteCount: Long) {
// 1、参数校验
checkOffsetAndCount(source.size, 0, byteCount)
// 2、检查超时时间
timeout.throwIfReached()
// 3、执行输入任务(已简化)
out.write(...)
...
}
...
}
看一眼 Timeout#throwIfReached 的源码。 可以看到,同步超时只考虑 “deadlineNanoTime 截止时间”,如果只设置 “timeoutNanos 任务处理时间” 是无效的,我觉得这个设计容易让开发者出错.
Timeout.kt 。
@Throws(IOException::class)
open fun throwIfReached() {
if (Thread.interrupted()) {
// 传递中断状态
Thread.currentThread().interrupt() // Retain interrupted status.
throw InterruptedIOException("interrupted")
}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
// 抛出超时异常
throw InterruptedIOException("deadline reached")
}
}
有必要解释所谓 “同步” 的意思:
同步超时就是指任务的 “执行” 和 “超时检查” 是同步的。当任务超时时,Okio 同步超时不会直接中断任务执行,而是需要检主动查超时时间(Timeout#throwIfReached)来判断是否发生超时,再决定是否中断任务执行.
这其实与 Java 的中断机制是非常相似的:
当 Java 线程的中断标记位置位时,并不是真的会直接中断线程执行,而是主动需要检查中断标记位(Thread.interrupted)来判断是否发生中断,再决定是否中断线程任务。所以说 Java 的线程中断机制是一种 “同步中断”.
可以看出,同步超时存在 “滞后性”:
因为同步超时需要主动检查,所以即使在任务执行过程中发生超时,也必须等到检查时才会发现超时,无法及时触发超时异常。因此,就需要异步超时机制.
同步超时示意图 。
异步超时监控进入: 异步超时在每次执行任务之前,都需要先调用 AsyncTimeout#enter() 方法将 AsyncTimeout 挂载到超时队列中,并根据超时截止时间的先后顺序排序,队列头部的节点就是会最先超时的任务; 。
异步超时监控退出: 在每次任务执行结束之后,都需要再调用 AsyncTimeout#exit() 方法将 AsyncTimeout 从超时队列中移除.
注意: enter() 方法和 eixt() 方法必须成对存在.
AsyncTimeout.kt 。
open class AsyncTimeout : Timeout() {
// 是否在等待队列中
private var inQueue = false
// 后续指针
private var next: AsyncTimeout? = null
// 超时截止时间
private var timeoutAt = 0L
// 异步超时监控进入
fun enter() {
check(!inQueue) { "Unbalanced enter/exit" }
val timeoutNanos = timeoutNanos()
val hasDeadline = hasDeadline()
if (timeoutNanos == 0L && !hasDeadline) {
return
}
inQueue = true
scheduleTimeout(this, timeoutNanos, hasDeadline)
}
// 异步超时监控退出
// 返回值:是否发生超时(如果节点不存在,说明被 WatchDog 线程移除,即发生超时)
fun exit(): Boolean {
if (!inQueue) return false
inQueue = false
return cancelScheduledTimeout(this)
}
// 在 WatchDog 线程调用
protected open fun timedOut() {}
companion object {
// 超时队列头节点(哨兵节点)
private var head: AsyncTimeout? = null
// 分发超时监控任务
private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
synchronized(AsyncTimeout::class.java) {
// 首次添加监控时,需要启动 Watchdog 线程
if (head == null) {
// 哨兵节点
head = AsyncTimeout()
Watchdog().start()
}
// now:当前时间
val now = System.nanoTime()
// timeoutAt 超时截止时间:计算 now + timeoutNanos 和 deadlineNanoTime 的较小值
if (timeoutNanos != 0L && hasDeadline) {
node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
} else if (timeoutNanos != 0L) {
node.timeoutAt = now + timeoutNanos
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime()
} else {
throw AssertionError()
}
// remainingNanos 超时剩余时间:当前时间距离超时发生的时间
val remainingNanos = node.remainingNanos(now)
var prev = head!!
// 线性遍历超时队列,按照超时截止时间将 node 节点插入超时队列
while (true) {
if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) {
node.next = prev.next
prev.next = node
// 如果插入到队列头部,需要唤醒 WatchDog 线程
if (prev === head) {
(AsyncTimeout::class.java as Object).notify()
}
break
}
prev = prev.next!!
}
}
}
// 取消超时监控任务
// 返回值:是否超时
private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean {
synchronized(AsyncTimeout::class.java) {
// 线性遍历超时队列,将 node 节点移除
var prev = head
while (prev != null) {
if (prev.next === node) {
prev.next = node.next
node.next = null
return false
}
prev = prev.next
}
// 如果节点不存在,说明被 WatchDog 线程移除,即发生超时
return true
}
}
}
}
同时,在首次添加异步超时监控时,AsyncTimeout 内部会开启一个 WatchDog 守护线程,按照 “检测 - 等待” 模型观察超时队列的头节点:
如果发生超时,则将头节点移除,并回调 AsyncTimeout#timeOut() 方法。这是一个空方法,需要由子类实现来主动取消任务; 。
如果未发生超时,则 WatchDog 线程会计算距离超时发生的时间间隔,调用 Object#wait(时间间隔) 进入限时等待.
需要注意的是: AsyncTimeout#timeOut() 回调中不能执行耗时操作,否则会影响后续检测的及时性.
有意思的是:我们会发现 Okio 的超时检测机制和 Android ANR 的超时检测机制非常类似,所以我们可以说 ANR 也是一种异步超时机制.
AsyncTimeout.kt 。
private class Watchdog internal constructor() : Thread("Okio Watchdog") {
init {
// 守护线程
isDaemon = true
}
override fun run() {
// 死循环
while (true) {
try {
var timedOut: AsyncTimeout? = null
synchronized(AsyncTimeout::class.java) {
// 取头节点(Maybe wait)
timedOut = awaitTimeout()
// 超时队列为空,退出线程
if (timedOut === head) {
head = null
return
}
}
// 超时发生,触发 AsyncTimeout#timedOut 回调
timedOut?.timedOut()
} catch (ignored: InterruptedException) {
}
}
}
}
companion object {
// 超时队列为空时,再等待一轮的时间
private val IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60)
private val IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS)
@Throws(InterruptedException::class)
internal fun awaitTimeout(): AsyncTimeout? {
// Get the next eligible node.
val node = head!!.next
// 如果超时队列为空
if (node == null) {
// 需要再等待 60s 后再判断(例如在首次添加监控时)
val startNanos = System.nanoTime()
(AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS)
return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) {
// 退出 WatchDog 线程
head
} else {
// WatchDog 线程重新取一次
null
}
}
// 计算当前时间距离超时发生的时间
var waitNanos = node.remainingNanos(System.nanoTime())
// 未超时,进入限时等待
if (waitNanos > 0) {
// Waiting is made complicated by the fact that we work in nanoseconds,
// but the API wants (millis, nanos) in two arguments.
val waitMillis = waitNanos / 1000000L
waitNanos -= waitMillis * 1000000L
(AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt())
return null
}
// 超时,将头节点移除
head!!.next = node.next
node.next = null
return node
}
}
异步超时示意图 。
直接看代码不好理解,我们来举个例子:
在 OkHttp 中,支持配置一次完整的 Call 请求上的操作时间 callTimeout。一次 Call 请求包含多个 IO 操作的复合任务,使用传统 IO 是不可能监控超时的,所以需要使用 AsyncTimeout 异步超时.
在 OkHttp 的 RealCall 请求类中,就使用了 AsyncTimeout 异步超时:
1、开始任务: 在 execute() 方法中,调用 AsyncTimeout#enter() 进入异步超时监控,再执行请求; 。
2、结束任务: 在 callDone() 方法中,调用 AsyncTimeout#exit() 退出异步超时监控。分析源码发现:callDone() 不仅在请求正常时会调用,在取消请求时也会回调,保证了 enter() 和 exit() 成对存在; 。
3、超时回调: 在 AsyncTimeout#timeOut 超时回调中,调用了 Call#cancel() 提前取消请求。Call#cancel() 会调用到 Socket#close(),让阻塞中的 IO 操作抛出 SocketException 异常,以达到提前中断的目的,最终也会走到 callDone() 执行 exit() 退出异步监控.
Call 超时监控示意图 。
RealCall 。
class RealCall(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
// 3、AsyncTimeout 超时监控
private val timeout = object : AsyncTimeout() {
override fun timedOut() {
// 取消请求
cancel()
}
}.apply {
timeout(client.callTimeoutMillis.toLong(), MILLISECONDS)
}
// 取消请求
override fun cancel() {
if (canceled) return // Already canceled.
canceled = true
exchange?.cancel()
// 最终会调用 Socket#close()
connectionToCancel?.cancel()
eventListener.canceled(this)
}
// 1、请求开始(由业务层调用)
override fun execute(): Response {
// 1.1 异步超时监控进入
timeout.enter()
// 1.2 执行请求
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
}
// 2、请求结束(由 OkHttp 引擎层调用,包含正常和异常情况)
// 除了 IO 操作在抛出异常后会走到 callDone(),在取消请求时也会走到 callDone()
internal fun <E : IOException?> messageDone(
exchange: Exchange,
requestDone: Boolean, // 请求正常结束
responseDone: Boolean, // 响应正常结束
e: E
): E {
...
if (callDone) {
return callDone(e)
}
return e
}
private fun <E : IOException?> callDone(e: E): E {
...
// 检查是否超时
val result = timeoutExit(e)
if (e != null) {
// 请求异常(包含超时异常)
eventListener.callFailed(this, result!!)
} else {
// 请求正常结束
eventListener.callEnd(this)
}
return result
}
private fun <E : IOException?> timeoutExit(cause: E): E {
if (timeoutEarlyExit) return cause
// 2.1 异步超时监控退出
if (!timeout.exit()) return cause
// 2.2 包装超时异常
val e = InterruptedIOException("timeout")
if (cause != null) e.initCause(cause)
return e as E
}
}
调用 Socket#close() 会让阻塞中的 IO 操作抛出 SocketException 异常:
Socket.java 。
// Any thread currently blocked in an I/O operation upon this socket will throw a {@link SocketException}.
public synchronized void close() throws IOException {
synchronized(closeLock) {
if (isClosed())
return;
if (created)
impl.close();
closed = true;
}
}
Exchange 中会捕获 Socket#close() 抛出的 SocketException 异常:
Exchange.kt 。
private inner class RequestBodySink(
delegate: Sink,
/** The exact number of bytes to be written, or -1L if that is unknown. */
private val contentLength: Long
) : ForwardingSink(delegate) {
@Throws(IOException::class)
override fun write(source: Buffer, byteCount: Long) {
...
try {
super.write(source, byteCount)
this.bytesReceived += byteCount
} catch (e: IOException) {
// Socket#close() 会抛出异常,被这里拦截
throw complete(e)
}
}
private fun <E : IOException?> complete(e: E): E {
if (completed) return e
completed = true
return bodyComplete(bytesReceived, responseDone = false, requestDone = true, e = e)
}
}
fun <E : IOException?> bodyComplete(
bytesRead: Long,
responseDone: Boolean,
requestDone: Boolean,
e: E
): E {
...
// 回调到上面的 RealCall#messageDone
return call.messageDone(this, requestDone, responseDone, e)
}
先说一下 Okhttp 定义的 2 种颗粒度的超时:
其实 Socket 支持通过 setSoTimeout API 设置单次操作的超时时间,但这个 API 无法满足需求,比如说 Call 超时是包含多个 IO 操作的复合任务,而且不管是 HTTP/1 并行请求还是 HTTP/2 多路复用,都会存在一个 Socket 连接上同时承载多个请求的情况,无法区分是哪个请求超时.
因此,OkHttp 采用了两种超时监测:
最后此篇关于AndroidIO框架Okio的实现原理,如何检测超时?的文章就讲到这里了,如果你想了解更多关于AndroidIO框架Okio的实现原理,如何检测超时?的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
这是一个与 Get OS-Version in WinRT Metro App C# 相关的问题但不是它的重复项。 是否有任何选项可以从 Metro 应用程序检测系统上是否有可用的桌面功能?据我所知,
我想在闹钟响起时做点什么。例如, toast 或设置新闹钟。我正在寻找可以检测闹钟何时响起的东西。首先,我在寻找广播 Action ,但找不到。也许是我的错? 当闹钟响起时,还有其他方法可以做些什么吗
如果某个 JS 添加了一个突变观察者,其他 JS 是否有可能检测、删除、替换或更改该观察者?我担心的是,如果某些 JS 旨在破坏某些 DOM 元素而不被发现,那么 JS 可能想要摆脱任何观察该 DOM
Closed. This question does not meet Stack Overflow guidelines。它当前不接受答案。 想要改善这个问题吗?更新问题,以便将其作为on-topi
有没有办法在您的 Activity/应用程序中(以编程方式)知道用户已通过 USB 将您的手机连接到 PC? 最佳答案 有人建议使用 UMS_CONNECTED自最新版本的 Android 起已弃用
我正在想办法测量速度滚动事件,这将产生某种代表速度的数字(相对于所花费的时间,从滚动点 A 到点 B 的距离)。 我欢迎任何以伪代码形式提出的建议...... 我试图在网上找到有关此问题的信息,但找不
某些 JavaScript 是否可以检测 Skype 是否安装? 我问的原因是我想基于此更改链接的 href:如果未安装 Skype,则显示一个弹出窗口,解释 Skype 是什么以及如何安装它,如果已
我们正在为 OS X 制作一个使用 Quartz Events 移动光标的用户空间设备驱动程序,当游戏(尤其是在窗口模式下运行的游戏)无法正确捕获鼠标指针时,我们遇到了问题(= 将其包含/保留在其窗口
我可以在 Controller 中看到事件 $routeChangeStart,但我不知道如何告诉 Angular 留下来。我需要弹出类似“您要保存、删除还是取消吗?”的信息。如果用户选择取消,则停留
我正在解决一个问题,并且已经花了一些时间。问题陈述:给你一个正整数和负整数的数组。如果索引处的数字 n 为正,则向前移动 n 步。相反,如果为负数(-n),则向后移动 n 步。假设数组的第一个元素向前
我试图建立一个条件,其中 [i] 是 data.length 的值,问题是当有超过 1 个值时一切正常,但当只有 1 个值时,脚本不起作用。 out.href = data[i].hr
这是我的问题,我需要检测图像中的 bolt 和四分之一,我一直在搜索并找到 OpenCV,但据我所知它还没有在 Java 中。你们打算如何解决这个问题? 最佳答案 实际上有一个 OpenCV 的 Ja
是否可以检测 ping? IE。设备 1 ping 设备 2,我想要可以在设备 2 上运行的代码,该代码可以在设备 1 ping 设备时进行检测。 最佳答案 ping 实用程序使用的字面消息(“ICM
我每天多次运行构建脚本。我的感觉是我和我的同事花费了大量时间等待这个脚本执行。现在想知道:我们每天花多少时间等待脚本执行? .我可以对总体平均值感到满意,即使我真的很想拥有每天的数据(例如“上周一我们
我已经完成了对项目的编码,但是当我在客户端中提交了源代码时,就对它进行了测试,然后检测到内存泄漏。我已经在Instruments using Leaks中进行了测试。 我遇到的问题是AVPlayer和
我想我可以用 std.traits.functionAttributes 来做到这一点,但它不支持 static。对于任何类型的可调用对象(包含 opCall 的结构),我如何判断该可调用对象是否使用
我正在使用多核 R 包中的并行和收集函数来并行化简单的矩阵乘法代码。答案是正确的,但并行版本似乎与串行版本花费的时间相同。 我怀疑它仅在一个内核上运行(而不是在我的机器上可用的 8 个内核!)。有没有
我正在尝试在读取 csv 文件时编写一个这样的 if 语句: if row = [] or EOF: do stuff 我在网上搜索过,但找不到任何方法可以做到这一点。帮忙? 最佳答案 wit
我想捕捉一个 onFontSizeChange 事件然后做一些事情(比如重新渲染,因为浏览器已经改变了我的字体大小)。不幸的是,不存在这样的事件,所以我必须找到一种方法来做到这一点。 我见过有人在不可
我有一个使用 Windows 服务的 C# 应用程序,该服务并非始终打开,我希望能够在该服务启动和关闭时发送电子邮件通知。我已经编写了电子邮件脚本,但我似乎无法弄清楚如何检测服务状态更改。 我一直在阅
我是一名优秀的程序员,十分优秀!