Потребитель Кафки не подписывается.NET
Я пишу консольное приложение, которое потребляет данные Kafka. Я проверил конечные точки и работу, когда запускаю их в Почтальоне. Моя проблема в моем приложении: метод подписки явно не работает (я не получаю ошибок или ответов от API о каких-либо ошибках), потому что я постоянно получаю веб-ошибку: {"error_code":50002,"message":"Ошибка Kafka: Потребитель не подписан на какие-либо темы или не имеет разделов "} при попытке опроса записей:
Вот мой код:
Создать потребителя (это работает)
public void CreateConsumer(string name,string url)
{
_logger.Debug($"Creating Consumer...");
try
{
var request = (HttpWebRequest) WebRequest.Create(url);
_logger.Debug($"Consumer Url: {url}");
var body = new RequestEntity
{
Name = $"{name}_consumer",
Format = "json",
AutoOffsetReset = "earliest"
};
var requestJson = JsonConvert.SerializeObject(body);
_logger.Debug($"Consumer Body: {requestJson}");
request.Method = WebRequestMethods.Http.Post;
request.ContentType = "application/vnd.kafka.v2+json";
var requestStream = request.GetRequestStream();
requestStream.WriteTimeout = 300000;
using (var streamWriter = new StreamWriter(requestStream))
{
streamWriter.Write(requestJson);
}
using (var response = (HttpWebResponse) request.GetResponse())
using (var reader = new StreamReader(response.GetResponseStream()))
{
var json = reader.ReadToEnd();
_logger.Error($"Consumer Response: {json}");
}
}
catch (Exception exception)
{
_logger.Error($"{exception.Message}", exception);
}
}
Подписаться (это, очевидно, не работает)
public void Subscribe(string url,string topic)
{
_logger.Debug($"Starting Subscription");
try
{
var subUrl = url + $"/instances/{topic}_consumer/subscription";
var request = (HttpWebRequest)WebRequest.Create(subUrl);
_logger.Debug($"Subscriber Url: {subUrl}");
var topicList = new List<string> { topic };
var body = new SubscribeEntity
{
Topics = topicList
};
var requestJson = JsonConvert.SerializeObject(body);
_logger.Debug($"Subscription Body: {requestJson}");
request.Headers.Clear();
request.Method = WebRequestMethods.Http.Post;
request.ContentType = "application/vnd.kafka.v2+json";
var streamWriter = new StreamWriter(request.GetRequestStream());
try
{
streamWriter.Write(requestJson);
}
catch (Exception exception)
{
_logger.Error($"{exception.Message}", exception);
_logger.Error($"{exception.InnerException}", exception);
}
finally {
streamWriter.Dispose();
}
_logger.Debug($"Did the request have a response?: {request.HaveResponse}");
}
catch (Exception exception)
{
_logger.Error($"{exception.Message}", exception);
}
}
Запрос записей (работает, но возвращает ошибку "не подписан"):
public List<JsonEntity> RequestRecords(string url,string topic)
{
try
{
var subUrl = url + $"/instances/{topic}_consumer/records";
_logger.Debug($"Polling records");
List<JsonEntity> jsonEntities;
var request = (HttpWebRequest) WebRequest.Create(subUrl);
_logger.Debug($"Records Url: {subUrl}");
request.Method = WebRequestMethods.Http.Get;
request.Accept = "application/vnd.kafka.json.v2+json";
using (var response = (HttpWebResponse) request.GetResponse())
using (var reader = new StreamReader(response.GetResponseStream()))
{
_logger.Debug($"Response Headers: {response.Headers}");
var json = reader.ReadToEnd();
jsonEntities = JsonConvert.DeserializeObject<List<JsonEntity>>(json);
}
return jsonEntities;
}
catch(WebException exception)
{
if (exception.Response != null)
{
using (var response = (HttpWebResponse)exception.Response)
using(var reader = new StreamReader(response.GetResponseStream()))
{
var error = reader.ReadToEnd();
_logger.Debug($"Web Error: {error}");
}
}
_logger.Error($"{exception.Message}", exception);
return null;
}
}
Вот журналы:
Создание Consumer... URL- адрес потребителя: http://localhost/consumers/care_consumer
Consumer Body: {"name":"care_chat_metrics_consumer","format":"json","auto.offset.reset":"earliest"} Consumer Response: {"instance_id":"care_chat_metrics_consumer","base_uri":"http://localhost/consumers/care_consumer/instances/care_chat_metrics_consumer"} Starting Subscription Subscriber Url: http://localhost/consumers/care_consumer/instances/care_chat_metrics_consumer/subscription Subscription Body: {"topics":["care_chat_metrics"]} Did the request have a response?: False Polling records Records Url: http://localhost/consumers/care_consumer/instances/care_chat_metrics_consumer/records Web Error: {"error_code":50002,"message":"Kafka error: Consumer is not subscribed to any topics or assigned any partitions"} The remote server returned an error: (500) Internal Server Error.