Производитель librdkafka, чтобы взять сообщение из функции и создать его по теме
Поскольку я относительно новичок в Kafka, у меня возникли проблемы с реализацией Kafka Producer, где он получает информацию message
от функции и далее производить ее по определенной теме. Но сложность в том, что на публикацию этой темы уходит много времени. И если я пытаюсь изменить его конфигурацию, он либо пропускает несколько сообщений, либо замедляется скорость создания сообщений по теме. Я опубликую свой код продюсера ниже, и был бы чрезвычайно признателен, если бы кто-нибудь мог помочь мне пройти через это, поскольку я понятия не имею, где я ошибаюсь.
КОД:-
int rdkafka_produce (json_object *message) {
rd_kafka_t *rk; /* Producer instance handle */
rd_kafka_topic_t *rkt; /* Topic object */
rd_kafka_conf_t *conf; /* Temporary configuration object */
char errstr[512]; /* librdkafka API error reporting buffer */
char buf[2048]; /* Message value temporary buffer */
const char *brokers; /* Argument: broker list */
const char *topic; /* Argument: topic to produce to */
int sendcnt=0;
int partition = RD_KAFKA_PARTITION_UA;
rd_kafka_topic_conf_t *topic_conf;
rd_kafka_resp_err_t err;
rd_kafka_headers_t *hdrs = NULL;
/*
* Argument validation
*/
brokers = "localhost:9092";
topic = "tt_stream";
/*
* Create Kafka client configuration place-holder
*/
conf = rd_kafka_conf_new();
/* Set bootstrap broker(s) as a comma-separated list of
* host or host:port (default port 9092).
* librdkafka will use the bootstrap brokers to acquire the full
* set of brokers from the cluster. */
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
return 1;
}
/* Set the delivery report callback.
* This callback will be called once per message to inform
* the application if delivery succeeded or failed.
* See dr_msg_cb() above. */
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errstr, sizeof(errstr)))) {
fprintf(stderr,
"%% Failed to create new producer: %s\n",
errstr);
exit(1);
}
/* Add brokers */
if (rd_kafka_brokers_add(rk, brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
/* Set the delivery report callback.
* This callback will be called once per message to inform
* the application if delivery succeeded or failed.
* See dr_msg_cb() above. */
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errstr, sizeof(errstr)))) {
fprintf(stderr,
"%% Failed to create new producer: %s\n",
errstr);
exit(1);
}
/* Add brokers */
if (rd_kafka_brokers_add(rk, brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
exit(1);
}
/* Create topic */
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
topic_conf = NULL; /* Now owned by topic */
//while (run && strcpy(buf, json_object_to_json_string(message))) {
strcpy(buf, json_object_to_json_string(message));
size_t len = strlen(buf);
if (buf[len-1] == '\n')
buf[--len] = '\0';
err = RD_KAFKA_RESP_ERR_NO_ERROR;
/* Send/Produce message. */
if (hdrs) {
rd_kafka_headers_t *hdrs_copy;
hdrs_copy = rd_kafka_headers_copy(hdrs);
err = rd_kafka_producev(
rk,
RD_KAFKA_V_RKT(rkt),
RD_KAFKA_V_PARTITION(partition),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(buf, len),
RD_KAFKA_V_HEADERS(hdrs_copy),
RD_KAFKA_V_END);
if (err)
rd_kafka_headers_destroy(hdrs_copy);
} else {
if (rd_kafka_produce(
rkt, partition,
RD_KAFKA_MSG_F_COPY,
/* Payload and length */
buf, len,
/* Optional key and its length */
NULL, 0,
/* Message opaque, provided in
* delivery report callback as
* msg_opaque. */
NULL) == -1) {
err = rd_kafka_last_error();
}
}
if (err) {
fprintf(stderr,
"%% Failed to produce to topic %s "
"partition %i: %s\n",
rd_kafka_topic_name(rkt), partition,
rd_kafka_err2str(err));
/* Poll to handle delivery reports */
rd_kafka_poll(rk, 0);
}
sendcnt++;
/* Poll to handle delivery reports */
rd_kafka_poll(rk, 0);
//}
/* Poll to handle delivery reports */
rd_kafka_poll(rk, 0);
/* Wait for messages to be delivered */
while (run && rd_kafka_outq_len(rk) > 0)
rd_kafka_poll(rk, 100); //This is where most of the time is being spent.
/* Destroy topic */
rd_kafka_topic_destroy(rkt);
/* Destroy the handle */
rd_kafka_destroy(rk);
return 0;
}
1 ответ
Вы создаете новый экземпляр клиента-производителя для каждого создаваемого сообщения; это очень дорого, так как ему нужно раскрутить потоки, подключиться к брокерам начальной загрузки, выполнить аутентификацию, выполнить поиск метаданных, подключиться к соответствующим брокерам и т. д., прежде чем сможет создать одно сообщение.
Вместо этого создайте один экземпляр долгоживущего производителя, который вы будете повторно использовать для каждого сообщения, это сократит вашу задержку до нескольких миллисекунд (в зависимости от подключения брокера и нагрузки).