- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我想检测 Netty 的 EventLoop
以便:
我知道 EmbeddedChannel
并在一些测试中使用它。但我想要的是介于单元测试和集成测试之间的东西,对某些极端情况仍然视而不见。断连重连和ping场景大量使用定时任务。我可以添加具有巨大延迟的确定性,但我不希望我的测试等待几秒钟或更长时间。所以检测 Netty 的 EventLoop
看起来像是解决方案。
我已经编写了至少对我来说有意义的代码。
ScheduledFutureTask#nanoTime
以返回我的值。NioEventLoopGroup
,所以我捕获了任务的最后期限。ScheduledFutureTask#nanoTime
返回的值。好的是 Netty 代码仅依赖于 ScheduledFutureTask#nanoTime
返回的值(伟大的设计!)所以这是一个非常有限的变化。我使用 ByteBuddy 来避免复制粘贴 Netty 代码,但这并不重要。
像 InstrumentedNioEventLoopGroupTest
这样的非常简单的测试在仅安排 1 个任务时失败,因为 AbstractScheduledEventExecutor#pollScheduledTask(long)
有一个空队列。
我发现每个 NioEventLoop
都有自己的任务队列,队列轮询可能不会发生,因为 NioEventLoopGroup
等待 Selector
发出信号,这是有道理的。所以我将NioEventLoopGroup
的线程数增加到2。我也尝试将ioRatio
设置为1并安排更多任务,但没有更好的结果。使用调试器,似乎我的任务总是“落在”未轮询的任务队列中。
有什么想法可以让这项工作成功吗?我正在使用 Netty 4.1.24.Final。
ScheduledFutureTaskHack.java
package com.otcdlink.chiron.integration.harness;
import com.otcdlink.chiron.toolbox.ToStringTools;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.agent.ByteBuddyAgent;
import net.bytebuddy.dynamic.loading.ClassReloadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Future;
import java.util.function.LongSupplier;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
/**
*
* Got the delegation working with the help of
* https://www.infoq.com/articles/Easily-Create-Java-Agents-with-ByteBuddy
*/
final class ScheduledFutureTaskHack {
private static final Logger LOGGER = LoggerFactory.getLogger( ScheduledFutureTaskHack.class ) ;
private static final Class< ? > SCHEDULEDFUTURETASK_CLASS ;
private static final Method SCHEDULEDFUTURETASK_NANOTIME_METHOD ;
private static final Method SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD ;
private static final Field SCHEDULEDFUTURETASK_DEADLINENANOS_FIELD ;
private static final Field SCHEDULEDFUTURETASK_STARTTIME_FIELD ;
static {
try {
SCHEDULEDFUTURETASK_CLASS = Class.forName( "io.netty.util.concurrent.ScheduledFutureTask" ) ;
SCHEDULEDFUTURETASK_NANOTIME_METHOD =
SCHEDULEDFUTURETASK_CLASS.getDeclaredMethod( "nanoTime" ) ;
SCHEDULEDFUTURETASK_NANOTIME_METHOD.setAccessible( true ) ;
SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD =
SCHEDULEDFUTURETASK_CLASS.getDeclaredMethod( "deadlineNanos") ;
SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD.setAccessible( true ) ;
SCHEDULEDFUTURETASK_DEADLINENANOS_FIELD =
SCHEDULEDFUTURETASK_CLASS.getDeclaredField( "deadlineNanos" ) ;
SCHEDULEDFUTURETASK_DEADLINENANOS_FIELD.setAccessible( true ) ;
SCHEDULEDFUTURETASK_STARTTIME_FIELD =
SCHEDULEDFUTURETASK_CLASS.getDeclaredField( "START_TIME" ) ;
SCHEDULEDFUTURETASK_STARTTIME_FIELD.setAccessible( true ) ;
} catch( ClassNotFoundException | NoSuchMethodException | NoSuchFieldException e ) {
throw new Error( e ) ;
}
}
/**
* Everything is this class must be visible from the redefined class.
*/
@SuppressWarnings( "unused" )
public static final class StaticMethodDelegate {
/**
* Calls to {@link io.netty.util.concurrent.ScheduledFutureTask#nanoTime()} are redirected
* to this method.
* Sadly we can't use parameter annotated with {@link @This} or something giving a hint
* about the call context. It looks like a consequence of JVMTI reload not supporting method
* addition (adding a parameter would imply creating a new method).
*/
public static long nanoTime() {
final long supplied = longSupplier.getAsLong() ;
LOGGER.debug( "Called " + StaticMethodDelegate.class.getSimpleName() + "#nanoTime(), " +
"returns " + supplied + "." ) ;
return supplied ;
}
}
private static LongSupplier longSupplier = null ;
static void install( final LongSupplier longSupplier ) {
install( longSupplier, true ) ;
}
/**
*
* @param longSupplier
* @param suppliedNanosRelativeToClassloadingTime if {@code true}, supplied nanoseconds are
* relative to {@link io.netty.util.concurrent.ScheduledFutureTask#START_TIME}.
* Original behavior of the hacked method is to substract
* {@link io.netty.util.concurrent.ScheduledFutureTask#START_TIME} from value returned
* by {@link System#nanoTime()} (probably to make number more readable and reduce the risk
* of an overflow). During tests we prefer to not care about start time so there is this
* option to add it automatically.
*/
static void install(
final LongSupplier longSupplier,
final boolean suppliedNanosRelativeToClassloadingTime
) {
checkState( ScheduledFutureTaskHack.longSupplier == null ) ;
if( suppliedNanosRelativeToClassloadingTime ) {
final long startTime = START_TIME ;
LOGGER.debug(
"Installing with value of " +
SCHEDULEDFUTURETASK_STARTTIME_FIELD.toGenericString() +
" = " + startTime + " automatically added to the values supplied."
) ;
class AdjustedLongSupplier implements LongSupplier {
@Override
public long getAsLong() {
return longSupplier.getAsLong() + startTime ;
}
@Override
public String toString() {
return ToStringTools.getNiceClassName( this ) + "{startTime=" + startTime + "}" ;
}
}
ScheduledFutureTaskHack.longSupplier = new AdjustedLongSupplier() ;
} else {
ScheduledFutureTaskHack.longSupplier = checkNotNull( longSupplier ) ;
}
ByteBuddyAgent.install() ;
LOGGER.info( "Successfully installed ByteBuddy Agent." ) ;
redefineClass() ;
LOGGER.info( "Successfully redefined static method implementation." ) ;
}
private static void redefineClass() {
new ByteBuddy()
.redefine( SCHEDULEDFUTURETASK_CLASS )
.method( named( "nanoTime" )
.and( isStatic() )
.and( isPackagePrivate() )
.and( takesArguments( 0 ) )
.and( returns( long.class ) )
)
.intercept( MethodDelegation.to( StaticMethodDelegate.class ) )
.make()
.load( ScheduledFutureTaskHack.class.getClassLoader(), ClassReloadingStrategy.fromInstalledAgent() )
;
}
/**
* Invokes method replacing {@link io.netty.util.concurrent.ScheduledFutureTask#nanoTime()}.
*/
public static long invokeNanoTime() {
try {
return ( long ) SCHEDULEDFUTURETASK_NANOTIME_METHOD.invoke( null ) ;
} catch( IllegalAccessException | InvocationTargetException e ) {
throw new Error( e ) ;
}
}
/**
* The {@link io.netty.util.concurrent.ScheduledFutureTask#deadlineNanos()} method returns
* the value made from {@link System#nanoTime()},
* minus {@link io.netty.util.concurrent.ScheduledFutureTask#START_TIME},
* plus the delay before executing the task.
*/
public static Long invokeDeadlineNanos( final Future future ) {
try {
if( SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD.getDeclaringClass()
.isAssignableFrom( future.getClass() )
) {
return ( long ) SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD.invoke( future ) ;
} else {
return null ;
}
} catch( IllegalAccessException | InvocationTargetException e ) {
throw new Error(
"Could not access method " + SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD + " in " + future,
e
) ;
}
}
private static long readStartTime() {
try {
return ( long ) SCHEDULEDFUTURETASK_STARTTIME_FIELD.get( null ) ;
} catch( IllegalAccessException e ) {
throw new Error(
"Could not access static field " + SCHEDULEDFUTURETASK_STARTTIME_FIELD,
e
) ;
}
}
public static final long START_TIME = readStartTime() ;
}
ScheduledFutureTaskHackTest.java
package com.otcdlink.chiron.integration.harness;
import com.otcdlink.chiron.toolbox.ToStringTools;
import com.otcdlink.chiron.toolbox.netty.NettyTools;
import io.netty.channel.nio.NioEventLoopGroup;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.assertj.core.api.Assertions.assertThat;
public class ScheduledFutureTaskHackTest {
@Test
public void fastForward() throws InterruptedException {
final AtomicLong nanotimeHolder = new AtomicLong( 0 ) ;
ScheduledFutureTaskHack.install( nanotimeHolder::get ) ;
final long startTime = hackedNanoTime() ;
final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup() ;
final Semaphore scheduledTaskCompleted = new Semaphore( 0 ) ;
nioEventLoopGroup.schedule(
() -> {
scheduledTaskCompleted.release() ;
LOGGER.info( "Scheduled task completed." ) ;
},
1,
TimeUnit.HOURS
) ;
LOGGER.info( "Scheduled task for in 1 hour, now fast-forwarding Netty's clock ..." ) ;
// Test fails when disabling fast-forward below.
nanotimeHolder.set( startTime + TimeUnit.HOURS.toNanos( 1 ) + 1 ) ;
Thread.sleep( 1000 ) ;
hackedNanoTime() ;
// Amazingly Netty detected clock change and ran the task!
assertThat( scheduledTaskCompleted.tryAcquire( 1, TimeUnit.SECONDS ) )
.describedAs( "Scheduled task should have completed within 1 second" )
.isTrue()
;
}
// =======
// Fixture
// =======
private static final Logger LOGGER = LoggerFactory.getLogger(
ScheduledFutureTaskHackTest.class ) ;
static {
NettyTools.forceNettyClassesToLoad() ;
}
private static long hackedNanoTime() {
final long nanoTime = ScheduledFutureTaskHack.invokeNanoTime() ;
LOGGER.info(
ToStringTools.getNiceName( ScheduledFutureTaskHack.StaticMethodDelegate.class ) +
"#nanoTime(): " + nanoTime + "."
) ;
return nanoTime ;
}
}
InstrumentedNioEventLoopGroup.java
package com.otcdlink.chiron.integration.harness;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ScheduledFuture;
import javax.annotation.Nonnull;
import java.time.Instant;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static com.google.common.base.Preconditions.checkNotNull;
class InstrumentedNioEventLoopGroup extends NioEventLoopGroup {
/**
* Consume the value obtained from
* {@link io.netty.util.concurrent.ScheduledFutureTask#deadlineNanos()}.
* This is hardly mappable to an exact {@link Instant} (even if the Java flavor retains
* nanoseconds) but this is enough to compare with {@link System#nanoTime()}.
*/
private final Consumer< Long > scheduledTaskMomentConsumer ;
public InstrumentedNioEventLoopGroup(
final ThreadFactory threadFactory,
final Consumer< Long > scheduledTaskMomentConsumer
) {
// Need 2 threads because one will block on Socket Selector if there is no IO,
// so we add one to poll Tasks.
super( 2, threadFactory ) ;
this.scheduledTaskMomentConsumer = checkNotNull( scheduledTaskMomentConsumer ) ;
}
private < FUTURE extends Future > FUTURE recordDeadlineNanos( final FUTURE future ) {
final Long deadlineNanos = ScheduledFutureTaskHack.invokeDeadlineNanos( future ) ;
if( deadlineNanos != null ) {
scheduledTaskMomentConsumer.accept( deadlineNanos ) ;
}
return future ;
}
@Nonnull
@Override
public Future< ? > submit( final Runnable task ) {
return recordDeadlineNanos( super.submit( task ) ) ;
}
@Nonnull
@Override
public < T > Future< T > submit(
final Runnable task,
final T result
) {
return recordDeadlineNanos( super.submit( task, result ) ) ;
}
@Nonnull
@Override
public < T > Future< T > submit( final Callable< T > task ) {
return recordDeadlineNanos( super.submit( task ) ) ;
}
@Nonnull
@Override
public ScheduledFuture< ? > schedule(
final Runnable command,
final long delay,
final TimeUnit unit
) {
return recordDeadlineNanos( super.schedule( command, delay, unit ) ) ;
}
@Nonnull
@Override
public < V > ScheduledFuture< V > schedule(
final Callable< V > callable,
final long delay,
final TimeUnit unit
) {
return recordDeadlineNanos( super.schedule( callable, delay, unit ) ) ;
}
@Nonnull
@Override
public ScheduledFuture< ? > scheduleAtFixedRate(
final Runnable command,
final long initialDelay,
final long period,
final TimeUnit unit
) {
return recordDeadlineNanos(
super.scheduleAtFixedRate( command, initialDelay, period, unit ) ) ;
}
@Nonnull
@Override
public ScheduledFuture< ? > scheduleWithFixedDelay(
final Runnable command,
final long initialDelay,
final long delay,
final TimeUnit unit
) {
return recordDeadlineNanos(
super.scheduleWithFixedDelay( command, initialDelay, delay, unit ) ) ;
}
}
InstrumentedNioEventLoopGroupTest.java
package com.otcdlink.chiron.integration.harness;
import com.otcdlink.chiron.toolbox.concurrent.ExecutorTools;
import com.otcdlink.chiron.toolbox.netty.NettyTools;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.ScheduledFuture;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.assertj.core.api.Assertions.assertThat;
public class InstrumentedNioEventLoopGroupTest {
@Test
public void recordAndAdjust() throws InterruptedException {
final int delay = 10 ;
final TimeUnit timeUnit = TimeUnit.SECONDS ;
final AtomicLong nanoInstantSupplier = new AtomicLong() ;
ScheduledFutureTaskHack.install( nanoInstantSupplier::get ) ;
final List< Long > taskDeadlineRecorder = Collections.synchronizedList( new ArrayList<>() ) ;
final InstrumentedNioEventLoopGroup executor = new InstrumentedNioEventLoopGroup(
ExecutorTools.newThreadFactory( "executor" ), taskDeadlineRecorder::add ) ;
executor.setIoRatio( 1 ) ; // Silly but worth trying to see what can get wrong.
final Semaphore doneSemaphore = new Semaphore( 0 ) ;
final ScheduledFuture< ? > scheduledFuture1 =
executor.schedule( ( Runnable ) doneSemaphore::release, delay, timeUnit ) ;
LOGGER.info( "Scheduled " + scheduledFuture1 + "." ) ;
assertThat( taskDeadlineRecorder ).hasSize( 1 ) ;
final Long nanoTime = taskDeadlineRecorder.get( 0 ) - ScheduledFutureTaskHack.START_TIME ;
LOGGER.info( "Recorded " + nanoTime + " as nanoTime deadline for next task." ) ;
assertThat( nanoTime ).isEqualTo( timeUnit.toNanos( delay ) ) ;
final long pastDeadline = nanoTime + 1 ;
nanoInstantSupplier.set( pastDeadline ) ;
LOGGER.info(
"Did set nanoTime to " + pastDeadline + ", past to Task's deadline. " +
"Invocation of hacked nanoTime() returns " +
ScheduledFutureTaskHack.invokeNanoTime() + "."
) ;
LOGGER.info( "Now waiting for task completion ..." ) ;
assertThat( doneSemaphore.tryAcquire( 3, TimeUnit.SECONDS ) ).isTrue() ;
}
/**
* Fails when ran after {@link #recordAndAdjust()} because JUnit doesn't reload classes for
* each method inside a test class.
*/
@Test
public void noInstrumentation() throws InterruptedException {
final NioEventLoopGroup executor =
new NioEventLoopGroup( 1, ExecutorTools.newThreadFactory( "executor" ) ) ;
final Semaphore doneSemaphore = new Semaphore( 0 ) ;
executor.submit( () -> LOGGER.info( "Plain submission works!" ) ) ;
final ScheduledFuture< ? > scheduledFuture =
executor.schedule( ( Runnable ) doneSemaphore::release, 1, TimeUnit.SECONDS ) ;
LOGGER.info( "Scheduled " + scheduledFuture + "." ) ;
assertThat( doneSemaphore.tryAcquire( 3, TimeUnit.SECONDS ) ).isTrue() ;
}
// =======
// Fixture
// =======
private static final Logger LOGGER =
LoggerFactory.getLogger( InstrumentedNioEventLoopGroupTest.class ) ;
static {
NettyTools.forceNettyClassesToLoad() ;
}
}
我是 Chiron Framework 的作者,一个基于 WebSocket 的网络框架,具有纯 Java 客户端和非阻塞双因素身份验证。它经常使用 Netty。遗憾的是,有许多基于 JMockit 的测试不能可靠地运行,因为执行顺序可能是不确定的(这是每段调度任务的代码的问题)。
最佳答案
伙计们,这太明显了:我重写了每个 schedule*
方法,将 Runnable
/Callable
和其他参数堆积在数据中对象,并将其添加到某个队列中。然后我从测试中显式触发任务执行。
因为创建任务的代码是我的,所以我用标记接口(interface)装饰每个任务(恰好是方法引用)。然后测试可以检查它运行预期的任务。
关于java - Instrument Netty 的 EventLoop 用于确定性执行计划任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50811262/
Task.WaitAll 方法等待所有任务,Task.WaitAny 方法等待一个任务。如何等待任意N个任务? 用例:下载搜索结果页面,每个结果都需要一个单独的任务来下载和处理。如果我使用 WaitA
我正在查看一些像这样的遗留 C# 代码: await Task.Run(() => { _logger.LogException(LogLevel.Error, mes
如何在 Linux 中运行 cron 任务? 关注此Q&A ,我有这个 cron 任务要运行 - 只是将一些信息写入 txt 文件, // /var/www/cron.php $myfile = fo
原谅我的新手问题,但我想按顺序执行三个任务并在剧本中使用两个角色: 任务 角色 任务 角色 任务 这是我到目前为止(任务,角色,任务): --- - name: Task Role Task ho
我有一个依赖于 installDist 的自定义任务 - 不仅用于执行,还依赖于 installDist 输出: project.task('run', type: JavaExec, depends
从使用 Wix 创建的 MSI 运行卸载时,我需要在尝试删除任何文件之前强行终止在后台运行的进程。主要应用程序由一个托盘图标组成,它反射(reflect)了 bg 进程监控本地 Windows 服务的
我想编写 Ant 任务来自动执行启动服务器的任务,然后使用我的应用程序的 URL 打开 Internet Explorer。 显然我必须执行 startServer先任务,然后 startApplic
使用 ASP.NET 4.5,我正在尝试使用新的 async/await 玩具。我有一个 IDataReader 实现类,它包装了一个特定于供应商的阅读器(如 SqlDatareader)。我有一个简
使用命令 gradle tasks可以得到一份所有可用任务的报告。有什么方法可以向此命令添加参数并按任务组过滤任务。 我想发出类似 gradle tasks group:Demo 的命令筛选所有任务并
除了sshexec,还有什么办法吗?任务要做到这一点?我知道您可以使用 scp 复制文件任务。但是,我需要执行其他操作,例如检查是否存在某些文件夹,然后将其删除。我想使用类似 condition 的东
假设我有字符串 - "D:\ApEx_Schema\Functions\new.sql@@\main\ONEVIEW_Integration\3" 我需要将以下内容提取到 diff 变量中 - 文档名
我需要编写一个 ant 任务来确定某个文件是否是只读的,如果是,则失败。我想避免使用自定义选择器来为我们的构建系统的性质做这件事。任何人都有任何想法如何去做?我正在使用 ant 1.8 + ant-c
这是一个相当普遍的计算机科学问题,并不特定于任何操作系统或框架。 因此,我对与在线程池上切换任务相关的开销感到有些困惑。在许多情况下,给每个作业分配自己的特定线程是没有意义的(我们不想创建太多硬件线程
我正在使用以下 Ansible playbook 一次性关闭远程 Ubuntu 主机列表: - hosts: my_hosts become: yes remote_user: my_user
如何更改 Ant 中的当前工作目录? Ant documentation没有 任务,在我看来,最好的做法是不要更改当前工作目录。 但让我们假设我们仍然想这样做——你会如何做到这一点?谢谢! 最佳答案
是否可以运行 cronjob每三天一次?或者也许每月 10 次。 最佳答案 每三天运行一次 - 或更短时间在月底运行一次。 (如果上个月有 31 天,它将连续运行 2 天。) 0 0 */3 * *
如何在 Gradle 任务中执行托管在存储库中的工具? 在我的具体情况下,我正在使用 Gradle 构建一个 Android 应用程序。我添加了一项任务,将一些 protobuf 数据从文本编码为二进
我的项目有下一个结构: Root |- A |- C (depends on A) \- B (depends on A) 对于所有子项目,我们使用自己的插件生成资源:https://githu
我设置了一个具有4个节点的Hadoop群集,其中一个充当HDFS的NameNode以及Yarn主节点。该节点也是最强大的。 现在,我分发了2个文本文件,一个在node01(名称节点)上,一个在node
在 TFS 2010 中为多个用户存储任务的最佳方式是什么?我只能为一项任务分配一个。 (例如:当我计划向所有开发人员演示时) (这是一个 Scrum Msf 敏捷项目,其中任务是用户故事的一部分)
我是一名优秀的程序员,十分优秀!