PySpark 2 - объединить записи из нескольких строк

У меня есть текстовый файл, который имеет следующие записи:

<BR>Datetime:2018.06.30^
Name:ABC^
Se:4^
Machine:XXXXXXX^
InnerTrace:^
AdditionalInfo:^
<ER>
<BR>Datetime:2018.05.30-EDT^
Name:DEF^
Se:4^
Machine:XXXXXXX^
InnerTrace:^
AdditionalInfo:^
<ER>

Я пытаюсь прочитать это в искру и обработать файл так, чтобы результат был:

Datetime     Name  Se  Machine  InnerTrace  AdditionalInfo
2018.06.30   ABC   4   XXXXXXX      
2018.05.30   DEF   4   XXXXXXX

Когда я пытаюсь прочитать файл в искру с

sparkSession.read.csv("filename") 

Я получаю каждую строку как отдельную, что затрудняет объединение всех строк между
и . Есть ли простой способ обойти это?

Я делаю это с помощью PySpark 2.

1 ответ

Решение

Этот формат файла не подходит для искры. Если вы не можете изменить файл, вам придется сделать немало обработки, чтобы получить его так, как вы хотите.

Вот один подход, который может работать для вас

Читать файл

Предположим, у вас есть следующий DataFrame:

df = spark.read.csv(path="filename", quote='')
df.show(truncate=False)
#+-----------------------------+
#|_c0                          |
#+-----------------------------+
#|"<BR>Datetime:2018.06.30^    |
#|Name:ABC^                    |
#|Se:4^                        |
#|Machine:XXXXXXX^             |
#|InnerTrace:^                 |
#|AdditionalInfo:^             |
#|<ER>"                        |
#|"<BR>Datetime:2018.05.30-EDT^|
#|Name:DEF^                    |
#|Se:4^                        |
#|Machine:XXXXXXX^             |
#|InnerTrace:^                 |
#|AdditionalInfo:^             |
#|<ER>"                        |
#+-----------------------------+

Добавить столбец для разделения строк на группы записей

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.orderBy("id").rangeBetween(Window.unboundedPreceding, 0)
df = df.withColumn("group", f.col("_c0").rlike('^"<BR>.+').cast("int"))
df = df.withColumn("id", f.monotonically_increasing_id())
df = df.withColumn("group", f.sum("group").over(w)).drop("id")
df.show(truncate=False)
#+-----------------------------+-----+
#|_c0                          |group|
#+-----------------------------+-----+
#|"<BR>Datetime:2018.06.30^    |1    |
#|Name:ABC^                    |1    |
#|Se:4^                        |1    |
#|Machine:XXXXXXX^             |1    |
#|InnerTrace:^                 |1    |
#|AdditionalInfo:^             |1    |
#|<ER>"                        |1    |
#|"<BR>Datetime:2018.05.30-EDT^|2    |
#|Name:DEF^                    |2    |
#|Se:4^                        |2    |
#|Machine:XXXXXXX^             |2    |
#|InnerTrace:^                 |2    |
#|AdditionalInfo:^             |2    |
#|<ER>"                        |2    |
#+-----------------------------+-----+

Используйте Regex для очистки и разделения строк

df = df.select(
    "group",
    f.regexp_replace(pattern=r'(^"<BR>|<ER>"$|\^$)', replacement='', str="_c0").alias("col")
).where(f.col("col") != '')
df = df.select("group", f.split("col", ":").alias("split"))

df.show(truncate=False)
#+-----+--------------------------+
#|group|split                     |
#+-----+--------------------------+
#|1    |[Datetime, 2018.06.30]    |
#|1    |[Name, ABC]               |
#|1    |[Se, 4]                   |
#|1    |[Machine, XXXXXXX]        |
#|1    |[InnerTrace, ]            |
#|1    |[AdditionalInfo, ]        |
#|2    |[Datetime, 2018.05.30-EDT]|
#|2    |[Name, DEF]               |
#|2    |[Se, 4]                   |
#|2    |[Machine, XXXXXXX]        |
#|2    |[InnerTrace, ]            |
#|2    |[AdditionalInfo, ]        |
#+-----+--------------------------+

Извлечение элементов из массива, группы и сводки

df = df.select(
        "group",
        f.col("split").getItem(0).alias("key"),
        f.col("split").getItem(1).alias("value")
    )\
    .groupBy("group").pivot("key").agg(f.first("value"))\
    .drop("group")
df.show(truncate=False)
#+--------------+--------------+----------+-------+----+---+
#|AdditionalInfo|Datetime      |InnerTrace|Machine|Name|Se |
#+--------------+--------------+----------+-------+----+---+
#|              |2018.06.30    |          |XXXXXXX|ABC |4  |
#|              |2018.05.30-EDT|          |XXXXXXX|DEF |4  |
#+--------------+--------------+----------+-------+----+---+
Другие вопросы по тегам