- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 Spring 云流,并且想稍微摆弄一下 KStreams/KTables。
我正在寻找从标准 Kafka 主题将其转变为流的方法。
我已经在 KSQL 中完成了此操作,但我试图弄清楚是否有办法让 SpringBoot 处理此问题。我能找到的最好的例子是 @Input
和@Output
channel 已经 KStreams
但我认为这不是我想要的。
在 SpringBoot 内部我正在执行以下操作:
force-entities-topic
[UTC]
从时间消息中标记并重新发布: force-entities-topic-clean
从那里我希望得到它的输出并构建 KStream
和KTable
键入platformUID
字段。
所以我正在使用的数据是:
{
"platformUID": "UID",
"type": "TLPS",
"state": "PLATFORM_INITIALIZED",
"fuelremaining": 5.9722E+24,
"latitude": 39,
"longitude": -115,
"altitude": 0,
"time": "2018-07-18T00:00:00Z[UTC]"
}
我可以运行这些 KSQL 命令来创建我需要的内容。 (这里我以字符串形式读取时间,而不是我在 java/kotlin 实现中执行的实际时间)
CREATE STREAM force_no_key (
platformUID string,
type string,
state string,
fuelremaining DOUBLE,
latitude DOUBLE,
longitude DOUBLE,
altitude DOUBLE
) with (
kafka_topic='force-entities-topic',
value_format='json');
从那里我创建另一个流(因为我无法让它正确读取 key )
CREATE STREAM force_with_key
WITH (KAFKA_TOPIC='blue_force_with_key') AS
select PLATFORMUID as UID, LATITUDE as lat, LONGITUDE as LON, ALTITUDE as ALT, state, type
FROM force_no_key
PARTITION BY UID;
从现在开始
CREATE TABLE FORCE_TABLE
( UID VARCHAR,
LAT DOUBLE,
LON DOUBLE,
ALT DOUBLE
) WITH (KAFKA_TOPIC = 'force_with_key',
VALUE_FORMAT='JSON',
KEY = 'UID');
我认为我遇到麻烦的地方就在这里。我在这里定义我的绑定(bind)接口(interface):
interface ForceStreams {
companion object {
// From the settings file we configure it with the value of-force-in
const val DIRTY_INPUT = "dirty-force-in"
const val CLEANED_OUTPUT = "clean-force-out"
const val CLEANED_INPUT = "clean-force-in"
const val STREAM_OUT = "stream-out"
}
@Input(DIRTY_INPUT)
fun initialInput(): MessageChannel
@Output(CLEANED_OUTPUT)
fun cleanOutput(): SubscribableChannel
@Input(CLEANED_INPUT)
fun cleanInput(): MessageChannel
@Output(STREAM_OUT)
fun cleanedBlueForceMessage(): KStream<String, ForceEntity>
@Output(TABLE_OUT)
fun tableOutput(): KTable<String, ForceEntity>
}
然后我用这个 block 进行清洁:
@StreamListener(ForceStreams.DIRTY_INPUT)
@SendTo(ForceStreams.CLEANED_OUTPUT)
fun forceTimeCleaner(@Payload message: String): ForceEntity {
var inputMap: Map<String, Any> = objectMapper.readValue(message)
var map = inputMap.toMutableMap()
map["type"] = map["type"].toString().replace("-", "_")
map["time"] = map["time"].toString().replace("[UTC]", "")
val json = objectMapper.writeValueAsString(map)
val fe : ForceEntity = objectMapper.readValue(json, ForceEntity::class.java)
return fe
}
但我要从 MessageChannel
出发至SubscribableChannel
我不确定该怎么做是从 SubscribableChannel
开始到 KStream<String,ForceEntity>
或KTable<String,ForceEntity>
任何帮助将不胜感激 - 谢谢
server:
port: 8888
spring:
application:
name: Blue-Force-Table
kafka:
bootstrap-servers: # This seems to be for the KStreams the other config is for normal streams
- localhost:19092
cloud:
stream:
defaultBinder: kafka
kafka:
binder:
brokers: localhost:19092
bindings:
dirty-force-in:
destination: force-entities-topic
contentType: application/json
clean-force-in:
destination: force-entities-topic-clean
contentType: application/json
clean-force-out:
destination: force-entities-topic-clean
contentType: application/json
stream-out:
destination: force_stream
contentType: application/json
table-out:
destination: force_table
contentType: application/json
我想接下来的问题是 - 这可能吗?您可以在单个函数中混合和匹配 Binder 吗?
最佳答案
在第一个 StreamListener
中,您通过 DIRTY_INPUT
绑定(bind)接收数据,并通过绑定(bind) CLEANED_OUTPUT
写入数据。然后,您需要另一个 StreamListener
,您可以在其中以 KStream
形式接收数据并进行处理并写入输出。
第一个处理器:
@StreamListener(ForceStreams.DIRTY_INPUT)
@SendTo(ForceStreams.CLEANED_OUTPUT)
fun forceTimeCleaner(@Payload message: String): ForceEntity {
....
将以下内容更改为 KStream
绑定(bind)。
@Input(CLEANED_INPUT)
fun cleanInput(): MessageChannel
至
@Input(CLEANED_INPUT)
fun cleanInput(): KStream<String, ForceEntity>
第二个处理器:
@StreamListener(CLEANED_INPUT)
@SendTo(STREAM_OUT)
public KStream<String, ForceEntity> process(
KStream<String, ForceEntity> forceEntityStream) {
return forceEntityStream
........
.toStream();
}
目前,Spring Cloud Stream 中的 Kafka Streams 绑定(bind)器不支持将数据作为 KTable
写出。输出上仅允许 KStream
对象(输入上允许 KTable
绑定(bind))。如果这是一个硬性要求,您需要研究 Spring Kafka,您可以在其中进入较低级别并执行此类出站操作。
希望有帮助。
关于java - 如何从 SubscriableChannel 构建 KStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57331020/
我是一名优秀的程序员,十分优秀!