Google Pub/Sub to Dataflow, избегайте дублирования с ID записи
Я пытаюсь создать потоковое задание потока данных, которое читает события из Pub/Sub и записывает их в BigQuery.
Согласно документации, Dataflow может обнаруживать доставку дублированных сообщений, если используется идентификатор записи (см.: https://cloud.google.com/dataflow/model/pubsub-io).
Но даже используя этот идентификатор записи, у меня все еще есть дубликаты (около 0,0002%).
Я что-то пропустил?
РЕДАКТИРОВАТЬ:
Я использую Spotify Async PubSub Client для публикации сообщений со следующим фрагментом:
Message
.builder()
.data(new String(Base64.encodeBase64(json.getBytes())))
.attributes("myid", id, "mytimestamp", timestamp.toString)
.build()
Затем я использую Spotify scio, чтобы прочитать сообщение из pub/sub и сохранить его в DataFlow:
val input = sc.withName("ReadFromSubscription")
.pubsubSubscription(subscriptionName, "myid", "mytimestamp")
input
.withName("FixedWindow")
.withFixedWindows(windowSize) // apply windowing logic
.toWindowed // convert to WindowedSCollection
//
.withName("ParseJson")
.map { wv =>
wv.copy(value = TableRow(
"message_id" -> (Json.parse(wv.value) \ "id").as[String],
"message" -> wv.value)
)
}
//
.toSCollection // convert back to normal SCollection
//
.withName("SaveToBigQuery")
.saveAsBigQuery(bigQueryTable(opts), BQ_SCHEMA, WriteDisposition.WRITE_APPEND)
Размер окна составляет 1 минуту.
После нескольких секунд ввода сообщений у меня уже есть дубликаты в BigQuery.
Я использую этот запрос для подсчета дубликатов:
SELECT
COUNT(message_id) AS TOTAL,
COUNT(DISTINCT message_id) AS DISTINCT_TOTAL
FROM my_dataset.my_table
//returning 273666 273564
И этот, чтобы посмотреть на них:
SELECT *
FROM my_dataset.my_table
WHERE message_id IN (
SELECT message_id
FROM my_dataset.my_table
GROUP BY message_id
HAVING COUNT(*) > 1
) ORDER BY message_id
//returning for instance:
row|id | processed_at | processed_at_epoch
1 00166a5c-9143-3b9e-92c6-aab52601b0be 2017-02-02 14:06:50 UTC 1486044410367 { ...json1... }
2 00166a5c-9143-3b9e-92c6-aab52601b0be 2017-02-02 14:06:50 UTC 1486044410368 { ...json1... }
3 00354cc4-4794-3878-8762-f8784187c843 2017-02-02 13:59:33 UTC 1486043973907 { ...json2... }
4 00354cc4-4794-3878-8762-f8784187c843 2017-02-02 13:59:33 UTC 1486043973741 { ...json2... }
5 0047284e-0e89-3d57-b04d-ebe4c673cc1a 2017-02-02 14:09:10 UTC 1486044550489 { ...json3... }
6 0047284e-0e89-3d57-b04d-ebe4c673cc1a 2017-02-02 14:08:52 UTC 1486044532680 { ...json3... }
1 ответ
В документации BigQuery говорится, что могут быть редкие случаи, когда появляются дубликаты:
- "BigQuery запоминает этот идентификатор как минимум в течение одной минуты" - если поток данных занимает более одной минуты перед повторной попыткой вставки, BigQuery может разрешить дублирование. Вы можете просмотреть журналы из конвейера, чтобы определить, так ли это,
- "В редких случаях, когда центр обработки данных Google неожиданно теряет связь, автоматическая дедупликация может быть невозможна".
Вы можете попробовать инструкции по удалению дубликатов вручную. Это также позволит вам увидеть insertID
который использовался с каждой строкой, чтобы определить, была ли проблема на стороне потока данных (генерируя разные insertID
s для той же записи) или на стороне BigQuery (не удается дедуплицировать строки на основе их insertID
).