- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 KStream 应用程序,其中包含一堆 KStream、连接和其他操作。我启用了logging.level.org.springframework.kafka.config=debug
验证正在生成的拓扑,并发现很多根本没有意义的节点。
然后我将应用程序简化为:
interface ShippingKStreamProcessor {
@Input("input")
fun input(): KStream<Int, Customer>
}
@Suppress("UNCHECKED_CAST")
@Configuration
class ShippingKStreamConfiguration {
@StreamListener
fun process(@Input("input") input: KStream<Int, Customer> {}
}
2019-04-30 23:47:03.881 DEBUG 2944 --- [ main] o.s.k.config.StreamsBuilderFactoryBean : Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [customer])
--> KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
--> KSTREAM-BRANCH-0000000003, KSTREAM-PROCESSOR-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-BRANCH-0000000003 (stores: [])
--> KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005
<-- KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-BRANCHCHILD-0000000004 (stores: [])
--> KSTREAM-MAPVALUES-0000000007
<-- KSTREAM-BRANCH-0000000003
Processor: KSTREAM-BRANCHCHILD-0000000005 (stores: [])
--> KSTREAM-PROCESSOR-0000000006
<-- KSTREAM-BRANCH-0000000003
Processor: KSTREAM-MAPVALUES-0000000007 (stores: [])
--> none
<-- KSTREAM-BRANCHCHILD-0000000004
Processor: KSTREAM-PROCESSOR-0000000002 (stores: [])
--> none
<-- KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
--> none
<-- KSTREAM-BRANCHCHILD-0000000005
fun main(args: Array<String>) {
val builder = StreamsBuilder()
val streamsConfiguration = Properties()
streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "kafka-shipping-service"
streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
streamsConfiguration[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
val serdeConfig = mapOf(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081",
AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY to TopicRecordNameStrategy::class.java.name
)
//val byteArraySerde = Serdes.ByteArray()
val intSerde = Serdes.IntegerSerde()
val customerSerde = SpecificAvroSerde<Customer>()
customerSerde.configure(serdeConfig, false)
val customerStream = builder.stream<Int, Customer>("customer",
Consumed.with(intSerde, customerSerde)) as KStream<Int, Customer>
val topology = builder.build()
println(topology.describe())
val streams = KafkaStreams(topology, streamsConfiguration)
streams.start()
}
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [customer])
--> none
最佳答案
@codependent 在拓扑中拥有这些额外处理器的原因是因为您使用的是框架提供的 de/serailzers( native 解码和编码默认为 false
)。基本上,我们从 Kafka 主题接收数据作为 byte[]
然后在内部进行转换。对于这些转换,我们会经过一些额外的处理器,因此您最终会得到更深层次的拓扑。
这是一个基本的StreamListener
在 Java 中(几乎是你所拥有的,但使用更简单的值类型):
@StreamListener
public void process(@Input("input") KStream<Integer, String> input ) {
}
spring.cloud.stream.kafka.streams:
binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
2019-05-01 18:02:12.705 DEBUG 67539 --- [ main] o.s.k.config.StreamsBuilderFactoryBean : Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [hello-1])
--> KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
--> none
<-- KSTREAM-SOURCE-0000000000
Serde
在您的处理器中将无法工作,因为这些序列化程序不兼容。所以这里有一些你的选择。
关于kotlin - Spring Cloud Stream 会生成不必要的复杂 Kafka 拓扑,为什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55929162/
我之前发布过question已得到答复,但我也需要对此进行查询。我有一个包含这样数据的表结构(日期格式为 dd/mm/yyyy)。 ID Account Number Unit Ad
我正在使用 React Native Calendars 并尝试为议程组件构建我的数据。 预期的数据结构是(一个对象) { '2012-05-22': [{text: 'item 1 - any j
这个问题不太可能对任何 future 的访客有帮助;它只与一个较小的地理区域、一个特定的时间点或一个非常狭窄的情况相关,通常不适用于全世界的互联网受众。如需帮助使此问题更广泛适用,visit the
两列城镇和优先级。 我需要对表进行排序,以便优先级=1的城镇排在第一位,并且不按名称 ASC 排序,而其余城镇则按名称 ASC 排序。 我该怎么做? 谢谢;) 更新 SELECT * FROM map
我有三个表“Hardware_model”、“Warehouse”和“Brand”,并且表以这种方式一起引用:Hardware_model 仓库Hardware_model 品牌 现在我要执行以下
我有一个 MySQL 表 (tbl_filters),包含 3 列:id、cat、val id 和 val 是数字,cat 是 varchar。每个 id 有多行。 我还有另一个包含多个列的表 (tb
我想获取字段的不同值,比方说:field1...这需要一个如下查询:“从表中选择不同的(字段1)” 但是,对于某些记录,field1 为空,并且还有另一列可以替代 field1,即 field2。对于
表 1 - 用户 id username items 1 Paul 1(0020);2(0001); 表 2 - 项目 id name 1 name_here 在我的用户的项目中,我输入了 2(000
我想连接同一个表 4 次以获取列的显示方式,我不确定是否可以在 1 个 SQL 语句中完成。 tbl_用户名 id username 1 Adam 2 Bob 3 Chris tbl_机
首先,我刚刚开始自己学习JS,没有任何编程经验,这意味着我仍然要了解这种出色的编程语言的基本构建模块。 我的问题与我编写的以下代码有关: let orderCount = 0; con
关闭。这个问题需要details or clarity .它目前不接受答案。 想改进这个问题吗? 通过 editing this post 添加细节并澄清问题. 关闭 9 年前。 Improve t
我正在使用 XMAPP,MySQL 正在正常运行。在 phpMyAdmin 中,我不太明白这一点,所以我尝试在 PHP 中创建一个。使用此代码,它会告诉我数据库 benutzer。尽管我在 phpMy
是否有一种高效的算法可以找到平均度最大的子图(可能是图本身)? 最佳答案 The paper "Finding a Maximum-Density Subgraph" by Andrew Goldbe
目录 1、业务背景 2、场景分析 3、流程设计 1、业务流程 2、导入流程
我有 2 个表: 1) 包含自 1900 年 1 月 1 日以来所有日期的 Masterdates 表 2) Stockdata 表,其中包含表单中的股票数据 日期、交易品种、开盘价、最高价、最低价、
我有一个非常复杂的 UI,其状态栏不断变化,其中包含多种类型的状态消息,并且 UI 具有复杂的图表控件和已加载的指示性地理 map 。 现在这些小而复杂的区域的数据上下文具有同样复杂的 ViewMod
有人可以用简单的方式向我解释为什么常量在大 O 表示法中无关紧要吗?为什么添加常量时复杂性保持不变。这不是作业问题,我只是想更好地理解这一点。让我明白这个大 O 是为了看到一个函数在接近无穷大时的行为
我在 flex 搜索索引中有以下文档。 [{ "_index": "ten2", "_type": "documents", "_id": "c323c
我有一个以零碎的方式构建的 LINQ 查询,如下所示: var initialQuery = from item in MyContext where xxx == yyy select item;
我目前正在涉足 SQL,并且希望针对我所创建的问题获得一些帮助。 为了练习一些编程,我正在制作一个 IOU 应用程序。下面是我存储的表我的借条记录(忽略一些相关栏目)。该表允许用户说“嘿,你欠我 X
我是一名优秀的程序员,十分优秀!