Google Pub/Sub to Dataflow, избегайте дублирования с ID записи

Я пытаюсь создать потоковое задание потока данных, которое читает события из Pub/Sub и записывает их в BigQuery.

Согласно документации, Dataflow может обнаруживать доставку дублированных сообщений, если используется идентификатор записи (см.: https://cloud.google.com/dataflow/model/pubsub-io).

Но даже используя этот идентификатор записи, у меня все еще есть дубликаты (около 0,0002%).

Я что-то пропустил?

РЕДАКТИРОВАТЬ:

Я использую Spotify Async PubSub Client для публикации сообщений со следующим фрагментом:

Message
      .builder()
      .data(new String(Base64.encodeBase64(json.getBytes())))
      .attributes("myid", id, "mytimestamp", timestamp.toString)
      .build()

Затем я использую Spotify scio, чтобы прочитать сообщение из pub/sub и сохранить его в DataFlow:

val input = sc.withName("ReadFromSubscription")
              .pubsubSubscription(subscriptionName, "myid", "mytimestamp")
input
    .withName("FixedWindow")
    .withFixedWindows(windowSize)  // apply windowing logic
    .toWindowed  // convert to WindowedSCollection
    //
    .withName("ParseJson")
    .map { wv =>
      wv.copy(value = TableRow(
        "message_id" -> (Json.parse(wv.value) \ "id").as[String],
        "message" -> wv.value)
      )
    }
    //
    .toSCollection  // convert back to normal SCollection
    //
    .withName("SaveToBigQuery")
    .saveAsBigQuery(bigQueryTable(opts), BQ_SCHEMA, WriteDisposition.WRITE_APPEND)

Размер окна составляет 1 минуту.

После нескольких секунд ввода сообщений у меня уже есть дубликаты в BigQuery.

Я использую этот запрос для подсчета дубликатов:

SELECT 
   COUNT(message_id) AS TOTAL, 
   COUNT(DISTINCT message_id) AS DISTINCT_TOTAL 
FROM my_dataset.my_table

//returning 273666  273564

И этот, чтобы посмотреть на них:

SELECT *
FROM my_dataset.my_table
WHERE message_id IN (
  SELECT message_id
  FROM my_dataset.my_table
  GROUP BY message_id
  HAVING COUNT(*) > 1
) ORDER BY message_id

//returning for instance:
row|id                                    | processed_at           | processed_at_epoch    
1   00166a5c-9143-3b9e-92c6-aab52601b0be    2017-02-02 14:06:50 UTC 1486044410367   { ...json1... }  
2   00166a5c-9143-3b9e-92c6-aab52601b0be    2017-02-02 14:06:50 UTC 1486044410368   { ...json1... }  
3   00354cc4-4794-3878-8762-f8784187c843    2017-02-02 13:59:33 UTC 1486043973907   { ...json2... }  
4   00354cc4-4794-3878-8762-f8784187c843    2017-02-02 13:59:33 UTC 1486043973741   { ...json2... } 
5   0047284e-0e89-3d57-b04d-ebe4c673cc1a    2017-02-02 14:09:10 UTC 1486044550489   { ...json3... } 
6   0047284e-0e89-3d57-b04d-ebe4c673cc1a    2017-02-02 14:08:52 UTC 1486044532680   { ...json3... }

1 ответ

В документации BigQuery говорится, что могут быть редкие случаи, когда появляются дубликаты:

  1. "BigQuery запоминает этот идентификатор как минимум в течение одной минуты" - если поток данных занимает более одной минуты перед повторной попыткой вставки, BigQuery может разрешить дублирование. Вы можете просмотреть журналы из конвейера, чтобы определить, так ли это,
  2. "В редких случаях, когда центр обработки данных Google неожиданно теряет связь, автоматическая дедупликация может быть невозможна".

Вы можете попробовать инструкции по удалению дубликатов вручную. Это также позволит вам увидеть insertID который использовался с каждой строкой, чтобы определить, была ли проблема на стороне потока данных (генерируя разные insertIDs для той же записи) или на стороне BigQuery (не удается дедуплицировать строки на основе их insertID).