Использование Spanner в потоке данных Apache Beam
Я пытаюсь добавить соединение Spanner в Apache Beam ParDo(DoFn). Мне нужно искать некоторые строки как часть ParDo. В потоке данных создается несколько рабочих (обычно не более 4), и я использую методы startBundle и finishBundle, чтобы открывать и закрывать соединения гаечного ключа на протяжении всей жизни рабочих. Затем в методе processElement я выполняю поиск для каждого элемента, передающего DatabaseClient и использующего singleUseReadOnlyTransaction.
Я должен добавить, что это работает как поток данных под GCP
Некоторый код, чтобы проиллюстрировать это.
private static CustomDoFn<String, TransactionImport> processRow = new CustomDoFn<String, TransactionImport>(){
private static final long serialVersionUID = 1L;
private Spanner spanner = null;
private DatabaseClient dbClient = null;
@StartBundle
public void startBundle(StartBundleContext c){
TransactionFileOptions options = c.getPipelineOptions().as(TransactionFileOptions.class);
com.google.cloud.spanner.SpannerOptions spannerOptions = com.google.cloud.spanner.SpannerOptions.newBuilder().build();
spanner = spannerOptions.getService();
String spannerProjectID = options.getSpannerProjectId();
String spannerInstanceID = options.getSpannerInstanceId();
String spannerDatabaseID = options.getSpannerDatabaseId();
DatabaseId db = DatabaseId.of(spannerProjectID, spannerInstanceID, spannerDatabaseID);
dbClient = spanner.getDatabaseClient(db);
}
@FinishBundle
public void finishBundle(FinishBundleContext c){
spanner.close();
}
@ProcessElement
public void processElement(DoFn<String, TransactionImport>.ProcessContext c) throws Exception {
TransactionImport import = new TransactionImport();
Statement statement = Statement.newBuilder("SELECT * FROM Table1 WHERE Name= @Name")
.bind("Name").to( text)
.build();
ResultSet resultSet = dbClient.singleUseReadOnlyTransaction().executeQuery(statement);
// set some value on import dependant on retrieved value
c.output(import);
}
Это всегда приводит к тому, что поток данных не завершается, и когда я проверяю журнал, я вижу:
Processing stuck in step Process Rows for at least 05m00s without outputting or completing in state process
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:458)
at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
at java.util.concurrent.SynchronousQueue.take(SynchronousQueue.java:924)
at com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly(Uninterruptibles.java:233)
at com.google.cloud.spanner.SessionPool$Waiter.take(SessionPool.java:411)
at com.google.cloud.spanner.SessionPool$Waiter.access$3300(SessionPool.java:399)
at com.google.cloud.spanner.SessionPool.getReadSession(SessionPool.java:754)
at com.google.cloud.spanner.DatabaseClientImpl.singleUseReadOnlyTransaction(DatabaseClientImpl.java:52)
at com.mycompany.pt.SpannerDataAccess.getBinDetails(SpannerDataAccess.java:197)
at com.mycompany.pt.transactionFiles.TransactionFileDataflow$1.processLine(TransactionFileDataflow.java:411)
at com.mycompany.pt.transactionFiles.TransactionFileDataflow$1.processElement(TransactionFileDataflow.java:336)
at com.mycompany.pt.transactionFiles.TransactionFileDataflow$1$DoFnInvoker.invokeProcessElement(Unknown Source)
`
У кого-нибудь есть опыт использования Spanner в ParDo?
1 ответ
Я не эксперт по гаечным ключам, но, возможно, я могу помочь:
Вы должны использовать @Setup/@Teardown для подключения и отключения от гаечного ключа. @{Start,Finish}Bundle вызывается несколько раз в течение жизни работника. Смотрите здесь для получения более подробной информации: https://beam.apache.org/documentation/execution-model/
Выдает ли ваш метод processElement элемент с помощью
c.output(...)
? Если нет, луч будет думать, что ваш трубопровод застрял