Тип данных Pandas datetime64[ns] не работает в Hive/Athena
Я работаю над приложением python, которое просто конвертирует CSV-файл в формат паркет, совместимый с hive / athena, и для этого использую библиотеки fastparquet и pandas. В CSV-файле есть значения меток времени, например 2018-12-21 23:45:00
который должен быть записан как timestamp
введите в паркет файл. Ниже мой код, который работает,
columnNames = ["contentid","processed_time","access_time"]
dtypes = {'contentid': 'str'}
dateCols = ['access_time', 'processed_time']
s3 = boto3.client('s3')
obj = s3.get_object(Bucket=bucketname, Key=keyname)
df = pd.read_csv(io.BytesIO(obj['Body'].read()), compression='gzip', header=0, sep=',', quotechar='"', names = columnNames, error_bad_lines=False, dtype=dtypes, parse_dates=dateCols)
s3filesys = s3fs.S3FileSystem()
myopen = s3filesys.open
write('outfile.snappy.parquet', df, compression='SNAPPY', open_with=myopen,file_scheme='hive',partition_on=PARTITION_KEYS)
код успешно запущен, ниже - фрейм данных, созданный пандами
contentid object
processed_time datetime64[ns]
access_time datetime64[ns]
И наконец, когда я запросил файл паркета в Hive и Athena, значение временной метки +50942-11-30 14:00:00.000
вместо 2018-12-21 23:45:00
Любая помощь высоко ценится
6 ответов
Я знаю, что это старый вопрос, но он все еще актуален.
Как упоминалось ранее, Athena поддерживает только int96 в качестве меток времени. Используя fastparquet, можно создать файл паркета с правильным форматом для Athena. Важная часть - это times='int96', так как оно сообщает fastparquet преобразовать дату и время pandas в метку времени int96.
from fastparquet import write
import pandas as pd
def write_parquet():
df = pd.read_csv('some.csv')
write('/tmp/outfile.parquet', df, compression='GZIP', times='int96')
Вы можете попробовать:
dataframe.to_parquet(file_path, compression=None, engine='pyarrow', allow_truncated_timestamps=True, use_deprecated_int96_timestamps=True)
Решил проблему таким способом.
преобразует серию df с помощью метода to_datetime
затем с помощью .dt accesor выберите часть даты в datetime64[ns]
Пример:
df.field = pd.to_datetime(df.field)
df.field = df.field.dt.date
После этого Афина распознает данные
Кажется, проблема в Афине, она поддерживает только int96, а когда вы создаете отметку времени в пандах, это int64.
мой столбец данных, который содержит строковую дату, является "sdate", я сначала преобразовываю в метку времени
# add a new column w/ timestamp
df["ndate"] = pandas.to_datetime["sdate"]
# convert the timestamp to microseconds
df["ndate"] = pandas.to_datetime(["ndate"], unit='us')
# Then I convert my dataframe to pyarrow
table = pyarrow.Table.from_pandas(df, preserve_index=False)
# After that when writing to parquet add the coerce_timestamps and
# use_deprecated_int96_timstamps. (Also writing to S3 directly)
OUTBUCKET="my_s3_bucket"
pyarrow.parquet.write_to_dataset(table, root_path='s3://{0}/logs'.format(OUTBUCKET), partition_cols=['date'], filesystem=s3, coerce_timestamps='us', use_deprecated_int96_timestamps=True)
Я тоже сталкивался с этой проблемой несколько раз. Мой код ошибки: я установил индекс в формат даты и времени:
df.set_index(pd.DatetimeIndex(df.index), inplace=True)
Когда я затем прочитаю файл паркета с помощью fastparquet, он может заметить, что
OutOfBoundsDatetime: Out of bounds nanosecond timestamp: 219968-03-28 05:07:11
Однако ее можно легко решить, используя, а не
fastparquet.ParquetFile(path_file).to_pandas()
ПОЖАЛУЙСТА, ИСПОЛЬЗУЙТЕ
pd.read_parquet(path_file)
ЧТОБЫ РЕШИТЬ ЭТУ ПРОБЛЕМУ
Это мое решение, и оно работает хорошо, надеюсь, оно поможет вам, тогда вам не нужно беспокоиться о том, как писать паркет и каким образом.
Я столкнулся с той же проблемой, после долгих исследований, она решена сейчас.
когда вы делаете
write('outfile.snappy.parquet', df, compression='SNAPPY', open_with=myopen,file_scheme='hive',partition_on=PARTITION_KEYS)
он использует fastparquet за сценой, который использует другую кодировку для DateTime, чем то, с чем совместима Athena.
Решение: удалите fastparquet и установите pyarrow
- pip uninstall fastparquet
- pip install pyarrow
запустите ваш код снова. Это должно сработать на этот раз.:)