Как извлечь файлы .zst в фреймворк pandas

Я немного новичок, когда дело доходит до Python, но один из моих школьных проектов требует, чтобы я выполнял алгоритмы классификации на этом наборе данных о популярности Reddit. Файлы представляют собой огромные файлы.zst, и их можно найти здесь: https://files.pushshift.io/reddit/submissions/ любом случае, я просто не уверен, как извлечь это в базу данных, поскольку задания, которые у нас были до сих пор просто использовал наборы данных.csv, которые я мог легко поместить в фреймворк pandas. Я наткнулся на другой пост и попытался использовать код:

    def transform_zst_file(self,infile):
        zst_num_bytes = 2**22
        lines_read = 0
        dctx = zstd.ZstdDecompressor()
        with dctx.stream_reader(infile) as reader:
            previous_line = ""
            while True:
                chunk = reader.read(zst_num_bytes)
                if not chunk:
                    break
                string_data = chunk.decode('utf-8')
                lines = string_data.split("\n")
                for i, line in enumerate(lines[:-1]):
                    if i == 0:
                        line = previous_line + line
                    self.appendData(line, self.type)
                    lines_read += 1
                    if self.max_lines_to_read and lines_read >= self.max_lines_to_read:
                        return
                previous_line = lines[-1]

Но я не совсем уверен, как поместить это в фрейм данных pandas или поместить только определенный процент точек данных в фрейм данных, если файл слишком велик. Любая помощь будет очень признательна!

Следующий код приводит к сбою моего компьютера каждый раз, когда я пытаюсь его запустить:

import zstandard as zstd  
your_filename = "..." 
with open(your_filename, "rb") as f:     
    data = f.read()  

dctx = zstd.ZstdDecompressor() 
decompressed = dctx.decompress(data)

Может быть, из-за того, что размер файла слишком велик, есть ли способ извлечь только процент этого файла в фрейм данных pandas?

6 ответов

Файл был сжат с помощью Zstandard (https://github.com/facebook/zstd), библиотеки сжатия.

Скорее всего, проще всего будет установить python-zstandard (https://pypi.org/project/zstandard/), используя

pip install zstandard

а затем в скрипте python запустите что-то вроде

import zstandard as zstd

your_filename = "..."
with open(your_filename, "rb") as f:
    data = f.read()

dctx = zstd.ZstdDecompressor()
decompressed = dctx.decompress(data)

Теперь вы можете использовать распакованные данные напрямую или записать их в какой-нибудь файл, а затем загрузить в pandas. Удачи!

В отличие от ответа Бимбы, он не считывает все в память, пока работает с каждой строкой. Это полезно, если вы работаете со сжатыми данными, разделенными новой строкой, объем которых превышает объем доступной памяти.

      import io
import zstandard as zstd
from pathlib import Path
import json

DCTX = zstd.ZstdDecompressor(max_window_size=2**31)

def read_lines_from_zst_file(zstd_file_path:Path):
    with (
        zstd.open(zstd_file_path, mode='rb', dctx=DCTX) as zfh,
        io.TextIOWrapper(zfh) as iofh
    ):
        for line in iofh:
            yield line       

if __name__ == "__main__":
    file = Path('some_zstd_file.zst')
    records = map(json.loads, read_lines_from_zst_file(file))
    for record in records:
        print(record.get('some-field'))
        
        

Начиная с версии 1.4 Pandas может распаковывать Zstandard (). До этого была встроенная поддержка сжатия '.gz', '.bz2', '.zip' и '.xz'.

Если файл заканчивается на.zstсуффикс pandas по умолчанию предполагает сжатие и может читать в файле.

      import pandas
df = pandas.read_csv('my_file.csv.zst')
# Being equivalent to
#   df = pandas.read_csv('my_file.csv.zst', compression='zstd')
# for files ending with .zst

Подробнее см. в документации Pandas read_csv .

Я использовал TextIOWrapper из модуля io.

      with open(file_name, 'rb') as fh:
    dctx = zstandard.ZstdDecompressor(max_window_size=2147483648)
    stream_reader = dctx.stream_reader(fh)
    text_stream = io.TextIOWrapper(stream_reader, encoding='utf-8')
    for line in text_stream:
        obj = json.loads(line)
        # HANDLE OBJECT LOGIC HERE

Я наткнулся на похожий набор данных Reddit, состоящий из zstсвалки. Чтобы перебрать содержимое вашего zst-файла, я использовал следующий код, который вы могли запустить как скрипт:

      import zstandard
import os
import json
import sys
from datetime import datetime
import logging.handlers


log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())


def read_lines_zst(file_name):
    with open(file_name, 'rb') as file_handle:
        buffer = ''
        reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
        while True:
            chunk = reader.read(2**27).decode()
            if not chunk:
                break
            lines = (buffer + chunk).split("\n")

            for line in lines[:-1]:
                yield line, file_handle.tell()

            buffer = lines[-1]
        reader.close()


if __name__ == "__main__":
    file_path = sys.argv[1]
    file_size = os.stat(file_path).st_size
    file_lines = 0
    file_bytes_processed = 0
    created = None
    field = "subreddit"
    value = "wallstreetbets"
    bad_lines = 0
    try:
        for line, file_bytes_processed in read_lines_zst(file_path):
            try:
                obj = json.loads(line)
                created = datetime.utcfromtimestamp(int(obj['created_utc']))
                temp = obj[field] == value
            except (KeyError, json.JSONDecodeError) as err:
                bad_lines += 1
            file_lines += 1
            if file_lines % 100000 == 0:
                log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {bad_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
    except Exception as err:
        log.info(err)

    log.info(f"Complete : {file_lines:,} : {bad_lines:,}")

Могут быть более простые способы добиться этого, но для преобразования zst из дампов набора данных Reddit в действительный файл json с помощью Python я в конечном итоге использую:

      import zstandard as zstd

zst = '/path/to/file.zst'
with open(zst, "rb") as f:
    data = f.read()

dctx = zstd.ZstdDecompressor()
decompressed = dctx.decompress(data, max_output_size=1000000000) # 1GB
with open("/path/to/file.json", "w+") as f:
    f.write("[" + decompressed.decode("utf-8").strip().replace("\n", ",") + "]" )

Прочтите json-файл:

      import json

with open("/path/to/file.json") as f:
    data = json.load(f)
for d in data:
    print(d)

И на помощь всегда приходит скрипт bash, который кажется проще (не забудьте установить zstd и jq):

      set -euxo pipefail
cat "/path/to/file.zst" | zstd -d | jq --compact-output '.created_utc = (.created_utc | tonumber)' > "/path/to/file.json"
Другие вопросы по тегам