我如何创建Kafka主题运行在不同的集群从另一个Spark集群?

yacmzcpb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(308)

我有两个集群分别运行Kafka和Spark。我想从spark cluster创建一个Kafka主题。我注意到要创建一个主题,我们需要调用kafka-topics.sh,这在spark cluster中是不可用的。命令应该通过shell调用。
例如:/kafka\u topics.sh--zookeeper:2181--create--topic test\u topic
这个脚本应该从spark集群调用,并且应该在kafka集群上执行。有人能帮我吗?

xxhby3vn

xxhby3vn1#

您可以使用javaapi和maven依赖项(kafka和zookeeper)来创建kafka主题,如下所示。您可以从提交spark应用程序的代码中调用代码。

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.3</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>

import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import kafka.utils.ZKStringSerializer$;
import kafka.admin.AdminUtils;

public final class KafkaUtils {
    public static void main(String[] args) throws Exception {       
        KafkaUtils.createTopic("x.x.x.x:2181,y.y.y.y:2181", "topicName", 1, 0, new Properties());       
    }

    public static void createTopic(String zkHosts, String topicName, int numberOfPartition, int replicationFactor, Properties properties) {
        ZkClient zkClient = null;
        try {
            zkClient = getZkClient(zkHosts);
            AdminUtils.createTopic(zkClient, topicName, numberOfPartition, replicationFactor, properties);
        } catch (Exception exception) {
            exception.printStackTrace();
        } finally {
            if (zkClient != null) {
                try {
                    zkClient.close();
                } catch (ZkInterruptedException ex) {
                    ex.printStackTrace();
                }

            }
        }
    }

    private static ZkClient getZkClient(String zkHosts) {
        ZkClient zkClient = null;
        // Zookeeper sessionTimeoutMs
        final int sessionTimeoutMs = 10000;
        // Zookeeper connectionTimeoutMs
        final int connectionTimeoutMs = 10000;
        zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
        return zkClient;
    }
}

这里x.x.x.x和y.y.y.y是kafka的zk集群主机。希望这有帮助。

相关问题