gpt4 book ai didi

java - 如何从 SubscriableChannel 构建 KStream

转载 作者:行者123 更新时间:2023-12-02 02:10:24 25 4
gpt4 key购买 nike

我正在使用 Spring 云流,并且想稍微摆弄一下 KStreams/KTables。

我正在寻找从标准 Kafka 主题将其转变为流的方法。

我已经在 KSQL 中完成了此操作,但我试图弄清楚是否有办法让 SpringBoot 处理此问题。我能找到的最好的例子是 @Input@Output channel 已经 KStreams但我认为这不是我想要的。

卡夫卡设置

在 SpringBoot 内部我正在执行以下操作:

  • 我的数据来自:force-entities-topic
  • 然后我“清理”数据,删除 [UTC]从时间消息中标记并重新发布: force-entities-topic-clean

从那里我希望得到它的输出并构建 KStreamKTable键入platformUID字段。

输入数据

所以我正在使用的数据是:

{
"platformUID": "UID",
"type": "TLPS",
"state": "PLATFORM_INITIALIZED",
"fuelremaining": 5.9722E+24,
"latitude": 39,
"longitude": -115,
"altitude": 0,
"time": "2018-07-18T00:00:00Z[UTC]"
}

KSQL

我可以运行这些 KSQL 命令来创建我需要的内容。 (这里我以字符串形式读取时间,而不是我在 java/kotlin 实现中执行的实际时间)

CREATE STREAM force_no_key (
platformUID string,
type string,
state string,
fuelremaining DOUBLE,
latitude DOUBLE,
longitude DOUBLE,
altitude DOUBLE
) with (
kafka_topic='force-entities-topic',
value_format='json');

从那里我创建另一个流(因为我无法让它正确读取 key )

CREATE STREAM force_with_key 
WITH (KAFKA_TOPIC='blue_force_with_key') AS
select PLATFORMUID as UID, LATITUDE as lat, LONGITUDE as LON, ALTITUDE as ALT, state, type
FROM force_no_key
PARTITION BY UID;

从现在开始

CREATE TABLE FORCE_TABLE
( UID VARCHAR,
LAT DOUBLE,
LON DOUBLE,
ALT DOUBLE
) WITH (KAFKA_TOPIC = 'force_with_key',
VALUE_FORMAT='JSON',
KEY = 'UID');

Java 风格!

我认为我遇到麻烦的地方就在这里。我在这里定义我的绑定(bind)接口(interface):



interface ForceStreams {

companion object {
// From the settings file we configure it with the value of-force-in
const val DIRTY_INPUT = "dirty-force-in"
const val CLEANED_OUTPUT = "clean-force-out"
const val CLEANED_INPUT = "clean-force-in"
const val STREAM_OUT = "stream-out"
}

@Input(DIRTY_INPUT)
fun initialInput(): MessageChannel

@Output(CLEANED_OUTPUT)
fun cleanOutput(): SubscribableChannel

@Input(CLEANED_INPUT)
fun cleanInput(): MessageChannel

@Output(STREAM_OUT)
fun cleanedBlueForceMessage(): KStream<String, ForceEntity>

@Output(TABLE_OUT)
fun tableOutput(): KTable<String, ForceEntity>
}

然后我用这个 block 进行清洁:

@StreamListener(ForceStreams.DIRTY_INPUT)
@SendTo(ForceStreams.CLEANED_OUTPUT)
fun forceTimeCleaner(@Payload message: String): ForceEntity {

var inputMap: Map<String, Any> = objectMapper.readValue(message)

var map = inputMap.toMutableMap()

map["type"] = map["type"].toString().replace("-", "_")
map["time"] = map["time"].toString().replace("[UTC]", "")

val json = objectMapper.writeValueAsString(map)

val fe : ForceEntity = objectMapper.readValue(json, ForceEntity::class.java)

return fe
}

但我要从 MessageChannel 出发至SubscribableChannel

我不确定该怎么做是从 SubscribableChannel 开始到 KStream<String,ForceEntity>KTable<String,ForceEntity>

任何帮助将不胜感激 - 谢谢

编辑 - applicaiton.yml

server:
port: 8888
spring:
application:
name: Blue-Force-Table
kafka:
bootstrap-servers: # This seems to be for the KStreams the other config is for normal streams
- localhost:19092
cloud:
stream:
defaultBinder: kafka
kafka:
binder:
brokers: localhost:19092
bindings:
dirty-force-in:
destination: force-entities-topic
contentType: application/json
clean-force-in:
destination: force-entities-topic-clean
contentType: application/json
clean-force-out:
destination: force-entities-topic-clean
contentType: application/json
stream-out:
destination: force_stream
contentType: application/json
table-out:
destination: force_table
contentType: application/json

我想接下来的问题是 - 这可能吗?您可以在单个函数中混合和匹配 Binder 吗?

最佳答案

在第一个 StreamListener 中,您通过 DIRTY_INPUT 绑定(bind)接收数据,并通过绑定(bind) CLEANED_OUTPUT 写入数据。然后,您需要另一个 StreamListener,您可以在其中以 KStream 形式接收数据并进行处理并写入输出。

第一个处理器:

@StreamListener(ForceStreams.DIRTY_INPUT)
@SendTo(ForceStreams.CLEANED_OUTPUT)
fun forceTimeCleaner(@Payload message: String): ForceEntity {
....

将以下内容更改为 KStream 绑定(bind)。

@Input(CLEANED_INPUT)
fun cleanInput(): MessageChannel

@Input(CLEANED_INPUT)
fun cleanInput(): KStream<String, ForceEntity>

第二个处理器:

        @StreamListener(CLEANED_INPUT)
@SendTo(STREAM_OUT)
public KStream<String, ForceEntity> process(
KStream<String, ForceEntity> forceEntityStream) {

return forceEntityStream
........
.toStream();
}

目前,Spring Cloud Stream 中的 Kafka Streams 绑定(bind)器不支持将数据作为 KTable 写出。输出上仅允许 KStream 对象(输入上允许 KTable 绑定(bind))。如果这是一个硬性要求,您需要研究 Spring Kafka,您可以在其中进入较低级别并执行此类出站操作。

希望有帮助。

关于java - 如何从 SubscriableChannel 构建 KStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57331020/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com