Эффективность сходства строк в Apache Spark
Мы новички в Apache Spark и выполняем сопоставление строк, используя методы сходства строк (JaroWinkler, Levenshtein, Cosine), мы должны сделать это для огромных данных (2,2 миллиона), хотели бы знать, является ли это правильным способом или же мы может сделать это еще более эффективным в любом случае?
package org.sparkexample;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import info.debatty.java.stringsimilarity.JaroWinkler;
import info.debatty.java.stringsimilarity.Levenshtein;
public class StringSimilarity {
public static void main(String[] args) {
System.setProperty("hadoop.home.dir", "C:\\winutils");
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("JoinFunctions").setMaster("local[*]"));
SQLContext sqlContext = new SQLContext(sc);
SparkSession spark = SparkSession.builder()
.appName("StringSimiliarityExample").getOrCreate();
JaroWinkler jw = new JaroWinkler();
Levenshtein Levenshtein1=new Levenshtein();
Dataset<Row> sourceFileContent = sqlContext.read()
.format("com.databricks.spark.csv")
.option("header", "true")
.load("Source.csv");
Dataset<Row> targetFileContent = sqlContext.read()
.format("com.databricks.spark.csv")
.option("header", "true")
.load("Target.csv");
List<Row> targetFileContentList = targetFileContent.collectAsList();
for(Row individaulRow : targetFileContentList){
productDescriptionSource=productDescriptionTarget=brandSource=brandTarget=manufacturerIDSource=manufacturerIDTarget="";
productDescriptionSourceIndex=individaulRow.fieldIndex("productDescription");
productDescriptionTargetIndex=individaulRow.fieldIndex("productDescriptionTarget");
manufacturerIDSourceIndex=individaulRow.fieldIndex("Manufacturer part Number");
manufacturerIDTargetIndex=individaulRow.fieldIndex("Manufacturer part NumberTarget");
if(!individaulRow.isNullAt(productDescriptionSourceIndex)){
productDescriptionSource=individaulRow.get(productDescriptionSourceIndex).toString().toLowerCase().replaceAll("[^a-zA-Z0-9\\s+]", "");
}
if(!individaulRow.isNullAt(productDescriptionTargetIndex)){
productDescriptionTarget=individaulRow.get(productDescriptionTargetIndex).toString().toLowerCase().replaceAll("[^a-zA-Z0-9\\s+]", "");
}
if(!individaulRow.isNullAt(manufacturerIDSourceIndex)){
manufacturerIDSource=individaulRow.get(manufacturerIDSourceIndex).toString().toLowerCase().replaceAll("[^a-zA-Z0-9\\s+]", "");
}
if(!individaulRow.isNullAt(manufacturerIDTargetIndex)){
manufacturerIDTarget=individaulRow.get(manufacturerIDTargetIndex).toString().toLowerCase().replaceAll("[^a-zA-Z0-9\\s+]", "");
}
/* STRING SIMILARITY */
Row individualObject= (Row)RowFactory.create(
isMatchValue+" ",
"1:"+String.format("%.4f",jaroWinkler.similarity(productDescriptionSource, productDescriptionTarget)),
"2:"+String.format("%.4f",levenshtein.similarity(manufacturerIDSource, manufacturerIDTarget)),
);
trainSetData.add(individualObject);
}
Dataset<Row> machineLearningDataSet = sparkSession.createDataFrame(trainSetData, schemaTrainSetData);
}
}
Для приведенного выше кода требуется 30 минут для запуска 1 миллиона данных (автономная система).