Как вывести путь решения (правила), по которому было получено предсказание для конкретной строки в PySpark?
Как распечатать путь решения конкретного образца в Spark DataFrame?
Spark Version: '2.3.1'
Приведённый ниже код печатает путь принятия решения всей модели. Как заставить его напечатать путь принятия решения для конкретного образца — например, для строки, в которой tagvalue равен 2?
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler
import findspark
findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import monotonically_increasing_id, col, row_number
from pyspark.sql.window import Window
# Start (or reuse) a local Spark session for the demo.
spark = (
    SparkSession.builder
    .appName('demo')
    .master('local[*]')
    .getOrCreate()
)

# Toy data set: four rows, five numeric columns and a label.
data = pd.DataFrame({
    'ball': [0, 1, 2, 3],
    'keep': [4, 5, 6, 7],
    'hall': [8, 9, 10, 11],
    'fall': [12, 13, 14, 15],
    'mall': [16, 17, 18, 10],
    'label': [21, 31, 41, 51]
})

# Give every row a sequential "tagvalue" (row_number over the generated id)
# so that a single sample can be selected later.
df = spark.createDataFrame(data).withColumn("mono_ID", monotonically_increasing_id())
w = Window().orderBy("mono_ID")
df = df.select(row_number().over(w).alias("tagvalue"), col("*"))

# Assemble the feature vector and fit a decision tree on it.
assembler = VectorAssembler(
    inputCols=['ball', 'keep', 'hall', 'fall'],
    outputCol='features',
)
dtc = DecisionTreeClassifier(featuresCol='features', labelCol='label')
pipeline = Pipeline(stages=[assembler, dtc]).fit(df)
transformed_pipeline = pipeline.transform(df)

# Show the prediction for the sample whose tagvalue is 2.
result = transformed_pipeline.where(transformed_pipeline['tagvalue'] == 2)
result.select('tagvalue', 'prediction').show()
+--------+----------+
|tagvalue|prediction|
+--------+----------+
| 2| 31.0|
+--------+----------+
Код выше печатает prediction для строки, в которой tagvalue равен 2. Теперь я хотел бы получить путь решения, который привёл алгоритм к этому ответу именно для данного tagvalue, а не для всей модели.
Я знаю следующий способ, но он печатает путь решения всей модели, а не конкретного образца.
# The fitted tree is the second pipeline stage (after the VectorAssembler).
ml_pipeline = pipeline.stages[1]
# toDebugString is a property; a bare expression only echoes in a REPL, so
# print it explicitly to get the whole model's rule set on stdout.
print(ml_pipeline.toDebugString)
В scikit-learn такая возможность есть; каков её эквивалент в Spark?
Обновление 1:
Если запустить следующий код в scikit-learn, он напечатает путь решения для конкретного образца; фрагмент взят прямо с сайта scikit-learn.
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier
# Fit a small tree on the iris data set.
iris = load_iris()
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

estimator = DecisionTreeClassifier(max_leaf_nodes=3, random_state=0)
estimator.fit(X_train, y_train)

# Parallel arrays describing the fitted tree, indexed by node id.
n_nodes = estimator.tree_.node_count
children_left = estimator.tree_.children_left
children_right = estimator.tree_.children_right
feature = estimator.tree_.feature
threshold = estimator.tree_.threshold

# First let's retrieve the decision path of each sample. The decision_path
# method allows to retrieve the node indicator functions. A non zero element of
# indicator matrix at the position (i, j) indicates that the sample i goes
# through the node j.
node_indicator = estimator.decision_path(X_test)

# Similarly, we can also have the leaves ids reached by each sample.
leave_id = estimator.apply(X_test)

# Now, it's possible to get the tests that were used to predict a sample or
# a group of samples. First, let's make it for the sample.
sample_id = 0
node_index = node_indicator.indices[node_indicator.indptr[sample_id]:
                                    node_indicator.indptr[sample_id + 1]]

print('Rules used to predict sample %s: ' % sample_id)
for node_id in node_index:
    # Skip the leaf itself: it carries no test (its feature/threshold entries
    # are the -2 placeholders). Testing `!=` here instead would skip every
    # real split and print only that placeholder leaf line.
    if leave_id[sample_id] == node_id:
        continue

    if X_test[sample_id, feature[node_id]] <= threshold[node_id]:
        threshold_sign = "<="
    else:
        threshold_sign = ">"

    print("decision id node %s : (X_test[%s, %s] (= %s) %s %s)" %
          (node_id,
           sample_id,
           feature[node_id],
           X_test[sample_id, feature[node_id]],
           threshold_sign,
           threshold[node_id]))
Вывод будет таким:
Rules used to predict sample 0:
decision id node 4 : (X_test[0, -2] (= 5.1) > -2.0)
1 ответ
Я немного изменил ваш фрейм данных, чтобы в объяснениях были видны разные признаки.
Я также изменил VectorAssembler так, чтобы он использовал список признаков (features), к которому позже будет легко обращаться.
Изменения приведены ниже:
# change1: ball goes from [0,1,2,3] -> [0,1,1,3] so we can see other features in explanations
# change2: added multiple paths to the same prediction
# change3: added a categorical variable ('wall')
# change4: a `features` list so those indices can be re-used easily later

# The encoders below are not among the imports used so far.
from pyspark.ml.feature import OneHotEncoder, StringIndexer

data = pd.DataFrame({
    'ball': [0, 1, 1, 3, 1, 0, 1, 3],
    'keep': [4, 5, 6, 7, 7, 4, 6, 7],
    'hall': [8, 9, 10, 11, 2, 6, 10, 11],
    'fall': [12, 13, 14, 15, 15, 12, 14, 15],
    'mall': [16, 17, 18, 10, 10, 16, 18, 10],
    'wall': ['a', 'a', 'a', 'a', 'a', 'a', 'c', 'e'],
    'label': [21, 31, 41, 51, 51, 51, 21, 31]
})

# Same row-numbering trick as in the question: "tagvalue" identifies a sample.
df = spark.createDataFrame(data)
df = df.withColumn("mono_ID", monotonically_increasing_id())
w = Window().orderBy("mono_ID")
df = df.select(row_number().over(w).alias("tagvalue"), col("*"))

# Encode the categorical column: string -> index -> one-hot vector.
indexer = StringIndexer(inputCol='wall', outputCol='wallIndex')
encoder = OneHotEncoder(inputCol='wallIndex', outputCol='wallVec')

# Input columns of the assembled vector, kept in a list for later look-ups.
# NOTE(review): 'wallVec' is itself a vector, so it presumably occupies more
# than one slot of 'features'; positions in this list then differ from slot
# indices for the columns that follow it.
features = ['ball', 'keep', 'wallVec', 'hall', 'fall']
assembler = VectorAssembler(
    inputCols=features, outputCol='features')
dtc = DecisionTreeClassifier(featuresCol='features', labelCol='label')
pipeline = Pipeline(stages=[indexer, encoder, assembler, dtc]).fit(df)
transformed_pipeline = pipeline.transform(df)
Ниже приведен метод, позволяющий работать с деревом решений:
import networkx as nx

# Get the fitted tree back out of the pipeline; it sits at index 3 because of
# the two categorical encoders and the assembler in front of it.
ml_pipeline = pipeline.stages[3]

# Save the model so we can get at the node table that the Scala code keeps
# private. overwrite() makes the snippet re-runnable: a plain save() fails
# when the target directory already exists.
ml_pipeline.write().overwrite().save("mymodel_test")

# Read the saved node table back in.
modeldf = spark.read.parquet("mymodel_test/data/*")

# Select only the columns that we need and collect them once; both passes
# below work on this local list.
noderows = modeldf.select("id", "prediction", "leftChild", "rightChild", "split").collect()

# Map each slot of the assembled vector to its attribute name using the
# column metadata. Indexing the `features` input list by slot is wrong as soon
# as one input column (the one-hot vector) spans several slots.
feature_attrs = transformed_pipeline.schema['features'].metadata.get('ml_attr', {}).get('attrs', {})
slot_names = {attr['idx']: attr['name'] for group in feature_attrs.values() for attr in group}

# Create a graph for the decision tree; a simpler tree structure would do too.
G = nx.Graph()

# First pass: add the nodes. A node with no children is a prediction leaf.
for rw in noderows:
    if rw['leftChild'] < 0 and rw['rightChild'] < 0:
        G.add_node(rw['id'], cat="Prediction", predval=rw['prediction'])
    else:
        split = rw['split']
        G.add_node(
            rw['id'],
            cat="splitter",
            featureIndex=split['featureIndex'],
            thresh=split['leftCategoriesOrThreshold'],
            leftChild=rw['leftChild'],
            rightChild=rw['rightChild'],
            numCat=split['numCategories'],
        )

# Second pass: add the parent -> child relationships with a readable reason.
for rw in noderows:
    if not (rw['leftChild'] > 0 and rw['rightChild'] > 0):
        continue
    tempnode = G.nodes[rw['id']]
    slot = tempnode['featureIndex']
    name = slot_names.get(slot, "feature {0}".format(slot))
    if tempnode['numCat'] > 0:
        # Categorical split: 'thresh' holds the categories sent to the left.
        left_reason = "{0} in {1}".format(name, tempnode['thresh'])
        right_reason = "{0} not in {1}".format(name, tempnode['thresh'])
    else:
        # Continuous split: 'thresh' holds the single threshold; left is <=.
        left_reason = "{0} less than or equal to {1}".format(name, tempnode['thresh'][0])
        right_reason = "{0} greater than {1}".format(name, tempnode['thresh'][0])
    G.add_edge(rw['id'], rw['leftChild'], reason=left_reason)
    G.add_edge(rw['id'], rw['rightChild'], reason=right_reason)
Теперь давайте создадим функцию для работы со всем этим
Примечание: это может быть написано более чисто
# Walk the tree graph for one sample and print every test on its path.
def decision_path(tag2search):
    """Print the decision path of the row whose ``tagvalue`` is *tag2search*.

    Starting at node 0 of the graph ``G``, each split node is evaluated
    against the row's assembled ``features`` vector until a prediction leaf is
    reached; one line is printed per split and a final line for the leaf.

    Args:
        tag2search: Value of the ``tagvalue`` column identifying the row.

    Raises:
        IndexError: If no row has the requested ``tagvalue``.
    """
    # Compare through a Column expression instead of concatenating SQL text.
    wanted_row = transformed_pipeline.where(col("tagvalue") == tag2search).first()
    if wanted_row is None:
        raise IndexError("no row with tagvalue = {0}".format(tag2search))
    wanted_features = wanted_row['features']

    # Slot index -> attribute name, taken from the vector column's metadata.
    # The position in the `features` input list cannot be used here: a one-hot
    # column spans several slots and shifts every later index.
    feature_attrs = transformed_pipeline.schema['features'].metadata.get('ml_attr', {}).get('attrs', {})
    slot_names = {attr['idx']: attr['name'] for group in feature_attrs.values() for attr in group}

    start_node = G.nodes[0]
    while start_node['cat'] != 'Prediction':
        slot = start_node['featureIndex']
        feature_name = slot_names.get(slot, "feature {0}".format(slot))
        feature_value = wanted_features[slot]

        if start_node['numCat'] > 0:
            # Categorical split: 'thresh' lists the categories that go left.
            left_categories = list(start_node['thresh'])
            goes_left = feature_value in left_categories
            shown = ", ".join(str(c) for c in left_categories)
            if goes_left:
                print("'{0}' value of {1} in [{2}]; ".format(feature_name, feature_value, shown))
            else:
                print("'{0}' value of {1} not in [{2}]; ".format(feature_name, feature_value, shown))
        else:
            # Continuous split: 'thresh' holds one threshold; left is <=.
            threshold = start_node['thresh'][0]
            goes_left = feature_value <= threshold
            if goes_left:
                print("'{0}' value of {1} was less than or equal to {2}; ".format(feature_name, feature_value, threshold))
            else:
                print("'{0}' value of {1} was greater than {2}; ".format(feature_name, feature_value, threshold))

        start_node = G.nodes[start_node['leftChild'] if goes_left else start_node['rightChild']]

    print("leads to prediction of {0}".format(start_node['predval']))
Результаты принимают эту форму:
# decision_path prints its result and returns nothing, so use a plain loop
# instead of building a throwaway list. Covers tagvalue 1 through 7.
for X in range(1, 8):
    decision_path(X)
'fall' value of 8.0 was greater than 6.0;
'ball' value of 0.0 was less than or equal to 1.0;
'ball' value of 0.0 was less than or equal to 0.0;
leads to prediction of 21.0
'fall' value of 9.0 was greater than 6.0;
'ball' value of 1.0 was less than or equal to 1.0;
'ball' value of 1.0 was greater than 0.0;
'keep' value of 5.0 was less than or equal to 5.0;
leads to prediction of 31.0
'fall' value of 10.0 was greater than 6.0;
'ball' value of 1.0 was less than or equal to 1.0;
'ball' value of 1.0 was greater than 0.0;
'keep' value of 6.0 was greater than 5.0;
'wall' value of a in [a];
leads to prediction of 21.0
'fall' value of 11.0 was greater than 6.0;
'ball' value of 3.0 was greater than 1.0;
'wall' value of a in [a];
leads to prediction of 31.0
'fall' value of 2.0 was less than or equal to 6.0;
leads to prediction of 51.0
'fall' value of 6.0 was less than or equal to 6.0;
leads to prediction of 51.0
'fall' value of 10.0 was greater than 6.0;
'ball' value of 1.0 was less than or equal to 1.0;
'ball' value of 1.0 was greater than 0.0;
'keep' value of 6.0 was greater than 5.0;
'wall' value of c in [c];
leads to prediction of 21.0
Заметки:
- Если вы хотите остаться исключительно в Spark-мире, вы можете использовать GraphFrames вместо networkx (у меня нет такой роскоши:()
- Вы можете изменить формулировку, как вы хотите
- Если вам нужны impurity, impurityStats или gain, все они есть в сохранённом DataFrame с данными модели
- Я решил работать с деревом, а не разбирать вывод .toDebugString, потому что прямой доступ к дереву кажется более фундаментальным (и расширяемым) подходом
- К слову, если просто посмотреть на вывод .toDebugString и sklearn decision_path, мне кажется, что они понятнее и легче читаются
- Если вы хотите визуализировать дерево, посмотрите: https://github.com/tristaneljed/Decision-Tree-Visualization-Spark/blob/master/DT.py
- В какой-то момент я нашел чистую реализацию Scala, но сейчас не могу найти ее снова:(
- Мне кажется, что здесь не хватает тестового случая для категориального условия «not in»; если кто-нибудь подскажет, как должна выглядеть такая строка, я отредактирую ответ
Более эффективное и легче интерпретируемое решение, использующее атрибут toDebugString дерева решений в PySpark, выглядит следующим образом. Примечание: подробное описание приведённого ниже кода см. на https://medium.com/@dipaweshpawar/decoding-decision-tree-in-pyspark-bdd98dcd1ddf
from pyspark.sql.functions import to_date,datediff,lit,udf,sum,avg,col,count,lag
from pyspark.sql.types import StringType,LongType,StructType,StructField,DateType,IntegerType,DoubleType
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
import pandas as pd
from pyspark.sql import DataFrame
from pyspark.sql.functions import udf, lit, avg, max, min
from pyspark.sql.types import StringType, ArrayType, DoubleType
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
import operator
import ast
# Lookup table: textual operator as it appears in a tree condition -> the
# callable from the `operator` module that evaluates it.
operators = dict((
    (">=", operator.ge),
    ("<=", operator.le),
    (">", operator.gt),
    ("<", operator.lt),
    ("==", operator.eq),
    ('and', operator.and_),
    ('or', operator.or_),
))
# Numeric-only variant of the sample data.
data = pd.DataFrame({
    'ball': [0, 1, 1, 3, 1, 0, 1, 3],
    'keep': [4, 5, 6, 7, 7, 4, 6, 7],
    'hall': [8, 9, 10, 11, 2, 6, 10, 11],
    'fall': [12, 13, 14, 15, 15, 12, 14, 15],
    'mall': [16, 17, 18, 10, 10, 16, 18, 10],
    'label': [21, 31, 41, 51, 51, 51, 21, 31]
})
df = spark.createDataFrame(data)

# Columns that make up the feature vector, in vector order.
f_list = ['ball', 'keep', 'mall', 'hall', 'fall']
assemble_numerical_features = VectorAssembler(
    inputCols=f_list,
    outputCol='features',
    handleInvalid='skip',
)
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')

# Fit assembler + tree as one pipeline, score the data, and keep a handle on
# the fitted tree (the last stage).
pipeline = Pipeline(stages=[assemble_numerical_features, dt])
model = pipeline.fit(df)
df = model.transform(df)
dt_m = model.stages[-1]
# Step 1: convert model.debugString output to dictionary of nodes and children
def _condition_text(line):
    """Drop the leading keyword and the parentheses of an If/Else line.

    ``"If (feature 3 <= 7.0)"`` becomes ``"feature 3 <= 7.0"``.
    """
    return ' '.join(line.split()[1:]).replace('(', '').replace(')', '')


def parse_debug_string_lines(lines):
    """Parse stripped ``toDebugString`` body lines into a list of node dicts.

    Each ``If``/``Else`` line becomes ``{'name': <condition>, 'children': [...]}``
    and every other line (e.g. ``Predict: ...``) becomes ``{'name': <line>}``.

    Args:
        lines: List of stripped lines without the model header. The list is
            consumed in place: parsed lines are popped from its front.

    Returns:
        The nodes found at the current nesting level. Parsing of a level stops
        at an ``Else`` that does not directly follow the ``If`` branch parsed
        at this level, leaving it in *lines* for the caller.
    """
    block = []
    while lines:
        head = lines[0]
        if head.startswith('If'):
            bl = _condition_text(lines.pop(0))
            block.append({'name': bl, 'children': parse_debug_string_lines(lines)})
            # The matching Else normally follows the If branch; guard against
            # truncated input where the list ends right after that branch.
            if lines and lines[0].startswith('Else'):
                be = _condition_text(lines.pop(0))
                block.append({'name': be, 'children': parse_debug_string_lines(lines)})
        elif head.startswith('Else'):
            # Belongs to an enclosing If: hand control back to the caller.
            break
        else:
            block.append({'name': lines.pop(0)})
    return block
def debug_str_to_json(debug_string):
    """Convert a tree model's ``toDebugString`` text into a nested dict.

    Lines are stripped and collected up to (not including) the first blank
    line; the first collected line (the model header) is dropped and the rest
    is handed to ``parse_debug_string_lines``.

    Args:
        debug_string: The multi-line debug string of the fitted tree.

    Returns:
        ``{'name': 'Root', 'children': [...]}`` describing the tree.
    """
    data = []
    for line in debug_string.splitlines():
        stripped = line.strip()
        if not stripped:
            # A blank line ends the tree description.
            break
        data.append(stripped)
    # data[0] is the header line, not a tree node.
    return {'name': 'Root', 'children': parse_debug_string_lines(data[1:])}
# Step 2: using the metadata stored on the 'features' column, build a dict that
# maps each index of the feature vector to the name of the feature stored there.
# The metadata groups attribute entries by type; only the entries matter here,
# each carrying 'idx' and 'name'.
f_type_to_flist_dict = df.schema['features'].metadata["ml_attr"]["attrs"]
# Distinct loop names are used on purpose so the module-level `f_list` of
# input columns is not overwritten.
f_index_to_name_dict = {
    attr['idx']: attr['name']
    for attr_group in f_type_to_flist_dict.values()
    for attr in attr_group
}
def generate_explanations(dt_as_json, df: DataFrame, f_index_to_name_dict, operators):
    """Add an ``explanation`` column with the rule path each row follows.

    Args:
        dt_as_json: Tree as built by ``debug_str_to_json`` (nested dicts with
            ``'name'`` and ``'children'``), or the ``str()`` of such a dict.
        df: DataFrame containing a ``features`` vector column.
        f_index_to_name_dict: Maps a feature-vector index to a feature name.
        operators: Maps an operator token found in a condition (e.g. ``"<="``)
            to a two-argument callable.

    Returns:
        *df* with two added columns: ``features_list`` (the vector as a list
        of doubles) and ``explanation`` (comma-separated satisfied conditions).
    """
    # Exceptions that mean "this condition could not be evaluated".
    parse_errors = (KeyError, IndexError, ValueError, TypeError, AttributeError)

    # Earlier callers may pass the stringified tree; accept both forms.
    if isinstance(dt_as_json, str):
        dt_as_json = ast.literal_eval(dt_as_json)
    tree = dt_as_json

    df = df.withColumn(
        'features' + '_list',
        udf(lambda x: x.toArray().tolist(), ArrayType(DoubleType()))(df['features']),
    )

    def split_condition(cond: str):
        """Split ``"feature <idx> <op> <value>"`` into (idx, op token, value)."""
        cond_parts = cond.split()
        return int(cond_parts[1]), cond_parts[2], float(cond_parts[3])

    # Step 3: parse a node condition and check whether the instance satisfies it.
    def parse_validate_cond(cond: str, f_vector: list):
        condition_f_index, condition_op, condition_value = split_condition(cond)
        f_value = f_vector[condition_f_index]
        f_name = f_index_to_name_dict[condition_f_index].replace('numerical_features_', '').replace('encoded_numeric_', '').lower()
        if operators[condition_op](f_value, condition_value):
            return True, f_name + ' ' + condition_op + ' ' + str(round(condition_value, 2))
        return False, ''

    # Step 4: follow the nodes whose condition the instance satisfies until a
    # prediction node is reached, accumulating the conditions on the way.
    def extract_rule(node: dict, f_vector: list, rule=""):
        for child in node.get('children', []):
            name = child['name'].strip()
            if name.startswith('Predict:'):
                # Drop the trailing ", "; an empty rule (root is a leaf) stays empty.
                return rule.rpartition(',')[0]
            if name.startswith('feature'):
                try:
                    res, cond = parse_validate_cond(child['name'], f_vector)
                except parse_errors:
                    # Unparsable condition: treat it as not satisfied.
                    res = False
                if res:
                    rule += cond + ', '
                    rule = extract_rule(child, f_vector, rule=rule)
        return rule

    def has_unparsable_condition(node: dict) -> bool:
        """Return True if any condition in the tree cannot be parsed."""
        for child in node.get('children', []):
            name = child['name'].strip()
            if not name.startswith('feature'):
                continue
            try:
                f_index, op_token, _ = split_condition(name)
                operators[op_token]
                f_index_to_name_dict[f_index]
            except parse_errors:
                return True
            if has_unparsable_condition(child):
                return True
        return False

    # The tree travels to the workers inside the UDF's closure, so it does not
    # have to be re-parsed from a string for every row and recursion level.
    df = df.withColumn(
        'explanation',
        udf(lambda fv: extract_rule(tree, fv), StringType())(df['features' + '_list']),
    )

    # The UDF runs lazily in worker processes, so a flag set inside it would
    # never be visible here; validate the tree on the driver instead.
    if has_unparsable_condition(tree):
        print('some node in decision tree has unexpected format')
    return df
# Turn the fitted tree's debug string into a nested dict, annotate every row
# with the rules it follows, and pull the interesting columns to the driver.
tree_as_json = debug_str_to_json(dt_m.toDebugString)
df = generate_explanations(tree_as_json, df, f_index_to_name_dict, operators)
output_columns = ['ball', 'keep', 'mall', 'hall', 'fall', 'explanation', 'prediction']
rows = df.select(output_columns).collect()
output :
-----------------------
[Row(ball=0, keep=4, mall=16, hall=8, fall=12, explanation='hall > 7.0, mall > 13.0, ball <= 0.5', prediction=21.0),
Row(ball=1, keep=5, mall=17, hall=9, fall=13, explanation='hall > 7.0, mall > 13.0, ball > 0.5, keep <= 5.5', prediction=31.0),
Row(ball=1, keep=6, mall=18, hall=10, fall=14, explanation='hall > 7.0, mall > 13.0, ball > 0.5, keep > 5.5', prediction=21.0),
Row(ball=3, keep=7, mall=10, hall=11, fall=15, explanation='hall > 7.0, mall <= 13.0', prediction=31.0),
Row(ball=1, keep=7, mall=10, hall=2, fall=15, explanation='hall <= 7.0', prediction=51.0),
Row(ball=0, keep=4, mall=16, hall=6, fall=12, explanation='hall <= 7.0', prediction=51.0),
Row(ball=1, keep=6, mall=18, hall=10, fall=14, explanation='hall > 7.0, mall > 13.0, ball > 0.5, keep > 5.5', prediction=21.0),
Row(ball=3, keep=7, mall=10, hall=11, fall=15, explanation='hall > 7.0, mall <= 13.0', prediction=31.0)]
output of dt_m.toDebugString:
-----------------------------------
'DecisionTreeClassificationModel (uid=DecisionTreeClassifier_2a17ae7633b9) of depth 4 with 9 nodes\n If (feature 3 <= 7.0)\n Predict: 51.0\n Else (feature 3 > 7.0)\n If (feature 2 <= 13.0)\n Predict: 31.0\n Else (feature 2 > 13.0)\n If (feature 0 <= 0.5)\n Predict: 21.0\n Else (feature 0 > 0.5)\n If (feature 1 <= 5.5)\n Predict: 31.0\n Else (feature 1 > 5.5)\n Predict: 21.0\n'
output of debug_str_to_json(dt_m.toDebugString):
------------------------------------
{'name': 'Root',
'children': [{'name': 'feature 3 <= 7.0',
'children': [{'name': 'Predict: 51.0'}]},
{'name': 'feature 3 > 7.0',
'children': [{'name': 'feature 2 <= 13.0',
'children': [{'name': 'Predict: 31.0'}]},
{'name': 'feature 2 > 13.0',
'children': [{'name': 'feature 0 <= 0.5',
'children': [{'name': 'Predict: 21.0'}]},
{'name': 'feature 0 > 0.5',
'children': [{'name': 'feature 1 <= 5.5',
'children': [{'name': 'Predict: 31.0'}]},
{'name': 'feature 1 > 5.5',
'children': [{'name': 'Predict: 21.0'}]}]}]}]}]}