- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在 3 个节点中运行独立的 ksql-server
,与 3 个节点的 Kafka
集群进行通信。从 Topic
创建了一个包含 15 个分区的 Stream
,数据位于 Stream 中以进行一些丰富。获取一段代码作为 UDF
来查找 IP2Location.bin 文件,UDF
类如下所示:
import java.io.IOException;
import java.util.Map;
import com.google.gson.Gson;
import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {
private IP2Location loc = null;
private Gson gson = null;
@Udf(description = "fetches the geoloc of the ipaddress.")
public synchronized String ip2lookup(String ip) {
String json = null;
if (loc != null) {
IP2LocationResult result = null;
try {
result = loc.query(ip);
System.out.println(result);
json = gson.toJson(result);
} catch (IOException e) {
e.printStackTrace();
}
return json;
}
return ip;
}
@Override
public void configure(Map<String, ?> arg0) {
try {
String db_path = null;
String os = System.getProperty("os.name").toLowerCase();
db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";
loc = new IP2Location(db_path);
gson = new Gson();
} catch (IOException e) {
e.printStackTrace();
}
}
}
数据进入主题
和流
的速度非常快(可能是每秒一百万条记录)。在该方法上使用 synchronized
后,每个 ksql-server
节点中的速度为每秒 3000 条记录/消息。以这个速度,你知道,要追上这个速度需要时间。如果没有同步方法,我会看到损坏的数据,因为单个对象/方法被多个线程使用。
问题1:KSQL 到底如何调用udf
调用?
问题2:我可以使用线程处理udf
中的请求吗?
问题3:由于主题/流有 15 个分区,我应该启动 15 个 ksql-servers
节点吗?
谢谢。
最佳答案
Question1: How exactly the udf call would be called/invoked by KSQL?
不知道你的意思。一旦您的 UDF 可用于 KSQL(请参阅 https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#deploying ),您就可以在 KSQL 语句中将 UDF 调用为 IP2LOOKUP
。您还可以在 KSQL 中运行 SHOW FUNCTIONS
来确认您的 UDF 可供使用。
也许您问这个是因为您的下一个问题? KSQL 将一次调用您的 UDF 一条消息。
Question2: Could I use threads handling the requests in udf?
你为什么要这么做?您是否担心 KSQL 使用当前的 UDF 代码无法处理传入的数据量?说到这里,您尝试处理的预期数据量是多少,因为您可能正在尝试进行过早优化?
此外,在不了解更多细节的情况下,我认为 UDF 的多线程设置不会产生任何优势,因为 UDF 在调用时仍然一次只能处理一条消息(每个 KSQL 服务器或,更准确地说,每个流任务,其中每个 KSQL 服务器可以有多个任务;我提到这一点是为了清楚地表明,KSQL 中的 UDF 不会因为在所有服务器上只处理一条消息而成为您的处理瓶颈;处理当然是分布式的并且并行发生)。
Question3: Being the Topic/Stream is of 15 partitions, should I spin-up 15 nodes of ksql-servers?
这取决于您的数据量。您可以根据需要运行任意数量的 KSQL 服务器。如果数据量较小,单个 KSQL 服务器可能就足够了。如果数据量较大,您可以开始启动额外的 KSQL 服务器,最多可达 15 个服务器(因为输入主题有 15 个分区)。任何额外的 KSQL 服务器都将处于空闲状态。
如果 15 个 KSQL 服务器不够,您应该将输入主题的分区数量从 15 个增加到更高的数量,然后您还可以启动更多 KSQL 服务器(从而增加计算能力)您的设置)。
关于java - KSQL : Could I use threads in KSQL UDF functions to speed up the process?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55616464/
main.cpp #include "Primes.h" #include int main(){ std::string choose; int num1, num2; w
似乎函数 qwertyInches() 应该可以工作但是当我在 main() 中调用它时它给了我 [Error] called object 'qwertyInches' is not a funct
我无法理解 C++ 语法的工作原理。 #include using namespace std; class Accumulator{ private: int value; public:
在 类中声明 函数成员时,我们可以同时执行这两种操作; Function first; Function() second; 它们之间有什么区别? 最佳答案 Function 代表任意函数: void
“colonna”怎么可能是一个简单的字符串: $('td.' + colonna).css('background-color','#ffddaa'); 可以正确突出显示有趣单元格的背景,并且: $
我正在尝试将网页中的动态参数中继到函数中,然后函数将它们传递给函数内部的调用。比如下面这个简化的代码片段,现在这样,直接传入参数是没有问题的。但是,如何在不为每个可能的 colorbox 参数设置 s
C++ 中是否有一种模式允许您返回一个函数,它返回一个函数本身。例如 std::function func = ...; do { func = func(); } while (func);
我正在将 Windows 程序集移植到 Linux。我有一些代码要移植。我实际上是 linux 中 C 的新手。我知道 C 基础知识是一样的! typedef struct sReader {
我一直在寻找一个很好的解释,所以我很清楚。示例: this.onDeleteHandler(index)}/> 对比 对比 this.nameChangedhandler(event, perso
function(){}.__proto__ === Function.prototype 和 Function.prototype === function(){}.__proto__ 得到不同的结
https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Function 据说 Propert
VBA 中的函数没有特殊类型。我很难理解如何在 Excel VBA 中将函数作为参数添加到函数中。 我想要完成的是这样的事情: function f(g as function, x as strin
所以我正在尝试制作一个包(我没有在下面包含我的 roxygen2 header ): 我有这个功能: date_from_text % dplyr::mutate(!!name := lubr
尝试从 std::function 派生一个类,对于初学者来说,继承构造函数。这是我的猜测: #include #include using namespace std; template cla
我正在尝试编写一个返回另一个函数的函数。我的目标是编写一个函数,它接受一个对象并返回另一个函数“search”。当我使用键调用搜索函数时,我想从第一个函数中给定的对象返回该键的值。 propertyO
我非常清楚函数式编程技术和命令式编程技术之间的区别。但是现在有一种普遍的趋势是谈论“函数式语言”,这确实让我感到困惑。 当然,像 Haskell 这样的一些语言比 C 等其他语言更欢迎函数式编程。但即
关闭。这个问题是opinion-based 。目前不接受答案。 想要改进这个问题吗?更新问题,以便 editing this post 可以用事实和引文来回答它。 . 已关闭 8 年前。 Improv
我在stackoverflow上查过很多类似的问题,比如call.call 1 , call.call 2 ,但我是新人,无法发表任何评论。我希望我能找到关于 JavaScript 解释器如何执行这些
向 Twilio 发送 SMS 时,Twilio 会向指定的 URL 发送多个请求,以通过 Webhook 提供该 SMS 传送的状态。我想让这个回调异步,所以我开发了一个 Cloud Functio
作为 IaC 的一部分,A 功能应用 ,让我们将其命名为 FuncAppX 是使用 Terraform 部署的,它有一个内置函数。 我需要使用 Terraform 在函数应用程序中访问相同函数的 Ur
我是一名优秀的程序员,十分优秀!