Получена неправильная запись для spark-executor-<groupid> <topic> 0 даже после попытки смещения <number>

Моя работа Spark вызывает исключение, как показано ниже:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.AssertionError: assertion failed: Got wrong record for spark-executor-test-local-npp_consumer_grp_3 <topic> 0 even after seeking to offset 29599
    at scala.Predef$.assert(Predef.scala:170)

Я отключил auto.commit (enable.auto.commit=false) и использовать Kafka API для фиксации смещения

((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges.get());`). 

В чем может быть причина такой ошибки? Эта ошибка возникает из-за проблемы со стороны потребителя Kafka или из-за моего spark-kafka потребительская программа?*

После просмотра в CachedKafkaConsumer Исходный код, я думаю, это должно быть из-за последовательного промаха буфера (мой размер буфера по умолчанию размер - 65536 - receive.buffer.bytes = 65536) но я не вижу сообщение об ошибке буфера - Buffer miss for $groupId $topic $partition $offset в моих журналах.

Итак, мне интересно, это связано с размером буфера?

Я пытался увеличить receive.buffer.bytes в 655360 все же мой spark-kafka потребитель потерпел неудачу с той же ошибкой Может ли эта ошибка быть из-за отправки моего источника Kafka из-за огромных данных?

2 ответа

У меня та же проблема, и я нашел следующий исходный код в классе CachedKafkaCounsumer от искривления. Это, очевидно, связано с тем, что смещение от опроса потребителей и смещение, к которому стремится потребитель, не равны.

Я воспроизвел эту проблему, и обнаружил, что смещение от одной темы AndPartition прерывисто в Кафке

def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
if (offset != nextOffset) {
  logInfo(s"Initial fetch for $groupId $topic $partition $offset")
  seek(offset)
  poll(timeout)
}

if (!buffer.hasNext()) { poll(timeout) }
assert(buffer.hasNext(),
  s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
var record = buffer.next()

if (record.offset != offset) {
  logInfo(s"Buffer miss for $groupId $topic $partition $offset")
  seek(offset)
  poll(timeout)
  assert(buffer.hasNext(),
    s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
  record = buffer.next()
  assert(record.offset == offset,
    s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
}

nextOffset = offset + 1
record
}

У меня была такая же проблема, когда я читал из темы, заполненной с использованием транзакционного производителя. Эта проблема была вызвана маркерами транзакций (commit/abort), которые не может прочитать spark-streaming-kafka. Когда вы запускаете SimpleConsumerShell с параметром --print-offsets в этом разделе, вы должны видеть "разрывы" между смещениями.

Единственное решение, которое я вижу сейчас, - это отключение транзакционного производителя, потому что более новый spark-streaming-kafka еще не реализован.

У меня тоже была эта проблема, и я наткнулся на эту ссылку: http://apache-spark-user-list.1001560.n3.nabble.com/quot-Got-wrong-record-after-seeking-to-offset-quot-issue-td30609.html

Эта проблема была решена в версии 2.4.0: https://issues.apache.org/jira/browse/SPARK-17147.

Я получал сообщения из сжатой темы (сжатой) и использовал версию 2.3.0 программы spark-streaming-kafka-0-10_2, которая не может работать со сжатием.

Перейдя к версии 2.4.0 программы spark-streaming-kafka-0-10_2, я смог ее решить: org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0

Еще мне нужно настроить: spark.streaming.kafka.allowNonConsecutiveOffsets= true

Моя команда отправки выглядит так:

spark-submit --class com.streamtest.Main --master spark: // myparkhost: 7077 --packages org.apache.spark: spark-streaming-kafka-0-10_2.11:2.4.0, org.apache. spark: spark-streaming_2.11:2.3.0, org.apache.spark: spark-core_2.11:2.3.0 --conf spark.streaming.kafka.allowNonConsecutiveOffsets= true / work / streamapp / build / libs / streamapp. банка

Другие вопросы по тегам