如何在ballerina服务api中执行kafka命令

aor9mmx1  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(405)

在芭蕾舞中有可能做到这一点吗
在芭蕾舞中创造一个新的Kafka主题
列出芭蕾舞中可用的主题
订阅芭蕾舞剧中创建的主题

jk9hmnmh

jk9hmnmh1#

您可以使用以下代码订阅主题:

import ballerina/log;
import wso2/kafka;
import ballerina/internal;

// Kafka consumer endpoint
endpoint kafka:SimpleConsumer consumer {
    bootstrapServers: "localhost:9092, localhost:9093",
    // Consumer group ID
    groupId: "test-group",
    // Listen from topic 'test'
    topics: ["test"],
    // Poll every 1 second
    pollingInterval:1000
};

// Kafka service that listens from the topic 'product-price'
// 'inventoryControlService' subscribed to new product price updates from
// the product admin and updates the Database.
service<kafka:Consumer> kafkaService bind consumer {
    // Triggered whenever a message added to the subscribed topic
    onMessage(kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
        // Dispatched set of Kafka records to service, We process each one by one.
        foreach entry in records {
            byte[] serializedMsg = entry.value;
            // Convert the serialized message to string message
            string msg = internal:byteArrayToString(serializedMsg, "UTF-8");
            log:printInfo("New message received from the product admin");
            // log the retrieved Kafka record
            log:printInfo("Topic: " + entry.topic + "; Received Message: " + msg);
            // Mock logic
            // Update the database with the new price for the specified product
            log:printInfo("Database updated with the new price of the product");
        }
    }
}

这个github repo可能对您非常有用。它为消费者和生产者提供了各种各样的例子。
关于创建和列出主题的问题,如果不需要从ballerina执行这些操作,可以从命令行执行这些操作:

bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --from-beginning
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor <number_of_replicas> --partitions <number_of_partitions> --topic test
jm81lzqq

jm81lzqq2#

编辑:更新示例代码以符合最新的ballerina版本(从v1.2.0以上)。
你可以
创建新主题
如果您使用 Kafka producer ,它将数据发布到该特定主题,如果该主题不可用,它将创建该主题并发布(为了支持这一点,你必须设置 auto.create.topics.enable=true 在代理属性中)。
假设您要发布到主题 test 从制作人那里。您可以创建一个名为 kafka:Producer 并使用 send() 功能。

kafka:Producer sampleProducer = new ({
  bootstrapServers: "localhost:9092",
  acks: "all",
  valueSerializerType: kafka:SER_STRING
});

string topic = "test";
string msg = "Your Message";
sampleProducer->send(messageToPublish, topic);`

如果有一个主题叫做 test 可供Kafka经纪人使用 localhost:9092 ,它将消息发布到主题,或者如果主题不存在,它将创建主题。
订阅新主题
你可以用 subscribe() 的功能 Kafka:Consumer 订阅主题。

listener kafka:Consumer sampleConsumer = new ({
  bootstrapServers: "localhost:9090",
  groupId: "test-consumers",
  valueDeserializerType: kafka:DES_STRING
});

string topic = "test";
string[] topics = [topic];
sampleConsumer->subscribe(topics);

请注意 subscribe()string[] 作为输入参数,因此应该传递 string[] 去吧。
还有其他功能,如 subscribeToPattern() , subscribeWithPartitionRebalance() 它们也可用于向使用者订阅主题,您可以在api文档中找到有关它们的更多信息。
但是要列出可用的主题,您需要从kafka代理本身获取主题列表。但是你可以使用ballerina获得一个主题列表,该列表目前由特定的消费者订阅。

string[] subscribedTopics;
var result = sampleConsumer->getSubscription();
if (result is error) {
  // Your logic for handling the error
} else {
    subscribedTopics = result;
}

确保在这里处理错误,因为 getSubscription() 可以返回 string[] 或者一个 error . 芭蕾舞女式的卫兵可以帮你表演这个把戏。

相关问题