PySpark запрашивает имя одного столбца со значением, присутствующим в другом столбце

Input_pyspark_dataframe:

id   name  collection  student.1.price  student.2.price  student.3.price
111  aaa      1           100              999               232
222  bbb      2           200              888               656
333  ccc      1           300              777               454
444  ddd      1           400              666               787

output_pyspark_dataframe

id   name  collection    price  
111  aaa      1           100           
222  bbb      2           888            
333  ccc      1           300             
444  ddd      3           787       

мы можем найти правильную цену каждого идентификатора, используя значение, представленное в столбце коллекции

Вопрос

используя pyspark, как я могу найти правильную цену каждого идентификатора, динамически создавая имя столбца student.{collection}.price?

пожалуйста, дайте мне знать.

1 ответ

Немного закончено, но вы можете это сделать.

В fields предоставит вам имена полей структуры, student. Вы должны дать это вручную и в конечном итоге получите1, 2, 3.

Первая строка затем создаст массив столбцов student.{i}.price за i = range(1, 4). Точно так же вторая строка создаёт массив литералов{i}.

Теперь заархивируйте эти два массива в один массив, например

[('1', col('student.1.price')), ...]

и взорвите массив, тогда он станет:

('1', col('student.1.price'))
('2', col('student.2.price'))
('3', col('student.3.price'))

Поскольку arrays_zipдать вам массив структуры, приведенный выше результат - тип структуры. Получите каждое значение, используя ключ структуры в качестве столбца, то естьindex а также price.

Наконец, вы можете сравнить collection а также index (на самом деле это имя поля столбца структуры студента).

import pyspark.sql.functions as f

fields = [field.name for field in next(field for field in df.schema.fields if field.name == 'student').dataType.fields]

df.withColumn('array', f.array(*map(lambda x: 'student.' + x + '.price', fields))) \
  .withColumn('index', f.array(*map(lambda x: f.lit(x), fields))) \
  .withColumn('zip', f.arrays_zip('index', 'array')) \
  .withColumn('zip', f.explode('zip')) \
  .withColumn('index', f.col('zip.index')) \
  .withColumn('price', f.col('zip.array')) \
  .filter('collection = index') \
  .select('id', 'name', 'collection', 'price') \
  .show(10, False)

+---+----+----------+-----+
|id |name|collection|price|
+---+----+----------+-----+
|111|aaa |1         |100  |
|222|bbb |2         |888  |
|333|ccc |1         |300  |
|444|ddd |3         |787  |
+---+----+----------+-----+
Другие вопросы по тегам