Как сделать некоторые команды кафки в сервисе api балерины
Можно ли добиться этого у балерины
- Создать новую тему кафки в балерине
- К списку доступных тем в Балерина
- Подписаться на созданную тему в балерина
2 ответа
Вы можете
- Создать новую тему
Если вы отправляете данные, используя
Kafka producer
, он опубликует данные для этой конкретной темы, и если тема недоступна, он создаст тему и опубликует. Считайте, что вы хотите опубликовать в темеtest
от производителя. Вы можете создать конечную точку производителя с именемsampleProducer
и отправить данные в определенную тему, используяsend()
функция.
endpoint kafka:SimpleProducer sampleProducer {
bootstrapServers: "localhost:9090",
acks: "all",
};
string topic = "test";
string msg = "Your Message";
byte[] messageToPublish = msg.toByteArray("UTF-8");
sampleProducer->send(messageToPublish, topic);`
Теперь, если есть тема под названием
test
доступен для брокера Kafka, размещенного наlocalhost:9090
, он опубликует сообщение в теме или создаст тему, если ее не существует.
- Подписаться на новую тему
Ты можешь использовать
Kafka:SimpleConsumer.subscribe()
позвоните, чтобы подписаться на тему.
endpoint kafka:SimpleConsumer sampleConsumer {
bootstrapServers: "localhost:9090",
groupId: "test-consumers",
autoCommit: false
};
string topic = "test";
string[] topics = [topic];
sampleConsumer->subscribe(topics);
Обратите внимание, что
subscribe()
принимаетstring[]
в качестве входного параметра, следовательно, вы должны передатьstring[]
к этому.Есть и другие функции, такие как
subscribeToPattern()
,subscribeWithPartitionRebalance()
которые также могут быть использованы для подписки потребителя на тему, вы можете найти больше о них в документации API.
Но чтобы перечислить доступные темы, вам нужно получить список тем от самого зоопарка. Но вы можете получить список тем, на которые в данный момент подписывается конкретный потребитель, с помощью балерины.
string[] subscribedTopics;
var result = sampleConsumer->getSubscription();
match result {
string[] topics => {
subscribedTopics = topics;
}
error e => {
// Do whatever you need
}
}
Обязательно обработайте ошибку здесь, так как
getSubscription()
может вернуть либоstring[]
илиerror
,
Вы можете подписаться на тему, используя следующий код:
import ballerina/log;
import wso2/kafka;
import ballerina/internal;
// Kafka consumer endpoint
endpoint kafka:SimpleConsumer consumer {
bootstrapServers: "localhost:9092, localhost:9093",
// Consumer group ID
groupId: "test-group",
// Listen from topic 'test'
topics: ["test"],
// Poll every 1 second
pollingInterval:1000
};
// Kafka service that listens from the topic 'product-price'
// 'inventoryControlService' subscribed to new product price updates from
// the product admin and updates the Database.
service<kafka:Consumer> kafkaService bind consumer {
// Triggered whenever a message added to the subscribed topic
onMessage(kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
// Dispatched set of Kafka records to service, We process each one by one.
foreach entry in records {
byte[] serializedMsg = entry.value;
// Convert the serialized message to string message
string msg = internal:byteArrayToString(serializedMsg, "UTF-8");
log:printInfo("New message received from the product admin");
// log the retrieved Kafka record
log:printInfo("Topic: " + entry.topic + "; Received Message: " + msg);
// Mock logic
// Update the database with the new price for the specified product
log:printInfo("Database updated with the new price of the product");
}
}
}
Это Github репо может быть весьма полезным для вас. Он содержит различные примеры для потребителей и производителей.
Что касается ваших вопросов по созданию и перечислению тем, если вам не нужно выполнять эти действия от Ballerina, вы можете сделать это из командной строки:
bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --from-beginning
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor <number_of_replicas> --partitions <number_of_partitions> --topic test