Тип данных Pandas datetime64[ns] не работает в Hive/Athena

Я работаю над приложением python, которое просто конвертирует CSV-файл в формат паркет, совместимый с hive / athena, и для этого использую библиотеки fastparquet и pandas. В CSV-файле есть значения меток времени, например 2018-12-21 23:45:00 который должен быть записан как timestamp введите в паркет файл. Ниже мой код, который работает,

columnNames = ["contentid","processed_time","access_time"]

dtypes = {'contentid': 'str'}

dateCols = ['access_time', 'processed_time']

s3 = boto3.client('s3')

obj = s3.get_object(Bucket=bucketname, Key=keyname)

df = pd.read_csv(io.BytesIO(obj['Body'].read()), compression='gzip', header=0, sep=',', quotechar='"', names = columnNames, error_bad_lines=False, dtype=dtypes, parse_dates=dateCols)

s3filesys = s3fs.S3FileSystem()

myopen = s3filesys.open

write('outfile.snappy.parquet', df, compression='SNAPPY', open_with=myopen,file_scheme='hive',partition_on=PARTITION_KEYS)

код успешно запущен, ниже - фрейм данных, созданный пандами

contentid                 object
processed_time            datetime64[ns]
access_time               datetime64[ns]

И наконец, когда я запросил файл паркета в Hive и Athena, значение временной метки +50942-11-30 14:00:00.000 вместо 2018-12-21 23:45:00

Любая помощь высоко ценится

6 ответов

Я знаю, что это старый вопрос, но он все еще актуален.

Как упоминалось ранее, Athena поддерживает только int96 в качестве меток времени. Используя fastparquet, можно создать файл паркета с правильным форматом для Athena. Важная часть - это times='int96', так как оно сообщает fastparquet преобразовать дату и время pandas в метку времени int96.

from fastparquet import write
import pandas as pd

def write_parquet():
  df = pd.read_csv('some.csv')
  write('/tmp/outfile.parquet', df, compression='GZIP', times='int96')

Вы можете попробовать:

dataframe.to_parquet(file_path, compression=None, engine='pyarrow', allow_truncated_timestamps=True, use_deprecated_int96_timestamps=True)

Решил проблему таким способом.

преобразует серию df с помощью метода to_datetime

затем с помощью .dt accesor выберите часть даты в datetime64[ns]

Пример:

df.field = pd.to_datetime(df.field)
df.field = df.field.dt.date

После этого Афина распознает данные

Кажется, проблема в Афине, она поддерживает только int96, а когда вы создаете отметку времени в пандах, это int64.

мой столбец данных, который содержит строковую дату, является "sdate", я сначала преобразовываю в метку времени

# add a new column w/ timestamp
df["ndate"] = pandas.to_datetime["sdate"]
# convert the timestamp to microseconds
df["ndate"] = pandas.to_datetime(["ndate"], unit='us')

# Then I convert my dataframe to pyarrow
table = pyarrow.Table.from_pandas(df, preserve_index=False)

# After that when writing to parquet add the coerce_timestamps and 
# use_deprecated_int96_timstamps. (Also writing to S3 directly)
OUTBUCKET="my_s3_bucket"

pyarrow.parquet.write_to_dataset(table, root_path='s3://{0}/logs'.format(OUTBUCKET), partition_cols=['date'], filesystem=s3, coerce_timestamps='us', use_deprecated_int96_timestamps=True)

Я тоже сталкивался с этой проблемой несколько раз. Мой код ошибки: я установил индекс в формат даты и времени:

      df.set_index(pd.DatetimeIndex(df.index), inplace=True)

Когда я затем прочитаю файл паркета с помощью fastparquet, он может заметить, что

      OutOfBoundsDatetime: Out of bounds nanosecond timestamp: 219968-03-28 05:07:11

Однако ее можно легко решить, используя, а не fastparquet.ParquetFile(path_file).to_pandas()

ПОЖАЛУЙСТА, ИСПОЛЬЗУЙТЕ pd.read_parquet(path_file)ЧТОБЫ РЕШИТЬ ЭТУ ПРОБЛЕМУ

Это мое решение, и оно работает хорошо, надеюсь, оно поможет вам, тогда вам не нужно беспокоиться о том, как писать паркет и каким образом.

Я столкнулся с той же проблемой, после долгих исследований, она решена сейчас.

когда вы делаете

write('outfile.snappy.parquet', df, compression='SNAPPY', open_with=myopen,file_scheme='hive',partition_on=PARTITION_KEYS)

он использует fastparquet за сценой, который использует другую кодировку для DateTime, чем то, с чем совместима Athena.

Решение: удалите fastparquet и установите pyarrow

  • pip uninstall fastparquet
  • pip install pyarrow

запустите ваш код снова. Это должно сработать на этот раз.:)

Другие вопросы по тегам