JSONParseException - чтение данных с помощью API отдыха Kafka

ТЕМА КАФКА (test3)

$ kafka-console-consumer --bootstrap-server broker:9092 --topic test3 --from-beginning

"Can we write to a topic that does not exist?"
"Can we write to a topic that does not exist?"
{"foo":"bar"}
["foo","bar"]
confluent
confluent
confluent
kafka
logs


0

0

Потребительский (кафка-отдых API на localhost:8082 )

  1. Создать потребителя POST запросить http://localhost:8082/consumers/rested

Тело запроса:

 {
   "format": "json",
   "auto.offset.reset": "earliest",
   "auto.commit.enable": "false"
 }

Тело ответа:

{
   "instance_id": "rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff",

   "base_uri": "http://rest-proxy:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff"

}
  1. Создать подписку usihg POST в http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/subscription

используя заголовки:

Host: http://localhost:8082
Content-Type: application/vnd.kafka.v2+json

и тело запроса:

{
    "topics": [
      "test3"
    ]
}

возвращает ответ 204 No Content,

  1. Читайте записи, сделав GET запросить http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/records

используя заголовки:

Host: http://localhost:8082
Accept: application/vnd.kafka.json.v2+json

возвращает ответ:

{
    "error_code": 50002,
    "message": "Kafka error: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')\n at [Source: (byte[])\"key\"; line: 1, column: 7]"
}

Как мы можем решить эту проблему и обеспечить получение данных?

Исключение (на Кафку)

Журнал запущенного прокси-сервера Kafka Rest имеет следующее исключение:

rest-proxy         | [2018-12-31 03:09:27,232] INFO 172.25.0.1 - - [31/Dec/2018:03:09:26 +0000] "GET /consumers/rest-consumer/instances/rest-consumer-8e49873e-13ce-46a5-be1f-0237a0369efe/records HTTP/1.1" 500 211  341 (io.confluent.rest-utils.requests)
rest-proxy         | [2018-12-31 03:09:27,235] ERROR Unexpected exception in consumer read task id=io.confluent.kafkarest.v2.KafkaConsumerReadTask@59611e28  (io.confluent.kafkarest.v2.KafkaConsumerReadTask)
rest-proxy         | org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy         |  at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy         | Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy         |  at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy         |    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
rest-proxy         |    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091)
rest-proxy         |    at io.confluent.kafkarest.v2.JsonKafkaConsumerState.deserialize(JsonKafkaConsumerState.java:79)
rest-proxy         |    at io.confluent.kafkarest.v2.JsonKafkaConsumerState.createConsumerRecord(JsonKafkaConsumerState.java:64)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.maybeAddRecord(KafkaConsumerReadTask.java:158)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.addRecords(KafkaConsumerReadTask.java:142)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.doPartialRead(KafkaConsumerReadTask.java:99)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerManager$RunnableReadTask.run(KafkaConsumerManager.java:370)
rest-proxy         |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
rest-proxy         |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
rest-proxy         |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
rest-proxy         |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
rest-proxy         |    at java.lang.Thread.run(Thread.java:748)

Потребительские группы CLI

Я могу просмотреть группу потребителей в CLI, но у нее нет активных участников:

$ kafka-consumer-groups --bootstrap-server broker:9092 --list

имеет результат:

console-consumer-60695
console-consumer-62259
console-consumer-19307
console-consumer-47906
console-consumer-40838
rested

Тем не менее, когда я пытаюсь получить members:

$ kafka-consumer-groups --bootstrap-server localhost:29092 --group rest-consumer --describe --members

Consumer group 'rested' has no active members.

1 ответ

Решение

TL;DR

Вам нужно заключить ключ в двойные кавычки. Не потому, что все ключи должны быть заключены в кавычки, а с помощью синтаксического анализатора JSON вам нужно сделать свой ключ допустимым JSON, а строку, заключенную в двойные кавычки, - допустимым JSON.

Если вам действительно нужно обработать это сообщение, вам нужно прочитать его в другом формате, чем JSON.

Длинный ответ

У вас есть запись с ключом, который не имеет кавычек, что делает значение JSON недействительным, поэтому, когда анализатор JSON Jackson пытается проанализировать ключ, это недопустимый JSON (что не ясно из сообщения об ошибке, но когда он не видит цитата или квадратная или фигурная скобка, которая начинает предполагать, что это логическое значение или ноль).

Вы можете увидеть, где он захватывает их ключ и пытается расшифровать его как JSON здесь

https://github.com/confluentinc/kafka-rest/blob/a9b7cc527a26fdf09db27d148f2e71bfe3d87a6a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/JsonKafkaConsumerState.java#L69

Я смог воспроизвести вашу ошибку, используя этот метод

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "latest"}' \
      http://localhost:8082/consumers/my_json_consumer

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["testjsontopic"]}' \
 http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription


./bin/kafka-console-producer \
  --broker-list :9092 \
  --topic testjsontopic \
  --property parse.key=true \
  --property key.separator="&"

>"key"&{"foo":"bar"}

*Ctrl-C

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

На данный момент я могу прочитать запись, но когда я добавляю ключ без кавычек, я получаю ту же ошибку, что и вы

./bin/kafka-console-producer \
  --broker-list :9092 \
  --topic testjsontopic \
  --property parse.key=true \
  --property key.separator="&"

>key&{"foo":"bar"}

Теперь, когда я называю этот код

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

Теперь я получаю эту ошибку

com.fasterxml.jackson.core.JsonParseException: нераспознанный токен "ключ": ожидалось ("истина", "ложь" или "ноль")

Используйте это, чтобы прочитать ваши темы также

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testjsontopic --property print.key=true --from-beginning
Другие вопросы по тегам