gpt4 book ai didi

java - KSQL : Could I use threads in KSQL UDF functions to speed up the process?

转载 作者:行者123 更新时间:2023-11-30 05:41:14 24 4
gpt4 key购买 nike

我在 3 个节点中运行独立的 ksql-server,与 3 个节点的 Kafka 集群进行通信。从 Topic 创建了一个包含 15 个分区的 Stream,数据位于 Stream 中以进行一些丰富。获取一段代码作为 UDF 来查找 IP2Location.bin 文件,UDF 类如下所示:

import java.io.IOException;
import java.util.Map;

import com.google.gson.Gson;

import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {

private IP2Location loc = null;
private Gson gson = null;

@Udf(description = "fetches the geoloc of the ipaddress.")
public synchronized String ip2lookup(String ip) {

String json = null;
if (loc != null) {
IP2LocationResult result = null;
try {
result = loc.query(ip);
System.out.println(result);
json = gson.toJson(result);
} catch (IOException e) {
e.printStackTrace();
}
return json;
}
return ip;
}

@Override
public void configure(Map<String, ?> arg0) {

try {
String db_path = null;
String os = System.getProperty("os.name").toLowerCase();

db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";

loc = new IP2Location(db_path);
gson = new Gson();
} catch (IOException e) {
e.printStackTrace();
}
}
}

数据进入主题的速度非常快(可能是每秒一百万条记录)。在该方法上使用 synchronized 后,每个 ksql-server 节点中的速度为每秒 3000 条记录/消息。以这个速度,你知道,要追上这个速度需要时间。如果没有同步方法,我会看到损坏的数据,因为单个对象/方法被多个线程使用。

问题1:KSQL 到底如何调用udf 调用?

问题2:我可以使用线程处理udf中的请求吗?

问题3:由于主题/流有 15 个分区,我应该启动 15 个 ksql-servers 节点吗?

谢谢。

最佳答案

Question1: How exactly the udf call would be called/invoked by KSQL?

不知道你的意思。一旦您的 UDF 可用于 KSQL(请参阅 https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#deploying ),您就可以在 KSQL 语句中将 UDF 调用为 IP2LOOKUP。您还可以在 KSQL 中运行 SHOW FUNCTIONS 来确认您的 UDF 可供使用。

也许您问这个是因为您的下一个问题? KSQL 将一次调用您的 UDF 一条消息。

Question2: Could I use threads handling the requests in udf?

你为什么要这么做?您是否担心 KSQL 使用当前的 UDF 代码无法处理传入的数据量?说到这里,您尝试处理的预期数据量是多少,因为您可能正在尝试进行过早优化?

此外,在不了解更多细节的情况下,我认为 UDF 的多线程设置不会产生任何优势,因为 UDF 在调用时仍然一次只能处理一条消息(每个 KSQL 服务器或,更准确地说,每个流任务,其中每个 KSQL 服务器可以有多个任务;我提到这一点是为了清楚地表明,KSQL 中的 UDF 不会因为在所有服务器上只处理一条消息而成为您的处理瓶颈;处理当然是分布式的并且并行发生)。

Question3: Being the Topic/Stream is of 15 partitions, should I spin-up 15 nodes of ksql-servers?

这取决于您的数据量。您可以根据需要运行任意数量的 KSQL 服务器。如果数据量较小,单个 KSQL 服务器可能就足够了。如果数据量较大,您可以开始启动额外的 KSQL 服务器,最多可达 15 个服务器(因为输入主题有 15 个分区)。任何额外的 KSQL 服务器都将处于空闲状态。

如果 15 个 KSQL 服务器不够,您应该将输入主题的分区数量从 15 个增加到更高的数量,然后您还可以启动更多 KSQL 服务器(从而增加计算能力)您的设置)。

关于java - KSQL : Could I use threads in KSQL UDF functions to speed up the process?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55616464/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com