Как сделать некоторые команды кафки в сервисе api балерины

Можно ли добиться этого у балерины

  1. Создать новую тему кафки в балерине
  2. К списку доступных тем в Балерина
  3. Подписаться на созданную тему в балерина

2 ответа

Вы можете

  1. Создать новую тему

Если вы отправляете данные, используя Kafka producer, он опубликует данные для этой конкретной темы, и если тема недоступна, он создаст тему и опубликует. Считайте, что вы хотите опубликовать в теме test от производителя. Вы можете создать конечную точку производителя с именем sampleProducer и отправить данные в определенную тему, используя send() функция.

endpoint kafka:SimpleProducer sampleProducer {
  bootstrapServers: "localhost:9090",
  acks: "all",
};

string topic = "test";
string msg = "Your Message";
byte[] messageToPublish = msg.toByteArray("UTF-8");
sampleProducer->send(messageToPublish, topic);`

Теперь, если есть тема под названием test доступен для брокера Kafka, размещенного на localhost:9090, он опубликует сообщение в теме или создаст тему, если ее не существует.

  1. Подписаться на новую тему

Ты можешь использовать Kafka:SimpleConsumer.subscribe() позвоните, чтобы подписаться на тему.

endpoint kafka:SimpleConsumer sampleConsumer {
  bootstrapServers: "localhost:9090",
  groupId: "test-consumers",
  autoCommit: false
};

string topic = "test";
string[] topics = [topic];
sampleConsumer->subscribe(topics);

Обратите внимание, что subscribe() принимает string[] в качестве входного параметра, следовательно, вы должны передать string[] к этому.

Есть и другие функции, такие как subscribeToPattern(), subscribeWithPartitionRebalance() которые также могут быть использованы для подписки потребителя на тему, вы можете найти больше о них в документации API.

Но чтобы перечислить доступные темы, вам нужно получить список тем от самого зоопарка. Но вы можете получить список тем, на которые в данный момент подписывается конкретный потребитель, с помощью балерины.

string[] subscribedTopics;
var result = sampleConsumer->getSubscription();

match result {
  string[] topics => {
    subscribedTopics = topics;
  }
  error e => {
    // Do whatever you need
  }
}

Обязательно обработайте ошибку здесь, так как getSubscription() может вернуть либо string[] или error,

Вы можете подписаться на тему, используя следующий код:

import ballerina/log;
import wso2/kafka;
import ballerina/internal;

// Kafka consumer endpoint
endpoint kafka:SimpleConsumer consumer {
    bootstrapServers: "localhost:9092, localhost:9093",
    // Consumer group ID
    groupId: "test-group",
    // Listen from topic 'test'
    topics: ["test"],
    // Poll every 1 second
    pollingInterval:1000
};

// Kafka service that listens from the topic 'product-price'
// 'inventoryControlService' subscribed to new product price updates from
// the product admin and updates the Database.
service<kafka:Consumer> kafkaService bind consumer {
    // Triggered whenever a message added to the subscribed topic
    onMessage(kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
        // Dispatched set of Kafka records to service, We process each one by one.
        foreach entry in records {
            byte[] serializedMsg = entry.value;
            // Convert the serialized message to string message
            string msg = internal:byteArrayToString(serializedMsg, "UTF-8");
            log:printInfo("New message received from the product admin");
            // log the retrieved Kafka record
            log:printInfo("Topic: " + entry.topic + "; Received Message: " + msg);
            // Mock logic
            // Update the database with the new price for the specified product
            log:printInfo("Database updated with the new price of the product");
        }
    }
}

Это Github репо может быть весьма полезным для вас. Он содержит различные примеры для потребителей и производителей.

Что касается ваших вопросов по созданию и перечислению тем, если вам не нужно выполнять эти действия от Ballerina, вы можете сделать это из командной строки:

bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --from-beginning
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor <number_of_replicas> --partitions <number_of_partitions> --topic test
Другие вопросы по тегам