- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我已经按照 https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html 为 kafka 流应用程序编写了一个测试类
,其代码是
import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Properties;
public class KafkaStreamsConfigTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, Object> inputTopic;
private TestOutputTopic<String, Object> outputTopic;
private Serde<String> stringSerde = new Serdes.StringSerde();
private EventSerde eventSerde= new EventSerde();
private String key="test";
private Object value = "some value";
private Object expected_value = "real value";
String kafkaEventSourceTopic = "raw_events";
String kafkaEventSinkTopic = "processed_events";
String kafkaCacheSinkTopic = "cache_objects";
String applicationId = "my-app";
String test_dummy = "dummy:1234";
@Before
public void setup() {
Topology topology = new Topology();
topology.addSource(kafkaEventSourceTopic, kafkaEventSourceTopic);
topology.addProcessor(ProcessRouter.class.getSimpleName(), ProcessRouter::new, kafkaEventSourceTopic);
topology.addProcessor(WorkforceVisit.class.getSimpleName(), WorkforceVisit::new
, ProcessRouter.class.getSimpleName());
topology.addProcessor(DefaultProcessor.class.getSimpleName(), DefaultProcessor::new
, ProcessRouter.class.getSimpleName());
topology.addProcessor(CacheWorkforceShift.class.getSimpleName(), CacheWorkforceShift::new
, ProcessRouter.class.getSimpleName());
topology.addProcessor(DigitalcareShiftassisstantTracking.class.getSimpleName(), DigitalcareShiftassisstantTracking::new
, ProcessRouter.class.getSimpleName());
topology.addProcessor(WorkforceLocationUpdate.class.getSimpleName(), WorkforceLocationUpdate::new
, ProcessRouter.class.getSimpleName());
topology.addSink(kafkaEventSinkTopic, kafkaEventSinkTopic
, WorkforceVisit.class.getSimpleName(), DefaultProcessor.class.getSimpleName()
, CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName()
, WorkforceLocationUpdate.class.getSimpleName());
topology.addSink(kafkaCacheSinkTopic, kafkaCacheSinkTopic
, WorkforceVisit.class.getSimpleName()
, CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName()
, WorkforceLocationUpdate.class.getSimpleName());
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, test_dummy);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, EventSerde.class.getName());
testDriver = new TopologyTestDriver(topology, properties);
//setup test topics
inputTopic = testDriver.createInputTopic(kafkaEventSourceTopic, stringSerde.serializer(), eventSerde.serializer());
outputTopic = testDriver.createOutputTopic(kafkaEventSinkTopic, stringSerde.deserializer(), eventSerde.deserializer());
}
@After
public void tearDown() {
testDriver.close();
}
@Test
public void outputEqualsTrue()
{
inputTopic.pipeInput(key, value);
Object b = outputTopic.readValue();
System.out.println(b.toString());
assertEquals(b,expected_value);
}
EventSerde
类来序列化和反序列化值。
java.util.NoSuchElementException: Uninitialized topic: processed_events
使用以下堆栈跟踪:
java.util.NoSuchElementException: Uninitialized topic: processed_events
at org.apache.kafka.streams.TopologyTestDriver.readRecord(TopologyTestDriver.java:715)
at org.apache.kafka.streams.TestOutputTopic.readRecord(TestOutputTopic.java:100)
at org.apache.kafka.streams.TestOutputTopic.readValue(TestOutputTopic.java:80)
at com.uhx.platform.eventprocessor.config.KafkaStreamsConfigTest.outputEqualsTrue(KafkaStreamsConfigTest.java:111)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
outputTopic.readValue();
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>
最佳答案
为了避免/克服此异常,您需要在尝试从中读取之前检查您的输出主题是否为空。
@Test
public void outputEqualsTrue()
{
inputTopic.pipeInput(key, value);
assert(outputTopic.isEmpty(), false);
Object b = outputTopic.readValue();
System.out.println(b.toString());
assertEquals(b,expected_value);
}
关于unit-testing - Kafka Streams 测试 : java. util.NoSuchElementException:未初始化的主题: "output_topic_name",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59987758/
我是 Spring 新手,这就是我想要做的事情: 我正在使用一个基于 Maven 的库,它有自己的 Spring 上下文和 Autowiring 字段。 它的bean配置文件是src/test/res
我在我的测试脚本中有以下列表初始化: newSequenceCore=["ls", "ns", "*", "cm", "*", "ov", "ov", "ov", "ov", "kd"] (代表要在控
这个问题在这里已经有了答案: 关闭 11 年前。 Possible Duplicate: Class construction with initial values 当我查看 http://en.
我得到了成员变量“objectCount”的限定错误。编译器还返回“ISO C++ 禁止非常量静态成员的类内初始化”。这是主类: #include #include "Tree.h" using n
我有如下所示的a.h class A { public: void doSomething()=0; }; 然后我有如下所示的b.h #include "a.h" class b: publi
我需要解析 Firebase DataSnapshot (一个 JSON 对象)转换成一个数据类,其属性包括 enum 和 list。所以我更喜欢通过传递 DataSnapshot 来手动解析它进入二
我使用 JQuery 一段时间了,我总是使用以下代码来初始化我的 javascript: $(document).ready( function() { // Initalisation logic
这里是 Objective-C 菜鸟。 为什么会这样: NSString *myString = [NSString alloc]; [myString initWithFormat:@"%f", s
我无法让核心数据支持的 NSArrayController 在我的代码中正常工作。下面是我的代码: pageArrayController = [[NSArrayController alloc] i
我对这一切都很陌生,并且无法将其安装到我的后端代码中。它去哪里?在我的页脚下面有我所有的 JS? 比如,这是什么意思: Popup initialization code should be exec
这可能是一个简单的问题,但是嘿,我是初学者。 所以我创建了一个程序来计算一些东西,它目前正在控制台中运行。我决定向其中添加一个用户界面,因此我使用 NetBeans IDE 中的内置功能创建了一个 J
我有 2 个 Controller ,TEST1Controller 和 TEST2Controller 在TEST2Controller中,我有一个initialize()函数设置属性值。 如果我尝
据我所知, dependentObservable 在声明时会进行计算。但如果某些值尚不存在怎么办? 例如: var viewModel ={}; var dependentObservable1 =
我正在阅读 POODR 这本书,它使用旧语法进行默认值初始化。我想用新语法实现相同的功能。 class Gear attr_reader :chainring, :cog, :wheel de
我按照 polymer 教程的说明进行操作: https://www.polymer-project.org/3.0/start/install-3-0 (我跳过了可选部分) 但是,在我执行命令“po
很抱歉问到一个非常新手的Kotlin问题,但是我正在努力理解与构造函数和初始化有关的一些东西。 我有这个类和构造函数: class TestCaseBuilder constructor(
假设我们有一个包含 30 列和 30 行的网格。 生命游戏规则简而言之: 一个小区有八个相邻小区 当一个细胞拥有三个存活的相邻细胞时,该细胞就会存活 如果一个细胞恰好有两个或三个活的相邻细胞,那么它就
我是 MQTT 和 Android 开放附件“AOA” 的新手。在阅读教程时,我意识到,在尝试写入 ByteArrayOutputStream 类型的变量之前,应该写入 0 或 0x00首先到该变量。
我有 2 个 Controller ,TEST1Controller 和 TEST2Controller 在TEST2Controller中,我有一个initialize()函数设置属性值。 如果我尝
我有一个inotify /内核问题。我正在使用“inotify” Python项目进行观察,但是,我的问题仍然是固有的关于inotify内核实现的核心。 Python inotify项目处理递归ino
我是一名优秀的程序员,十分优秀!