我是独立的 ksql-server
在3节点中与 Kafka
3节点集群。创建了一个 Stream
从 Topic
数据流中有15个分区和数据,可以进行一些扩展。有一段代码 UDF
查找ip2location.bin文件和 UDF
类看起来像:
import java.io.IOException;
import java.util.Map;
import com.google.gson.Gson;
import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {
private IP2Location loc = null;
private Gson gson = null;
@Udf(description = "fetches the geoloc of the ipaddress.")
public synchronized String ip2lookup(String ip) {
String json = null;
if (loc != null) {
IP2LocationResult result = null;
try {
result = loc.query(ip);
System.out.println(result);
json = gson.toJson(result);
} catch (IOException e) {
e.printStackTrace();
}
return json;
}
return ip;
}
@Override
public void configure(Map<String, ?> arg0) {
try {
String db_path = null;
String os = System.getProperty("os.name").toLowerCase();
db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";
loc = new IP2Location(db_path);
gson = new Gson();
} catch (IOException e) {
e.printStackTrace();
}
}
}
数据输入到 Topic
然后进入 Stream
相当快(每秒可能有一百万条记录)。与 synchronized
在方法上,速度是每秒3000条记录/消息 ksql-server
节点。以这样的速度,你知道,要赶上这个速度需要时间。没有 synchronized
方法,我看到损坏的数据,因为单个对象/方法被多个线程使用。
问题1:具体情况如何 udf
调用将由ksql调用/调用?
问题2:我可以使用线程处理中的请求吗 udf
?
问题3:作为主题/流是由15个分区组成的,我是否应该旋转15个分区的节点 ksql-servers
?
谢谢。
1条答案
按热度按时间fnatzsnv1#
问题1:ksql究竟如何调用udf调用?
不知道你的意思。一旦您的自定义项可供ksql使用(请参阅https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#deploying),您可以在ksql语句中调用udf
IP2LOOKUP
. 你也可以跑步SHOW FUNCTIONS
在ksql中确认您的udf可供使用。也许你是因为你的下一个问题而问的?ksql将一次调用一条消息。
问题2:我可以使用线程处理udf中的请求吗?
你为什么要这么做?您是否担心使用当前udf代码的ksql无法处理传入的数据量?说到这里,您试图处理的预期数据量是多少,因为您可能正在尝试进行过早的优化?
另外,在不了解更多细节的情况下,我认为udf的多线程设置不会产生任何优势,因为udf在被调用时,仍然一次只能处理一条消息(每个ksql服务器,或者更准确地说,每个流任务,每个ksql服务器可以有多条消息;我提到这一点是为了清楚地表明,ksql中的udf并不是通过在所有服务器上只处理一条消息来限制您的处理;处理过程当然是分布式的,并且是并行的)。
问题3:作为主题/流是由15个分区组成的,我应该启动ksql服务器的15个节点吗?
这取决于您的数据量。您可以根据需要旋转任意多或任意少的ksql服务器。如果数据量很低,一个ksql服务器就足够了。如果数据量更大,您可以启动额外的ksql服务器,最多15个服务器(因为输入主题有15个分区)。任何额外的ksql服务器都将处于空闲状态。
在15个ksql服务器不够的情况下,应该将输入主题的分区数从15增加到更大的数目,然后还可以启动更多的ksql服务器(从而增加设置的计算容量)。