JSONParseException - чтение данных с помощью API отдыха Kafka
ТЕМА КАФКА (test3)
$ kafka-console-consumer --bootstrap-server broker:9092 --topic test3 --from-beginning
"Can we write to a topic that does not exist?"
"Can we write to a topic that does not exist?"
{"foo":"bar"}
["foo","bar"]
confluent
confluent
confluent
kafka
logs
0
0
Потребительский (кафка-отдых API на localhost:8082
)
- Создать потребителя
POST
запроситьhttp://localhost:8082/consumers/rested
Тело запроса:
{
"format": "json",
"auto.offset.reset": "earliest",
"auto.commit.enable": "false"
}
Тело ответа:
{
"instance_id": "rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff",
"base_uri": "http://rest-proxy:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff"
}
- Создать подписку usihg
POST
вhttp://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/subscription
используя заголовки:
Host: http://localhost:8082
Content-Type: application/vnd.kafka.v2+json
и тело запроса:
{
"topics": [
"test3"
]
}
возвращает ответ 204 No Content
,
- Читайте записи, сделав
GET
запроситьhttp://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/records
используя заголовки:
Host: http://localhost:8082
Accept: application/vnd.kafka.json.v2+json
возвращает ответ:
{
"error_code": 50002,
"message": "Kafka error: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')\n at [Source: (byte[])\"key\"; line: 1, column: 7]"
}
Как мы можем решить эту проблему и обеспечить получение данных?
Исключение (на Кафку)
Журнал запущенного прокси-сервера Kafka Rest имеет следующее исключение:
rest-proxy | [2018-12-31 03:09:27,232] INFO 172.25.0.1 - - [31/Dec/2018:03:09:26 +0000] "GET /consumers/rest-consumer/instances/rest-consumer-8e49873e-13ce-46a5-be1f-0237a0369efe/records HTTP/1.1" 500 211 341 (io.confluent.rest-utils.requests)
rest-proxy | [2018-12-31 03:09:27,235] ERROR Unexpected exception in consumer read task id=io.confluent.kafkarest.v2.KafkaConsumerReadTask@59611e28 (io.confluent.kafkarest.v2.KafkaConsumerReadTask)
rest-proxy | org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy | at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy | Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy | at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy | at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
rest-proxy | at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091)
rest-proxy | at io.confluent.kafkarest.v2.JsonKafkaConsumerState.deserialize(JsonKafkaConsumerState.java:79)
rest-proxy | at io.confluent.kafkarest.v2.JsonKafkaConsumerState.createConsumerRecord(JsonKafkaConsumerState.java:64)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.maybeAddRecord(KafkaConsumerReadTask.java:158)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.addRecords(KafkaConsumerReadTask.java:142)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.doPartialRead(KafkaConsumerReadTask.java:99)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerManager$RunnableReadTask.run(KafkaConsumerManager.java:370)
rest-proxy | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
rest-proxy | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
rest-proxy | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
rest-proxy | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
rest-proxy | at java.lang.Thread.run(Thread.java:748)
Потребительские группы CLI
Я могу просмотреть группу потребителей в CLI, но у нее нет активных участников:
$ kafka-consumer-groups --bootstrap-server broker:9092 --list
имеет результат:
console-consumer-60695
console-consumer-62259
console-consumer-19307
console-consumer-47906
console-consumer-40838
rested
Тем не менее, когда я пытаюсь получить members
:
$ kafka-consumer-groups --bootstrap-server localhost:29092 --group rest-consumer --describe --members
Consumer group 'rested' has no active members.
1 ответ
TL;DR
Вам нужно заключить ключ в двойные кавычки. Не потому, что все ключи должны быть заключены в кавычки, а с помощью синтаксического анализатора JSON вам нужно сделать свой ключ допустимым JSON, а строку, заключенную в двойные кавычки, - допустимым JSON.
Если вам действительно нужно обработать это сообщение, вам нужно прочитать его в другом формате, чем JSON.
Длинный ответ
У вас есть запись с ключом, который не имеет кавычек, что делает значение JSON недействительным, поэтому, когда анализатор JSON Jackson пытается проанализировать ключ, это недопустимый JSON (что не ясно из сообщения об ошибке, но когда он не видит цитата или квадратная или фигурная скобка, которая начинает предполагать, что это логическое значение или ноль).
Вы можете увидеть, где он захватывает их ключ и пытается расшифровать его как JSON здесь
Я смог воспроизвести вашу ошибку, используя этот метод
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "latest"}' \
http://localhost:8082/consumers/my_json_consumer
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["testjsontopic"]}' \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
./bin/kafka-console-producer \
--broker-list :9092 \
--topic testjsontopic \
--property parse.key=true \
--property key.separator="&"
>"key"&{"foo":"bar"}
*Ctrl-C
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
На данный момент я могу прочитать запись, но когда я добавляю ключ без кавычек, я получаю ту же ошибку, что и вы
./bin/kafka-console-producer \
--broker-list :9092 \
--topic testjsontopic \
--property parse.key=true \
--property key.separator="&"
>key&{"foo":"bar"}
Теперь, когда я называю этот код
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
Теперь я получаю эту ошибку
com.fasterxml.jackson.core.JsonParseException: нераспознанный токен "ключ": ожидалось ("истина", "ложь" или "ноль")
Используйте это, чтобы прочитать ваши темы также
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testjsontopic --property print.key=true --from-beginning