I am getting this error: SEVERE: Channel ManagedChannelImpl{logId=1, target=bigquerystorage.googleapis.com:443} was not shutdown properly
I have written a Beam pipeline that reads data from Kafka and writes it to BigQuery using Apache Beam. For now I am running it with the Java DirectRunner, and I just need to get the data into my BigQuery table.
This is my code:
package com.knoldus.section8;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
import com.google.common.collect.Lists;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
public class KafkaStreaming {

    static GoogleCredentials authExplicit(String jsonPath) throws IOException {
        // You can specify a credential file by providing a path to GoogleCredentials.
        // Otherwise credentials are read from the GOOGLE_APPLICATION_CREDENTIALS environment variable.
        GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream(jsonPath))
                .createScoped(Lists.newArrayList("https://www.googleapis.com/auth/cloud-platform"));
        return credentials;
    }

    public static void main(String[] args) throws IOException {
        PipelineOptions options = PipelineOptionsFactory.create();
        GcpOptions gcpOptions = options.as(GcpOptions.class);
        gcpOptions.setProject("excellent-guard-314111");
        gcpOptions.setGcpTempLocation("./");
        System.out.println(gcpOptions.getGcpCredential());
        gcpOptions.setGcpCredential(
                authExplicit("excellent-guard-314111-01f257a67f01.json"));

        Pipeline p = Pipeline.create(options);

        // Schema of the destination BigQuery table.
        ArrayList<TableFieldSchema> columns = new ArrayList<>();
        columns.add(new TableFieldSchema().setName("deviceID").setType("STRING"));
        columns.add(new TableFieldSchema().setName("name").setType("STRING"));
        columns.add(new TableFieldSchema().setName("description").setType("STRING"));
        columns.add(new TableFieldSchema().setName("eventtime").setType("INTEGER")); // BigQuery has no "LONG" type; INT64 values use INTEGER
        columns.add(new TableFieldSchema().setName("temperature").setType("DOUBLE"));
        columns.add(new TableFieldSchema().setName("unit").setType("STRING"));
        TableSchema tblSchema = new TableSchema().setFields(columns);

        // Read IotEvent values from Kafka and log each deviceID as it arrives.
        PCollection<IotEvent> iotEventPCollection = p
                .apply(KafkaIO.<Long, IotEvent>read()
                        .withBootstrapServers("localhost:9092")
                        .withTopic("test-new")
                        .withKeyDeserializer(LongDeserializer.class)
                        .withValueDeserializer(IotDes.class)
                        .withoutMetadata())
                .apply(Values.<IotEvent>create())
                .apply(ParDo.of(new DoFn<IotEvent, IotEvent>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        System.out.println(c.element().getDeviceID());
                        c.output(c.element());
                    }
                }));

        // Convert each IotEvent into a BigQuery TableRow.
        PCollection<TableRow> rowData = iotEventPCollection.apply(
                ParDo.of(new DoFn<IotEvent, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws IOException {
                        IotEvent event = c.element();
                        assert event != null;
                        TableRow row = new TableRow();
                        row.set("deviceID", event.getDeviceID());
                        row.set("name", event.getName());
                        row.set("description", event.getDescription());
                        row.set("eventtime", event.getEventtime());
                        row.set("temperature", event.getTemperature());
                        row.set("unit", event.getUnit());
                        System.out.println(row.toPrettyString());
                        c.output(row);
                    }
                }));

        // Write the rows to BigQuery, creating the table if it does not exist yet.
        WriteResult writeResult = rowData.apply(BigQueryIO.writeTableRows()
                .to("beam_poc.table_poc")
                .withSchema(tblSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        p.run().waitUntilFinish();
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Beam</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>2.29.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>2.29.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-google-cloud-dataflow-java -->
        <!-- <dependency>-->
        <!--     <groupId>org.apache.beam</groupId>-->
        <!--     <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>-->
        <!--     <version>2.29.0</version>-->
        <!-- </dependency>-->
        <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-kafka -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-kafka</artifactId>
            <version>2.29.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.google.cloud/google-cloud-bigquery -->
        <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-google-cloud-platform -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
            <version>2.29.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.14.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.14.1</version>
        </dependency>
    </dependencies>
</project>
The error I am getting:
May 19, 2021 9:21:45 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=1, target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
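As far as I understand, this is gRPC's orphaned-channel check: it fires when a ManagedChannel gets garbage-collected without shutdown() ever being called. I never open that channel myself; it appears to be created inside Beam's BigQuery storage client. For reference only, a minimal sketch (not part of my pipeline) of the shutdown pattern the message is asking for, for a channel your own code manages:

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;

public class ChannelShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        // Illustrative only: a channel created by your own code should be closed like this.
        ManagedChannel channel = ManagedChannelBuilder
                .forTarget("bigquerystorage.googleapis.com:443")
                .build();
        try {
            // ... use the channel ...
        } finally {
            channel.shutdown();                             // stop accepting new calls
            channel.awaitTermination(30, TimeUnit.SECONDS); // wait for in-flight calls to drain
        }
    }
}

Since the channel in the warning is managed by the SDK rather than by my code, I don't see where I could call this myself.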
2 Answers
A fix has been released (https://issues.apache.org/jira/browse/BEAM-12356), and this error should no longer occur as long as you are on Apache Beam version 2.31.0 or later.
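For reference, a minimal sketch of that upgrade against the pom above. The beam.version property is my own addition (the original pom hardcodes 2.29.0 in each Beam dependency); the same version must be applied to all four Beam artifacts:

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <!-- Hypothetical property: keeps the Beam version in one place. -->
    <beam.version>2.31.0</beam.version>
</properties>
...
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
</dependency>
<!-- Use ${beam.version} the same way for beam-runners-direct-java,
     beam-sdks-java-io-kafka and beam-sdks-java-io-google-cloud-platform. -->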
I had the same problem and was able to resolve it by switching to Apache Beam SDK 2.28.0 (see this post).