- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我需要对 kafka 应用程序执行单元测试,避免使用第三方库。
我现在的问题是我想在测试之间清除所有主题,但我不知道如何。
这是我的临时解决方案:提交每次测试后产生的每条消息,并将所有测试消费者放在同一个消费者组中。
override protected def afterEach():Unit={
val cleanerConsumer= newConsumer(Seq.empty)
val topics=cleanerConsumer.listTopics()
println("pulisco")
cleanerConsumer.subscribe(topics.keySet())
cleanerConsumer.poll(100)
cleanerConsumer.commitSync()
cleanerConsumer.close()
}
虽然这不起作用,但我不知道为什么。
例如,当我在测试中创建一个新的消费者时,messages
包含在之前的测试中产生的消息。
val consumerProbe = newConsumer(SMSGatewayTopic)
val messages = consumerProbe.poll(1000)
我该如何解决这个问题?
最佳答案
您还可以在测试源中嵌入 Kafka/Zookeeper 实例,以更好地控制此类隔离服务。
trait Kafka { self: ZooKeeper =>
Kafka.start()
}
object Kafka {
import org.apache.hadoop.fs.FileUtil
import kafka.server.KafkaServer
@volatile private var started = false
lazy val logDir = java.nio.file.Files.createTempDirectory("kafka-log").toFile
lazy val kafkaServer: KafkaServer = {
val config = com.typesafe.config.ConfigFactory.
load(this.getClass.getClassLoader)
val (host, port) = {
val (h, p) = config.getString("kafka.servers").span(_ != ':')
h -> p.drop(1).toInt
}
val serverConf = new kafka.server.KafkaConfig({
val props = new java.util.Properties()
props.put("port", port.toString)
props.put("broker.id", port.toString)
props.put("log.dir", logDir.getAbsolutePath)
props.put(
"zookeeper.connect",
s"localhost:${config getInt "test.zookeeper.port"}"
)
props
})
new KafkaServer(serverConf)
}
def start(): Unit = if (!started) {
try {
kafkaServer.startup()
started = true
} catch {
case err: Throwable =>
println(s"fails to start Kafka: ${err.getMessage}")
throw err
}
}
def stop(): Unit = try {
if (started) kafkaServer.shutdown()
} finally {
FileUtil.fullyDelete(logDir)
}
}
trait ZooKeeper {
ZooKeeper.start()
}
object ZooKeeper {
import java.nio.file.Files
import java.net.InetSocketAddress
import org.apache.hadoop.fs.FileUtil
import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.ServerCnxnFactory
@volatile private var started = false
lazy val logDir = Files.createTempDirectory("zk-log").toFile
lazy val snapshotDir = Files.createTempDirectory("zk-snapshots").toFile
lazy val (zkServer, zkFactory) = {
val srv = new ZooKeeperServer(
snapshotDir, logDir, 500
)
val config = com.typesafe.config.ConfigFactory.
load(this.getClass.getClassLoader)
val port = config.getInt("test.zookeeper.port")
srv -> ServerCnxnFactory.createFactory(
new InetSocketAddress("localhost", port), 1024
)
}
def start(): Unit = if (!zkServer.isRunning) {
try {
zkFactory.startup(zkServer)
started = true
while (!zkServer.isRunning) {
Thread.sleep(500)
}
} catch {
case err: Throwable =>
println(s"fails to start ZooKeeper: ${err.getMessage}")
throw err
}
}
def stop(): Unit = try {
if (started) zkFactory.shutdown()
} finally {
try { FileUtil.fullyDelete(logDir) } catch { case _: Throwable => () }
FileUtil.fullyDelete(snapshotDir)
}
}
测试类可以使用 ZooKeeper 扩展 Kafka
以确保它可用。
如果测试 JVM 没有 fork ,可以使用 SBT testOptions in Test
设置中的 Tests.Cleanup
来停止测试后的嵌入式服务。
关于java - 清除单元测试的kafka主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38851171/
我有一个网站,我正在通过学校参加比赛,但我在清除 float 元素方面遇到了问题。 该网站托管在 http://www.serbinprinting.com/corey/development/
我有一个清除按钮,需要使用 JQuery 函数清除该按钮单击时的 TextBox 值(输入的)。 最佳答案 您只需将单击事件附加到按钮即可将输入元素的值设置为空。 $("#clearButton").
我们已经创建了一个保存到 CoreData 然后同步到 CloudKit 的 iOS 应用程序。在测试中,我们还没有找到一种方法来清除应用程序 iCloud 容器中的数据(用于用户私有(private
这是一个普遍的问题,也是我突然想到并且似乎有道理的问题。我看到很多人使用清除div 并且知道这有时不受欢迎,因为它是额外的标记。我最近开始使用 因为它接缝代表了它的实际用途。 当然都引用了:.clea
我有两个单选按钮。如果我检查第一个单选按钮下面的数据将填充在组合框中。之后我将检查另一个单选按钮,我想清除组合框值。 EmployeeTypes _ET = new EmployeeTypes(
我一直在玩 Canvas ,我正在尝试制作一个可以移动和跳跃的正方形,移动部分已经完成,但是跳跃部分有一个问题:每次跳跃时它都会跳得更快 here's a jsfiddle 这是代码: ///////
我该如何在 Dart 上做到这一点? 抓取tbody元素后,我想在其上调用empty(),但这似乎不存在: var el = query('#search_results_tbody'); el.em
我需要创建一个二维模拟,但是在设置新的“框架”时,旧的“框架”不会被清除。 我希望一些圆圈在竞技场中移动,并且每个循环都应删除旧圆圈并生成新圆圈。一切正常,但旧的没有被清除并且仍然可见,这就是我需要改
无论我使用set statusline将状态行更改为什么,我的状态行都不会改变。看起来像 ".vimrc" 39L, 578C
在 WPF 应用程序中,我有一个 ListView 绑定(bind)到我的 ViewModel 上的一个 ObservableCollection。 在应用程序运行期间,我需要删除并重新加载集合中的所
我有一个大型程序,一个带有图形的文本扭曲游戏。在我的代码中的某处,我使用 kbhit() 我执行此代码来清除我的输入缓冲区: while ((c = getchar()) != '\n' && c !
我正在将所有网站的页面加载到主索引页面中,并通过将 href 分成段并在主域名后使用 .hash 函数添加段来更新 URL 显示,如下所示: $('a').click(function(event)
我有一个带有 的表单和 2 控件来保存和重置表单。我正在触发 使用 javascript __doPostBack()函数并在其中传递一个值 __EVENTARGUMENT如果面板应该重置。 我的代
我目前有一堆 UIViewController,每个都是在前一个之上呈现的模式 ViewController。我的问题是我不需要一堆 UIViewController,我只需要最后一个。因此,当出现新
我在一个类中有一些属性方法,我想在某个时候清除这个属性的缓存。 示例: class Test(): def __init__(self): pass @property
在此Test Link我试图将标题和主站点导航安装到博客脚本的顶部。 我清除:两者;在主要网站脚本上工作,但现在把所有东西都扔到了一边。尝试了无数次 fixex 都没有成功!提前感谢 Ant 指点解决
我似乎无法正确清除布局。看this 我无法阻止左栏中的元素向下推右栏中的元素。谁能帮忙? Screenshot with some pointy arrows (死链接) 最佳答案 问题标记/样式似
我希望能够在某个类 (sprite-empos) 之后清除 '' 中的内容,想知道是否有不添加任何新类或不使用 js 的方法(我在下面尝试过不工作)? 为了明确它是“985”,我想在某个视口(view
我想清除ptr_array boost::ptr_array a; ... a.clear(); // missing 如何清理 ptr 容器? 最佳答案 它应该表现得像一个数组,您不能在 C++
这是我使用多 map 制作的一个简单的事件系统;当我使用 CEvents::Add(..) 方法时,它应该插入并进入多重映射。问题是,当我触发这些事件时, multimap 似乎是空的。我确定我没有调
我是一名优秀的程序员,十分优秀!