gpt4 book ai didi

java - Kafka Streams 表转换

转载 作者:行者123 更新时间:2023-12-01 06:05:12 26 4
gpt4 key购买 nike

我在 SQL Server 中有一个表,我想将其流式传输到 Kafka 主题,结构如下:

(UserID, ReportID)

该表将不断更改(添加记录、插入记录、无更新)

我想将其转换为这种结构并放入 Elasticsearch 中:

{
"UserID": 1,
"Reports": [1, 2, 3, 4, 5, 6]
}

到目前为止,我看到的示例是日志或点击流,但在我的情况下不起作用。

这种用例是否可能?我总是可以只看 UserID更改并查询数据库,但这似乎很幼稚,而且不是最好的方法。

更新

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.ArrayList;
import java.util.Properties;

public class MyDemo {
public static void main(String... args) {
System.out.println("Hello KTable!");

final Serde<Long> longSerde = Serdes.Long();

KStreamBuilder builder = new KStreamBuilder();

KStream<Long, Long> reportPermission = builder.stream(TOPIC);

KTable<Long, ArrayList<Long>> result = reportPermission
.groupByKey()
.aggregate(
new Initializer<ArrayList<Long>>() {
@Override
public ArrayList<Long> apply() {
return null;
}
},
new Aggregator<Long, Long, ArrayList<Long>>() {
@Override
public ArrayList<Long> apply(Long key, Long value, ArrayList<Long> aggregate) {
aggregate.add(value);
return aggregate;
}
},
new Serde<ArrayList<Long>>() {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}

@Override
public void close() {}

@Override
public Serializer<ArrayList<Long>> serializer() {
return null;
}

@Override
public Deserializer<ArrayList<Long>> deserializer() {
return null;
}
});

result.to("report-aggregated-topic");

KafkaStreams streams = new KafkaStreams(builder, createStreamProperties());
streams.cleanUp();
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}

private static final String TOPIC = "report-permission";

private static final Properties createStreamProperties() {
Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "report-permission-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

return props;
}
}

我实际上陷入了聚合阶段,因为我无法为 ArrayList<Long> 编写正确的 SerDe (还没有足够的技能),lambda 似乎不适用于聚合器 - 它不知道 agg 的类型是什么。 :

KTable<Long, ArrayList<Long>> sample = builder.stream(TOPIC)
.groupByKey()
.aggregate(
() -> new ArrayList<Long>(),
(key, val, agg) -> agg.add(val),
longSerde
);

最佳答案

您可以使用 Kafka 的 Connect API 将数据从 SQL Server 获取到 Kafka。我不知道 SQL Server 的任何特定连接器,但您可以使用任何基于通用 JDBC 的连接器:https://www.confluent.io/product/connectors/

要处理数据,您可以使用 Kafka 的 Streams API。您只需aggregate()每个用户的所有报告即可。像这样的事情:

KTable<UserId, List<Reports>> result =
builder.stream("topic-name")
.groupByKey()
// init a new empty list and
// `add` the items to the list in the actual aggregation
.aggregate(...);

result.to("result-topic");

查看文档以了解有关 Streams API 的更多详细信息:https://docs.confluent.io/current/streams/index.html

Note, that you need to make sure that the list of reports does not grow unbounded. Kafka has some (configurable) maximum message size and the whole list will be contained in a single message. Thus, you should estimate the maximum message size and apply the corresponding configuration (-> max.message.bytes) before going into production. Check out configs at the webpage: http://kafka.apache.org/documentation/#brokerconfigs

最后,您使用 Connect API 将数据推送到 Elastic Search。有多种不同的连接器可供使用(我当然会推荐 Confluence 的连接器)。有关 Connect API 的更多详细信息:https://docs.confluent.io/current/connect/userguide.html

关于java - Kafka Streams 表转换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46303973/

26 4 0