PySpark 2 - объединить записи из нескольких строк
У меня есть текстовый файл, который имеет следующие записи:
<BR>Datetime:2018.06.30^
Name:ABC^
Se:4^
Machine:XXXXXXX^
InnerTrace:^
AdditionalInfo:^
<ER>
<BR>Datetime:2018.05.30-EDT^
Name:DEF^
Se:4^
Machine:XXXXXXX^
InnerTrace:^
AdditionalInfo:^
<ER>
Я пытаюсь прочитать это в искру и обработать файл так, чтобы результат был:
Datetime Name Se Machine InnerTrace AdditionalInfo
2018.06.30 ABC 4 XXXXXXX
2018.05.30 DEF 4 XXXXXXX
Когда я пытаюсь прочитать файл в искру с
sparkSession.read.csv("filename")
Я получаю каждую строку как отдельную, что затрудняет объединение всех строк между
и
Я делаю это с помощью PySpark 2.
1 ответ
Решение
Этот формат файла не подходит для искры. Если вы не можете изменить файл, вам придется сделать немало обработки, чтобы получить его так, как вы хотите.
Вот один подход, который может работать для вас
Читать файл
Предположим, у вас есть следующий DataFrame:
df = spark.read.csv(path="filename", quote='')
df.show(truncate=False)
#+-----------------------------+
#|_c0 |
#+-----------------------------+
#|"<BR>Datetime:2018.06.30^ |
#|Name:ABC^ |
#|Se:4^ |
#|Machine:XXXXXXX^ |
#|InnerTrace:^ |
#|AdditionalInfo:^ |
#|<ER>" |
#|"<BR>Datetime:2018.05.30-EDT^|
#|Name:DEF^ |
#|Se:4^ |
#|Machine:XXXXXXX^ |
#|InnerTrace:^ |
#|AdditionalInfo:^ |
#|<ER>" |
#+-----------------------------+
Добавить столбец для разделения строк на группы записей
import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.orderBy("id").rangeBetween(Window.unboundedPreceding, 0)
df = df.withColumn("group", f.col("_c0").rlike('^"<BR>.+').cast("int"))
df = df.withColumn("id", f.monotonically_increasing_id())
df = df.withColumn("group", f.sum("group").over(w)).drop("id")
df.show(truncate=False)
#+-----------------------------+-----+
#|_c0 |group|
#+-----------------------------+-----+
#|"<BR>Datetime:2018.06.30^ |1 |
#|Name:ABC^ |1 |
#|Se:4^ |1 |
#|Machine:XXXXXXX^ |1 |
#|InnerTrace:^ |1 |
#|AdditionalInfo:^ |1 |
#|<ER>" |1 |
#|"<BR>Datetime:2018.05.30-EDT^|2 |
#|Name:DEF^ |2 |
#|Se:4^ |2 |
#|Machine:XXXXXXX^ |2 |
#|InnerTrace:^ |2 |
#|AdditionalInfo:^ |2 |
#|<ER>" |2 |
#+-----------------------------+-----+
Используйте Regex для очистки и разделения строк
df = df.select(
"group",
f.regexp_replace(pattern=r'(^"<BR>|<ER>"$|\^$)', replacement='', str="_c0").alias("col")
).where(f.col("col") != '')
df = df.select("group", f.split("col", ":").alias("split"))
df.show(truncate=False)
#+-----+--------------------------+
#|group|split |
#+-----+--------------------------+
#|1 |[Datetime, 2018.06.30] |
#|1 |[Name, ABC] |
#|1 |[Se, 4] |
#|1 |[Machine, XXXXXXX] |
#|1 |[InnerTrace, ] |
#|1 |[AdditionalInfo, ] |
#|2 |[Datetime, 2018.05.30-EDT]|
#|2 |[Name, DEF] |
#|2 |[Se, 4] |
#|2 |[Machine, XXXXXXX] |
#|2 |[InnerTrace, ] |
#|2 |[AdditionalInfo, ] |
#+-----+--------------------------+
Извлечение элементов из массива, группы и сводки
df = df.select(
"group",
f.col("split").getItem(0).alias("key"),
f.col("split").getItem(1).alias("value")
)\
.groupBy("group").pivot("key").agg(f.first("value"))\
.drop("group")
df.show(truncate=False)
#+--------------+--------------+----------+-------+----+---+
#|AdditionalInfo|Datetime |InnerTrace|Machine|Name|Se |
#+--------------+--------------+----------+-------+----+---+
#| |2018.06.30 | |XXXXXXX|ABC |4 |
#| |2018.05.30-EDT| |XXXXXXX|DEF |4 |
#+--------------+--------------+----------+-------+----+---+