import ballerina/log;
import wso2/kafka;
import ballerina/internal;
// Kafka consumer endpoint
endpoint kafka:SimpleConsumer consumer {
bootstrapServers: "localhost:9092, localhost:9093",
// Consumer group ID
groupId: "test-group",
// Listen from topic 'test'
topics: ["test"],
// Poll every 1 second
pollingInterval:1000
};
// Kafka service that listens from the topic 'product-price'
// 'inventoryControlService' subscribed to new product price updates from
// the product admin and updates the Database.
service<kafka:Consumer> kafkaService bind consumer {
// Triggered whenever a message added to the subscribed topic
onMessage(kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
// Dispatched set of Kafka records to service, We process each one by one.
foreach entry in records {
byte[] serializedMsg = entry.value;
// Convert the serialized message to string message
string msg = internal:byteArrayToString(serializedMsg, "UTF-8");
log:printInfo("New message received from the product admin");
// log the retrieved Kafka record
log:printInfo("Topic: " + entry.topic + "; Received Message: " + msg);
// Mock logic
// Update the database with the new price for the specified product
log:printInfo("Database updated with the new price of the product");
}
}
}
string[] subscribedTopics;
var result = sampleConsumer->getSubscription();
if (result is error) {
// Your logic for handling the error
} else {
subscribedTopics = result;
}
2条答案
按热度按时间jk9hmnmh1#
您可以使用以下代码订阅主题:
这个github repo可能对您非常有用。它为消费者和生产者提供了各种各样的例子。
关于创建和列出主题的问题,如果不需要从ballerina执行这些操作,可以从命令行执行这些操作:
jm81lzqq2#
编辑:更新示例代码以符合最新的ballerina版本(从v1.2.0以上)。
你可以
创建新主题
如果您使用
Kafka producer
,它将数据发布到该特定主题,如果该主题不可用,它将创建该主题并发布(为了支持这一点,你必须设置auto.create.topics.enable=true
在代理属性中)。假设您要发布到主题
test
从制作人那里。您可以创建一个名为kafka:Producer
并使用send()
功能。如果有一个主题叫做
test
可供Kafka经纪人使用localhost:9092
,它将消息发布到主题,或者如果主题不存在,它将创建主题。订阅新主题
你可以用
subscribe()
的功能Kafka:Consumer
订阅主题。请注意
subscribe()
拿string[]
作为输入参数,因此应该传递string[]
去吧。还有其他功能,如
subscribeToPattern()
,subscribeWithPartitionRebalance()
它们也可用于向使用者订阅主题,您可以在api文档中找到有关它们的更多信息。但是要列出可用的主题,您需要从kafka代理本身获取主题列表。但是你可以使用ballerina获得一个主题列表,该列表目前由特定的消费者订阅。
确保在这里处理错误,因为
getSubscription()
可以返回string[]
或者一个error
. 芭蕾舞女式的卫兵可以帮你表演这个把戏。