我可以在ksql udf函数中使用线程来加速进程吗?

wswtfjt7  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(499)

我是独立的 ksql-server 在3节点中与 Kafka 3节点集群。创建了一个 StreamTopic 数据流中有15个分区和数据,可以进行一些扩展。有一段代码 UDF 查找ip2location.bin文件和 UDF 类看起来像:

import java.io.IOException;
import java.util.Map;

import com.google.gson.Gson;

import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {

    private IP2Location loc = null;
    private Gson gson = null;

    @Udf(description = "fetches the geoloc of the ipaddress.")
    public synchronized String ip2lookup(String ip) {

        String json = null;
        if (loc != null) {
            IP2LocationResult result = null;
            try {
                result = loc.query(ip);
                System.out.println(result);
                json = gson.toJson(result);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return json;
        }
        return ip;
    }

    @Override
    public void configure(Map<String, ?> arg0) {

        try {
            String db_path = null;
            String os = System.getProperty("os.name").toLowerCase();

            db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";

            loc = new IP2Location(db_path);
            gson = new Gson();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

数据输入到 Topic 然后进入 Stream 相当快(每秒可能有一百万条记录)。与 synchronized 在方法上,速度是每秒3000条记录/消息 ksql-server 节点。以这样的速度,你知道,要赶上这个速度需要时间。没有 synchronized 方法,我看到损坏的数据,因为单个对象/方法被多个线程使用。
问题1:具体情况如何 udf 调用将由ksql调用/调用?
问题2:我可以使用线程处理中的请求吗 udf ?
问题3:作为主题/流是由15个分区组成的,我是否应该旋转15个分区的节点 ksql-servers ?
谢谢。

fnatzsnv

fnatzsnv1#

问题1:ksql究竟如何调用udf调用?
不知道你的意思。一旦您的自定义项可供ksql使用(请参阅https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#deploying),您可以在ksql语句中调用udf IP2LOOKUP . 你也可以跑步 SHOW FUNCTIONS 在ksql中确认您的udf可供使用。
也许你是因为你的下一个问题而问的?ksql将一次调用一条消息。
问题2:我可以使用线程处理udf中的请求吗?
你为什么要这么做?您是否担心使用当前udf代码的ksql无法处理传入的数据量?说到这里,您试图处理的预期数据量是多少,因为您可能正在尝试进行过早的优化?
另外,在不了解更多细节的情况下,我认为udf的多线程设置不会产生任何优势,因为udf在被调用时,仍然一次只能处理一条消息(每个ksql服务器,或者更准确地说,每个流任务,每个ksql服务器可以有多条消息;我提到这一点是为了清楚地表明,ksql中的udf并不是通过在所有服务器上只处理一条消息来限制您的处理;处理过程当然是分布式的,并且是并行的)。
问题3:作为主题/流是由15个分区组成的,我应该启动ksql服务器的15个节点吗?
这取决于您的数据量。您可以根据需要旋转任意多或任意少的ksql服务器。如果数据量很低,一个ksql服务器就足够了。如果数据量更大,您可以启动额外的ksql服务器,最多15个服务器(因为输入主题有15个分区)。任何额外的ksql服务器都将处于空闲状态。
在15个ksql服务器不够的情况下,应该将输入主题的分区数从15增加到更大的数目,然后还可以启动更多的ksql服务器(从而增加设置的计算容量)。

相关问题