Spark Структурированные потоковые сине-зеленые развертывания

Мы хотели бы иметь возможность развертывать наши задания Spark таким образом, чтобы не возникало простоев при обработке данных во время развертываний (в настоящее время у нас есть 2-3-минутное окно). На мой взгляд, самый простой способ сделать это - смоделировать философию "сине-зеленого развертывания", которая заключается в том, чтобы раскрутить новую версию задания Spark, дать ему прогреться, а затем закрыть старое задание. Однако со структурированной потоковой передачей и контрольными точками мы не можем этого сделать, потому что новое задание Spark обнаруживает, что последний файл контрольных точек уже существует (из старого задания). Я приложил пример ошибки ниже. У кого-нибудь есть мысли о возможном обходном пути?

Я думал о копировании существующего каталога контрольных точек в другой каталог контрольных точек для вновь созданного задания - хотя это должно работать как обходной путь (некоторые данные могут быть обработаны повторно, но наша БД должна дедуплицироваться), это кажется супер хакерским, и я бы предпочел не проводить.

Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: rename destination /user/checkpoint/job/offsets/3472939 already exists
    at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.validateOverwrite(FSDirRenameOp.java:520)
    at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:364)
    at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:282)
    at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:247)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:3677)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename2(NameNodeRpcServer.java:914)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename2(ClientNamenodeProtocolServerSideTranslatorPB.java:587)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)

    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1991)
    at org.apache.hadoop.fs.Hdfs.renameInternal(Hdfs.java:335)
    at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:678)
    at org.apache.hadoop.fs.FileContext.rename(FileContext.java:958)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.rename(HDFSMetadataLog.scala:356)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatch(HDFSMetadataLog.scala:160)
    ... 20 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException): rename destination /user/checkpoint/job/offsets/3472939 already exists

1 ответ

Это возможно, но это добавит сложности вашему приложению. Запуск потоков в целом быстрый, поэтому можно предположить, что задержка вызвана инициализацией статических объектов и зависимостей. В этом случае вам понадобится только SparkContext / SparkSessionи нет потоковых зависимостей, поэтому процесс можно описать так:

  • Запустите новое приложение Spark.
  • Инициализируйте пакетно-ориентированные объекты.
  • Передайте сообщение предыдущему приложению, чтобы уйти в отставку.
  • Ждите подтверждения.
  • Начать потоки.

На очень высоком уровне счастливый путь можно представить в виде:

введите описание изображения здесь

Поскольку это очень общий шаблон, его можно реализовать различными способами, в зависимости от языка и инфраструктуры:

  • Облегченная очередь сообщений типа ØMQ.
  • Передача сообщений через распределенную файловую систему.
  • Размещение приложений в интерактивном контексте (Apache Toree, Apache Livy) и использование внешнего клиента для оркестровки.
Другие вопросы по тегам