Источник данных из другого проекта
Я хочу запустить пакетное задание потока данных в проекте gcp A. Источником для конвейера является хранилище данных из другого проекта. Конвейер работает с DirectPipelineRunner, но когда я переключаюсь на DataflowPipelineRunner, я получаю ошибку: запрос не выполнен с кодом 403, НЕ будет повторяться: https://www.googleapis.com/datastore/v1beta2/datasets/projectb/runQuery. Как правильно это сделать?
Я добавил служебную учетную запись из проекта A в проект B. Также предоставлены учетные данные параметров из сертификата учетной записи службы.
Код трубопровода:
public class Sample {
public static void main(String[] args) throws Exception {
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
//options.setRunner(DirectPipelineRunner.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject("project_a");
// Your Google Cloud Storage path for staging local files.
options.setStagingLocation("gs://project_a_folder/staging");
options.setGcpCredential(
DatastoreHelper.getServiceAccountCredential(
"project_a@developer.gserviceaccount.com",
SecurityUtils.loadPrivateKeyFromKeyStore(
SecurityUtils.getPkcs12KeyStore(),
Sample.class.getClass().getResourceAsStream(
"/projecta-0450c49cbddc.p12"),
"notasecret",
"privatekey",
"notasecret"),
Arrays.asList(
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/devstorage.full_control",
"https://www.googleapis.com/auth/userinfo.email",
"https://www.googleapis.com/auth/datastore")));
Pipeline pipeline = Pipeline.create(options);
DatastoreV1.Query.Builder q = DatastoreV1.Query.newBuilder();
q.addKindBuilder().setName("Entity");
q.setFilter(DatastoreHelper.makeFilter("property",
DatastoreV1.PropertyFilter.Operator.EQUAL,
DatastoreHelper.makeValue("somevalue")));
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("f1").setType("STRING"));
fields.add(new TableFieldSchema().setName("f2").setType("STRING"));
TableSchema tableSchema = new TableSchema().setFields(fields);
pipeline.apply(DatastoreIO.readFrom("projectb", q.build()))
.apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, String>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
try {
Map<String, DatastoreV1.Value> propertyMap = DatastoreHelper.getPropertyMap(c.element());
String p1 = DatastoreHelper.getString(propertyMap.get("p1"));
String p2 = DatastoreHelper.getString(propertyMap.get("p2"));
if (!Strings.isNullOrEmpty(p1)) {
c.output(KV.of(p2, p1));
}
} catch (Exception e) {
log.log(Level.SEVERE, "Failed to output entity data", e);
}
}
}))
.apply(ParDo.of(new DoFn<KV<String, String>, TableRow>() {
@Override
public void processElement(ProcessContext c) throws Exception {
TableRow tableRow = new TableRow();
tableRow.set("f1", c.element().getKey());
tableRow.set("f2", c.element().getValue());
c.output(tableRow);
}
}))
.apply(BigQueryIO.Write.to("dataset.table")
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withSchema(tableSchema));
pipeline.run();
}
}
1 ответ
Если учетная запись службы для проекта A добавлена в качестве администратора для проекта B, и для обоих проектов включен API Cloud Datastore, тогда должен работать межпроектный доступ к хранилищу данных из Dataflow.
Я не думаю, что вам нужно делать какие-либо учетные данные вручную, Dataflow должен автоматически запускаться как учетная запись службы для проекта А.