Источник данных из другого проекта

Я хочу запустить пакетное задание потока данных в проекте gcp A. Источником для конвейера является хранилище данных из другого проекта. Конвейер работает с DirectPipelineRunner, но когда я переключаюсь на DataflowPipelineRunner, я получаю ошибку: запрос не выполнен с кодом 403, НЕ будет повторяться: https://www.googleapis.com/datastore/v1beta2/datasets/projectb/runQuery. Как правильно это сделать?

Я добавил служебную учетную запись из проекта A в проект B. Также предоставлены учетные данные параметров из сертификата учетной записи службы.

Код трубопровода:

public class Sample {
    public static void main(String[] args) throws Exception {
        DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        //options.setRunner(DirectPipelineRunner.class);
        options.setRunner(DataflowPipelineRunner.class);
        options.setProject("project_a");
        // Your Google Cloud Storage path for staging local files.
        options.setStagingLocation("gs://project_a_folder/staging");
        options.setGcpCredential(
                DatastoreHelper.getServiceAccountCredential(
                    "project_a@developer.gserviceaccount.com",
                        SecurityUtils.loadPrivateKeyFromKeyStore(
                                SecurityUtils.getPkcs12KeyStore(),
                                Sample.class.getClass().getResourceAsStream(
                                        "/projecta-0450c49cbddc.p12"),
                                "notasecret",
                                "privatekey",
                                "notasecret"),
                        Arrays.asList(
                                "https://www.googleapis.com/auth/cloud-platform",
                                "https://www.googleapis.com/auth/devstorage.full_control",
                                "https://www.googleapis.com/auth/userinfo.email",
                                "https://www.googleapis.com/auth/datastore")));

        Pipeline pipeline = Pipeline.create(options);

        DatastoreV1.Query.Builder q = DatastoreV1.Query.newBuilder();
        q.addKindBuilder().setName("Entity");
        q.setFilter(DatastoreHelper.makeFilter("property",
                DatastoreV1.PropertyFilter.Operator.EQUAL,
                DatastoreHelper.makeValue("somevalue")));

        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("f1").setType("STRING"));
        fields.add(new TableFieldSchema().setName("f2").setType("STRING"));
        TableSchema tableSchema = new TableSchema().setFields(fields);

        pipeline.apply(DatastoreIO.readFrom("projectb", q.build()))
                .apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, String>>() {
                    @Override
                    public void processElement(ProcessContext c) throws Exception {
                        try {
                            Map<String, DatastoreV1.Value> propertyMap = DatastoreHelper.getPropertyMap(c.element());
                            String p1 = DatastoreHelper.getString(propertyMap.get("p1"));
                            String p2 = DatastoreHelper.getString(propertyMap.get("p2"));
                            if (!Strings.isNullOrEmpty(p1)) {
                                c.output(KV.of(p2, p1));
                            }
                        } catch (Exception e) {
                            log.log(Level.SEVERE, "Failed to output entity data", e);
                        }
                    }
                }))
                .apply(ParDo.of(new DoFn<KV<String, String>, TableRow>() {
                    @Override
                    public void processElement(ProcessContext c) throws Exception {
                        TableRow tableRow = new TableRow();
                        tableRow.set("f1", c.element().getKey());
                        tableRow.set("f2", c.element().getValue());
                        c.output(tableRow);
                    }
                }))
                .apply(BigQueryIO.Write.to("dataset.table")
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                        .withSchema(tableSchema));

        pipeline.run();
    }
}

1 ответ

Если учетная запись службы для проекта A добавлена ​​в качестве администратора для проекта B, и для обоих проектов включен API Cloud Datastore, тогда должен работать межпроектный доступ к хранилищу данных из Dataflow.

Я не думаю, что вам нужно делать какие-либо учетные данные вручную, Dataflow должен автоматически запускаться как учетная запись службы для проекта А.

Другие вопросы по тегам