Потребитель Кафки не подписывается.NET

Я пишу консольное приложение, которое потребляет данные Kafka. Я проверил конечные точки и работу, когда запускаю их в Почтальоне. Моя проблема в моем приложении: метод подписки явно не работает (я не получаю ошибок или ответов от API о каких-либо ошибках), потому что я постоянно получаю веб-ошибку: {"error_code":50002,"message":"Ошибка Kafka: Потребитель не подписан на какие-либо темы или не имеет разделов "} при попытке опроса записей:

Вот мой код:

Создать потребителя (это работает)

public void CreateConsumer(string name,string url)
{
    _logger.Debug($"Creating Consumer...");
    try
    {
        var request = (HttpWebRequest) WebRequest.Create(url);
        _logger.Debug($"Consumer Url: {url}");
        var body = new RequestEntity
        {
            Name = $"{name}_consumer",
            Format = "json",
            AutoOffsetReset = "earliest"
        };
        var requestJson = JsonConvert.SerializeObject(body);

        _logger.Debug($"Consumer Body: {requestJson}");


        request.Method = WebRequestMethods.Http.Post;
        request.ContentType = "application/vnd.kafka.v2+json";
        var requestStream = request.GetRequestStream();
        requestStream.WriteTimeout = 300000;
        using (var streamWriter = new StreamWriter(requestStream))
        {
            streamWriter.Write(requestJson);


        }
        using (var response = (HttpWebResponse) request.GetResponse())
        using (var reader = new StreamReader(response.GetResponseStream()))
        {
            var json = reader.ReadToEnd();
            _logger.Error($"Consumer Response: {json}");
        }


    }
    catch (Exception exception)
    {
        _logger.Error($"{exception.Message}", exception);
    }
}

Подписаться (это, очевидно, не работает)

public void Subscribe(string url,string topic)
{
    _logger.Debug($"Starting Subscription");
    try
    {
        var subUrl = url + $"/instances/{topic}_consumer/subscription";
        var request = (HttpWebRequest)WebRequest.Create(subUrl);
        _logger.Debug($"Subscriber Url: {subUrl}");
        var topicList = new List<string> { topic };
        var body = new SubscribeEntity
        {
            Topics = topicList
        };
        var requestJson = JsonConvert.SerializeObject(body);
        _logger.Debug($"Subscription Body: {requestJson}");
        request.Headers.Clear();
        request.Method = WebRequestMethods.Http.Post;
        request.ContentType = "application/vnd.kafka.v2+json";

        var streamWriter = new StreamWriter(request.GetRequestStream());

            try
            {

                streamWriter.Write(requestJson);
            }
            catch (Exception exception)
            {
                _logger.Error($"{exception.Message}", exception);
                _logger.Error($"{exception.InnerException}", exception);
            }
            finally { 
            streamWriter.Dispose();
            }
        _logger.Debug($"Did the request have a response?: {request.HaveResponse}");


    }
    catch (Exception exception)
    {
        _logger.Error($"{exception.Message}", exception);
    }
}

Запрос записей (работает, но возвращает ошибку "не подписан"):

 public List<JsonEntity> RequestRecords(string url,string topic)
{
    try
    {
        var subUrl = url + $"/instances/{topic}_consumer/records";
        _logger.Debug($"Polling records");
        List<JsonEntity> jsonEntities;
        var request = (HttpWebRequest) WebRequest.Create(subUrl);
        _logger.Debug($"Records Url: {subUrl}");
        request.Method = WebRequestMethods.Http.Get;
        request.Accept = "application/vnd.kafka.json.v2+json";
        using (var response = (HttpWebResponse) request.GetResponse())
        using (var reader = new StreamReader(response.GetResponseStream()))
        {
            _logger.Debug($"Response Headers: {response.Headers}");
            var json = reader.ReadToEnd();
            jsonEntities = JsonConvert.DeserializeObject<List<JsonEntity>>(json);
        }
        return jsonEntities;
    }
    catch(WebException exception)
    {
        if (exception.Response != null)
        {
            using (var response = (HttpWebResponse)exception.Response)
            using(var reader = new StreamReader(response.GetResponseStream()))
            {
                var error = reader.ReadToEnd();
                _logger.Debug($"Web Error: {error}");
            }

        }
        _logger.Error($"{exception.Message}", exception);
        return null;
    }
}

Вот журналы:

Создание Consumer... URL- адрес потребителя: http://localhost/consumers/care_consumer

Consumer Body: {"name":"care_chat_metrics_consumer","format":"json","auto.offset.reset":"earliest"}


  Consumer Response:
  {"instance_id":"care_chat_metrics_consumer","base_uri":"http://localhost/consumers/care_consumer/instances/care_chat_metrics_consumer"}

  Starting Subscription

  Subscriber Url:
  http://localhost/consumers/care_consumer/instances/care_chat_metrics_consumer/subscription

  Subscription Body: {"topics":["care_chat_metrics"]}

  Did the request have a response?: False

  Polling records

  Records Url:
  http://localhost/consumers/care_consumer/instances/care_chat_metrics_consumer/records

  Web Error: {"error_code":50002,"message":"Kafka error: Consumer is
  not subscribed to any topics or assigned any partitions"}

  The remote server returned an error: (500) Internal Server Error.

0 ответов

Другие вопросы по тегам