Как проверить макет (moto/boto) S3 для чтения / записи в PySpark

Я пытаюсь протестировать функцию, которая записывает данные на S3, а затем считывает те же данные из того же места S3. Я пытаюсь использовать moto а также boto (2.x) для достижения этого [1]. Проблема в том, что служба возвращает мне запрещенный доступ к ключу [2]. Аналогичная проблема (даже если сообщение об ошибке немного отличается) сообщается в репозитории moto github [3], но она еще не решена.

Кто-нибудь когда-нибудь успешно тестировал mocked s3 для чтения / записи в PySpark, чтобы поделиться своими мыслями?

[1]

import boto
from boto.s3.key import Key
from moto import mock_s3

_test_bucket = 'test-bucket'
_test_key = 'data.csv'

@pytest.fixture(scope='function')
def spark_context(request):
    conf = SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing")
    sc = SparkContext(conf=conf)
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'test-access-key-id')
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 'test-secret-access-key')
    request.addfinalizer(lambda: sc.stop())
    quiet_py4j(sc)
    return sc

spark_test = pytest.mark.usefixtures("spark_context")

@spark_test
@mock_s3
def test_tsv_read_from_and_write_to_s3(spark_context):
    spark = SQLContext(spark_context)

    s3_conn = boto.connect_s3()
    s3_bucket = s3_conn.create_bucket(_test_bucket)
    k = Key(s3_bucket)
    k.key = _test_key 
    k.set_contents_from_string('')    

    s3_uri = 's3n://{}/{}'.format(_test_bucket, _test_key)
    df = (spark
          .read
          .csv(s3_uri))

[2]

(...)
E py4j.protocol.Py4JJavaError: An error occurred while calling o33.csv.
E : org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: S3 HEAD request failed for '/data.csv' - ResponseCode=403, ResponseMessage=Forbidden
(...)

[3] https://github.com/spulec/moto/issues/1543

1 ответ

moto — это библиотека, которая используется для имитации ресурсов aws.

1. Создайте ресурс:

Если вы попытаетесь получить доступ к несуществующей корзине S3, aws вернет Forbidden error.

Обычно нам нужны эти ресурсы, созданные еще до запуска наших тестов. Итак, создайте фикстуру pytest сустановлен в True

      import pytest
import boto3
from moto import mock_s3

@pytest.fixture(autouse=True)
def fixture_mock_s3():
    with mock_s3():
        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='MYBUCKET') # an empty test bucket is created
        yield

  • Приведенный выше код создает фиктивное ведро s3 с именем «MUBUCKET». Ведро пусто.
  • Имя ведра должно быть таким же, как и у исходного ведра.
  • с autouse, прибор автоматически доступен для всех тестов.
  • Вы можете уверенно запускать тесты, так как ваши тесты не будут иметь доступа к оригинальной корзине.

2. Определите и запустите тесты, связанные с ресурсом:

Предположим, у вас есть код, который записывает файл в корзину S3.

      def write_to_s3(filepath: str):
    s3 = boto3.resource('s3', region_name='us-east-1')    
    s3.Bucket('MYBUCKET').upload_file(filepath, 'A/B/C/P/data.txt')

Это можно проверить следующим образом:

      from botocore.errorfactory import ClientError

def test_write_to_s3():
    dummy_file_path = f"{TEST_DIR}/data/dummy_data.txt"
    # The s3 bucket is created by the fixture and not lies empty
    # test for emptiness
    s3 = boto3.resource('s3', region_name='us-east-1')
    bucket = s3.Bucket("MYBUCKET")
    objects = list(bucket.objects.filter(Prefix="/"))
    assert objects == []
    # Now, lets write a file to s3
    write_to_s3(dummy_file_path)
    # the below assert statement doesn't throw any error
    assert s3.head_object(Bucket='MYBUCKET', Key='A/B/C/P/data.txt')
Другие вопросы по тегам