Нет файловой системы для схемы: потому что
Я пытаюсь подключиться к IBM Cloud Object Storage из IBM Data Science Experience:
access_key = 'XXX'
secret_key = 'XXX'
bucket = 'mybucket'
host = 'lon.ibmselect.objstor.com'
service = 'mycos'
sqlCxt = SQLContext(sc)
hconf = sc._jsc.hadoopConfiguration()
hconf.set('fs.cos.myCos.access.key', access_key)
hconf.set('fs.cos.myCos.endpoint', 'http://' + host)
hconf.set('fs.cose.myCos.secret.key', secret_key)
hconf.set('fs.cos.service.v2.signer.type', 'false')
obj = 'mydata.tsv.gz'
rdd = sc.textFile('cos://{0}.{1}/{2}'.format(bucket, service, obj))
print(rdd.count())
Это возвращает:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: No FileSystem for scheme: cos
Я предполагаю, что мне нужно использовать схему 'cos', основанную на документах Stocator. Тем не менее, ошибка предполагает, что Stocator не доступен или это старая версия?
Есть идеи?
Обновление 1:
Я также попробовал следующее:
sqlCxt = SQLContext(sc)
hconf = sc._jsc.hadoopConfiguration()
hconf.set('fs.cos.impl', 'com.ibm.stocator.fs.ObjectStoreFileSystem')
hconf.set('fs.stocator.scheme.list', 'cos')
hconf.set('fs.stocator.cos.impl', 'com.ibm.stocator.fs.cos.COSAPIClient')
hconf.set('fs.stocator.cos.scheme', 'cos')
hconf.set('fs.cos.mycos.access.key', access_key)
hconf.set('fs.cos.mycos.endpoint', 'http://' + host)
hconf.set('fs.cos.mycos.secret.key', secret_key)
hconf.set('fs.cos.service.v2.signer.type', 'false')
service = 'mycos'
obj = 'mydata.tsv.gz'
rdd = sc.textFile('cos://{0}.{1}/{2}'.format(bucket, service, obj))
print(rdd.count())
Однако на этот раз ответ был:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: No object store for: cos
at com.ibm.stocator.fs.ObjectStoreVisitor.getStoreClient(ObjectStoreVisitor.java:121)
...
Caused by: java.lang.ClassNotFoundException: com.ibm.stocator.fs.cos.COSAPIClient
3 ответа
Последняя версия Stocator (v1.0.9), которая поддерживает схему fs.cos, еще не развернута в Spark aaService (это будет в ближайшее время). Пожалуйста, используйте схему Stocator "fs.s3d" для подключения к вашей COS.
Пример:
endpoint = 'endpointXXX'
access_key = 'XXX'
secret_key = 'XXX'
prefix = "fs.s3d.service"
hconf = sc._jsc.hadoopConfiguration()
hconf.set(prefix + ".endpoint", endpoint)
hconf.set(prefix + ".access.key", access_key)
hconf.set(prefix + ".secret.key", secret_key)
bucket = 'mybucket'
obj = 'mydata.tsv.gz'
rdd = sc.textFile('s3d://{0}.service/{1}'.format(bucket, obj))
rdd.count()
Кроме того, вы можете использовать ibmos2spark. Библиотека уже установлена на нашем сервисе. Пример:
import ibmos2spark
credentials = {
'endpoint': 'endpointXXXX',
'access_key': 'XXXX',
'secret_key': 'XXXX'
}
configuration_name = 'os_configs' # any string you want
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name)
bucket = 'mybucket'
obj = 'mydata.tsv.gz'
rdd = sc.textFile(cos.url(obj, bucket))
rdd.count()
Похоже, что драйвер не инициализирован должным образом. Попробуйте эту конфигурацию:
hconf.set('fs.cos.impl', 'com.ibm.stocator.fs.ObjectStoreFileSystem')
hconf.set('fs.stocator.scheme.list', 'cos')
hconf.set('fs.stocator.cos.impl', 'com.ibm.stocator.fs.cos.COSAPIClient')
hconf.set('fs.stocator.cos.scheme', 'cos')
hconf.set('fs.cos.mycos.access.key', access_key)
hconf.set('fs.cos.mycos.endpoint', 'http://' + host)
hconf.set('fs.cos.mycos.secret.key', secret_key)
hconf.set('fs.cos.service.v2.signer.type', 'false')
ОБНОВЛЕНИЕ 1:
Вы также должны убедиться, что классы-стокаторы находятся на пути к классам. Вы можете использовать пакетные системы, запустив pyspark следующим образом:
./bin/pyspark --packages com.ibm.stocator:stocator:1.0.24
Это работает с swift2d
а также cos
схема.
ОБНОВЛЕНИЕ 2:
Просто следуйте документации Stocator ( https://github.com/CODAIT/stocator). Он содержит все детали, как его установить, какую ветку использовать и т.д.
Stocator находится на пути к классам для ядер Spark 2.0 и 2.1, но cos
схема не настроена. Вы можете получить доступ к конфигурации, выполнив следующее в записной книжке Python:
!cat $SPARK_CONF_DIR/core-site.xml
Искать недвижимость fs.stocator.scheme.list
, То, что я сейчас вижу, это:
<property>
<name>fs.stocator.scheme.list</name>
<value>swift2d,swift,s3d</value>
</property>
Я рекомендую вам подать запрос на функцию против DSX для поддержки cos
схема.
Я обнаружил ту же проблему и для ее решения просто изменил среду:
В IBM Watson Studio, если вы запустите записную книжку Jupyter в среде без предварительно настроенного искрового кластера, вы получите эту ошибку. УстановкаPySpark
недостаточно.
Вместо этого, если вы запустите записную книжку с доступным кластером Spark, все будет в порядке.
Вы должны установить
.config("spark.hadoop.fs.stocator.scheme.list", "cos")
вместе с некоторыми другими
fs.cos...
конфигурации. Вот пример кода сквозного фрагмента, который работает (протестирован с
pyspark==2.3.2
и
Python 3.7.3
):
from pyspark.sql import SparkSession
stocator_jar = '/path/to/stocator-1.1.2-SNAPSHOT-IBM-SDK.jar'
cos_instance_name = '<myCosIntanceName>'
bucket_name = '<bucketName>'
s3_region = '<region>'
cos_iam_api_key = '*******'
iam_servicce_id = 'crn:v1:bluemix:public:iam-identity::<****************>'
spark_builder = (
SparkSession
.builder
.appName('test_app'))
spark_builder.config('spark.driver.extraClassPath', stocator_jar)
spark_builder.config('spark.executor.extraClassPath', stocator_jar)
spark_builder.config(f"fs.cos.{cos_instance_name}.iam.api.key", cos_iam_api_key)
spark_builder.config(f"fs.cos.{cos_instance_name}.endpoint", f"s3.{s3_region}.cloud-object-storage.appdomain.cloud")
spark_builder.config(f"fs.cos.{cos_instance_name}.iam.service.id", iam_servicce_id)
spark_builder.config("spark.hadoop.fs.stocator.scheme.list", "cos")
spark_builder.config("spark.hadoop.fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
spark_builder.config("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
spark_builder.config("fs.stocator.cos.scheme", "cos")
spark_sess = spark_builder.getOrCreate()
dataset = spark_sess.range(1, 10)
dataset = dataset.withColumnRenamed('id', 'user_idx')
dataset.repartition(1).write.csv(
f'cos://{bucket_name}.{cos_instance_name}/test.csv',
mode='overwrite',
header=True)
spark_sess.stop()
print('done!')