Фильтрация фрейма данных pyspark, если текстовый столбец содержит слова в указанном списке

Я видел опубликованные здесь вопросы, похожие на мои, но я все еще получаю ошибки в своем коде, когда пытаюсь ответить на некоторые из них. У меня есть датафрейм с тремя столбцами - создан _at, текст и слова (это просто токенизированная версия текста). Увидеть ниже:

введите описание изображения здесь

Теперь у меня есть список компаний ['Starbucks', 'Nvidia', 'IBM', 'Dell'], и я хочу сохранить только те строки, где текст включает в себя эти слова выше.

Я попробовал несколько вещей, но безуспешно:

small_DF.filter(lambda x: any(word in x.text for word in test_list))

Возвращает: TypeError: условие должно быть строкой или столбцом

Я пытался создать функцию и использовать foreach():

def filters(line):
   return(any(word in line for word in test_list))
df = df.foreach(filters)

Это превращает DF в "Nonetype"

И последний, который я попробовал:

df = df.filter((col("text").isin(test_list))

Это возвращает пустой фрейм данных, что приятно, так как я не получаю ошибки, но, очевидно, не то, что я хочу.

2 ответа

Решение

Я думаю filter не работает, потому что он ожидает логического вывода от лямбда-функции и isin просто сравнивается с колонкой. Вы пытаетесь сравнить список слов со списком слов. Вот то, что я попробовал, может дать вам некоторое направление -

# prepare some test data ==> 

words = [x.lower() for x in ['starbucks', 'Nvidia', 'IBM', 'Dell']]
data = [['i love Starbucks'],['dell laptops rocks'],['help me I am stuck!']]
df = spark.createDataFrame(data).toDF('text')


from pyspark.sql.types import *

def intersect(row):
    # convert each word in lowecase
    row = [x.lower() for x in row.split()]
    return True if set(row).intersection(set(words)) else False


filterUDF = udf(intersect,BooleanType())
df.where(filterUDF(df.text)).show()

выход:

+------------------+
|              text|
+------------------+
|  i love Starbucks|
|dell laptops rocks|
+------------------+

Ваш .filter возвращает ошибку, потому что это функция фильтра sql (ожидая BooleanType() столбец) на фреймах данных, а не функция фильтра на СДР. Если вы хотите использовать RDD, просто добавьте .rdd:

small_DF.rdd.filter(lambda x: any(word in x.text for word in test_list))

Вам не нужно использовать UDF, вы можете использовать регулярные выражения в pyspark с .rlike на вашей колонке "text":

from pyspark.sql import HiveContext
hc = HiveContext(sc)
import pyspark.sql.functions as psf

words = [x.lower() for x in ['starbucks', 'Nvidia', 'IBM', 'Dell']]
data = [['i love Starbucks'],['dell laptops rocks'],['help me I am stuck!']]
df = hc.createDataFrame(data).toDF('text')
df.filter(psf.lower(df.text).rlike('|'.join(words)))
Другие вопросы по тегам