Ошибка использования SpannerIO в Apache Beam

Этот вопрос является продолжением этого. Я пытаюсь использовать Apache Beam для чтения данных из таблицы гаечного ключа Google (а затем выполнить некоторую обработку данных). Я написал следующий минимальный пример с использованием Java SDK:

package com.google.cloud.dataflow.examples;
import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import com.google.cloud.spanner.Struct;

public class backup {

  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.create();

    Pipeline p = Pipeline.create(options);
    PCollection<Struct> rows = p.apply(
            SpannerIO.read()
                .withInstanceId("my_instance")
                .withDatabaseId("my_db")
                .withQuery("SELECT t.table_name FROM information_schema.tables AS t")
                );

    PipelineResult result = p.run();
    try {
      result.waitUntilFinish();
    } catch (Exception exc) {
      result.cancel();
    }
  }
}

Когда я пытаюсь выполнить код с помощью DirectRunner, я получаю следующее сообщение об ошибке:

org.apache.beam.runners.direct.repackaged.com.google.common.util.concurrent.UncheckedExecutionException:

org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: Не удалось инициализировать класс com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor

[...] Причина: org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: Не удалось инициализировать класс com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor

[...] Причина: java.lang.NoClassDefFoundError: Не удалось инициализировать класс com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor

Или, используя DataflowRunner:

org.apache.beam.runners.direct.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchFieldError: internal_static_google_rpc_ableccessMessess

[...] Вызывается: org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchFieldError: internal_static_google_rpc_LocalizedMessage_fieldAccessorTable

[...] Причины: java.lang.NoSuchFieldError: internal_static_google_rpc_LocalizedMessage_fieldAccessorTable

В обоих случаях сообщение об ошибке является довольно загадочным, и я не смог найти четкого представления о том, что является причиной ошибки из поиска Google. Я также не смог найти примеры сценариев, использующих модуль SpannerIO.

Это ошибка из-за явной ошибки в моем коде или из-за плохой установки облачных инструментов Google?

2 ответа

Скорее всего, эта проблема вызвана проблемой совместимости зависимостей, описанной здесь: BEAM-2837. Вот быстрый обходной путь, описанный в одном из комментариев к проблеме JIRA:

<dependency>
    <groupId>com.google.api.grpc</groupId>
    <artifactId>grpc-google-common-protos</artifactId>
    <version>0.1.9</version>
</dependency>

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.api.grpc</groupId>
            <artifactId>grpc-google-common-protos</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Явно определите необходимый com.google.api.grpc зависимость и исключить версию из org.apache.beam,

Вам необходимо указать ProjectID:

    SpannerIO.read()
            .withProjectId("my_project")
            .withInstanceId("my_instance")
            .withDatabaseId("my_db")

И вам нужно установить учетные данные для вашего проекта Spanner. Поскольку API SpannerIO не позволяет вам устанавливать какие-либо пользовательские учетные данные, вы должны установить глобальные учетные данные приложения, используя переменную среды GOOGLE_APPLICATION_CREDENTIALS.

Вы также можете читать (и писать) в Cloud Spanner, используя JDBC. Чтение делается так:

        PCollection<KV<String, Long>> words = p2.apply(JdbcIO.<KV<String, Long>> read()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("nl.topicus.jdbc.CloudSpannerDriver",
                    "jdbc:cloudspanner://localhost;Project=my-project-id;Instance=instance-id;Database=database;PvtKeyPath=C:\\Users\\MyUserName\\Documents\\CloudSpannerKeys\\cloudspanner-key.json"))
            .withQuery("SELECT t.table_name FROM information_schema.tables AS t").withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of()))
            .withRowMapper(new JdbcIO.RowMapper<KV<String, Long>>()
            {
                private static final long serialVersionUID = 1L;

                @Override
                public KV<String, Long> mapRow(ResultSet resultSet) throws Exception
                {
                    return KV.of(resultSet.getString(1), resultSet.getLong(2));
                }
            }));

Этот метод также позволяет использовать пользовательские учетные данные, устанавливая PvtKeyPath. Вы также можете написать в Google Cloud Spanner, используя JDBC. Посмотрите здесь пример: http://www.googlecloudspanner.com/2017/10/google-cloud-spanner-with-apache-beam.html

Другие вопросы по тегам