Проблемы создания и использования сообщений из / в kafka AWS MSK с бессерверным приложением dot net core
Я пытаюсь создать API производителя, который должен просто захватить тело запроса и отправить его в тему kafka в кластере MSK. (проблема почти такая же для потребительской части)
Я создал кластер MSK в VPC с 3 выделенными подсетями. Бессерверное приложение работает в том же VPC, но в другой подсети, добавляя VpcConfig в файл serverless.template. Я также прикрепляю к этому файлу политики "AWSLambdaFullAccess" и "AmazonMSKFullAccess". Группы безопасности с обеих сторон разрешают весь трафик во всех портах.
Я создаю экземпляр EC2 в той же подсети, и я могу успешно подключиться с помощью команды bin/kafka-console-producer.sh и создавать сообщения.
В моем ядре dotnet используется пакет "Confluent.Kafka" версии 1.3.0.
Код выглядит так:
var config = new ProducerConfig { BootstrapServers = this.BootstrapServers };
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
try
{
this.Logger.LogInformation($"Producer name: {p.Name}");
var dr = await p.ProduceAsync(this.TopicName, new Message<Null, string> { Value = message });
this.Logger.LogInformation($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> e)
{
var msg = $"Delivery failed: {e.Error.Reason}";
Console.WriteLine(msg);
this.Logger.LogCritical(msg);
throw new Exception(msg);
}
}
Я пытаюсь настроить серверы начальной загрузки с DNS, предоставленным непосредственно из MSK, а также добавляю префикс "plaintext: //"
Есть идеи, почему я получаю TimeOut в строке "ProduceAsync"?
Я предполагаю, что это связано с тем, что библиотека Confluence.Kafka каким-то образом несовместима с MSK, но я действительно не могу понять, правда это или нет.
Заранее благодарю.
С уважением,