Объединяйте документы с помощью FlexibleSearch, создавайте пары многозначных значений, используя es-sparksql.
В настоящее время hadoopasticsearch hadform преобразует набор данных /rdd в документы с отображением 1 к 1, т.е. 1 строка в наборе данных преобразуется в один документ. В нашем сценарии мы делаем что-то вроде этого
для универа
PUT spark/docs/1
{
"_k":"one",
"_k":"two",
"_k":"three" // large sets , we dont need to store much, we just want to map multiple keys to single value.
"_v" :"key:
}
GET spark/docs/_search
{
"query" : {
"constant_score" : {
"filter" : {
"terms" : {
"_k" : ["one"] // all values work.
}
}
}
}
}
Любое предложение, как мы можем реализовать выше, если есть лучшая стратегия, пожалуйста, предложите.
Ниже код не работает, но я пытаюсь достичь чего-то вроде ниже в теории
final Dataset<String> df = spark.read().csv("src/main/resources/star2000.csv").select("_c1").dropDuplicates().as(Encoders.STRING());
final Dataset<ArrayList> arrayListDataset = df.mapPartitions(new MapPartitionsFunction<String, ArrayList>() {
@Override
public Iterator<ArrayList> call(Iterator<String> iterator) throws Exception {
ArrayList<String> s = new ArrayList<>();
iterator.forEachRemaining(it -> s.add(it));
return Iterators.singletonIterator(s);
}
}, Encoders.javaSerialization(ArrayList.class));
JavaEsSparkSQL.saveToEs(arrayListDataset,"spark/docs");
Я не хочу собирать полный набор данных в один список, так как это может привести к OOM, поэтому планируется получить список для каждого раздела и индексировать его по ключу раздела.
2 ответа
Используя pojo как
Document{
String[] vals,
String key
}
и с фрагментом кода ниже
Dataset<String> df = spark.sqlContext().read().parquet(params.getPath())
.select(params.getColumnName())
.as(Encoders.STRING());
final Dataset<Document> documents = df.coalesce(numPartitions).mapPartitions(iterator -> {
final Set<String> set = Sets.newHashSet(iterator);
Document d = new Document(set.toArray(new String[set.size()]),"key1");
return Iterators.singletonIterator(d);}, Encoders.bean(Document.class));
JavaEsSparkSQL.saveToEs(documents, params.getTableIndexName() + "/"+params.getTableIndexType());
Это создает выше индекс массива.
Это поможет опубликовать исходный код, который вы используете, также неясно, чего вы пытаетесь достичь.
Я предполагаю, что вы хотели бы разместить массив в поле ключа (_k) и другое значение в поле значения (_v)?
Таким образом, вы можете создать JavaPairRDD и сохранить его в Elasticsearch, как показано ниже:
String[] keys = {"one", "two", "three"};
String value = "key";
List<Tuple2<String[],String>> l = new ArrayList<Tuple2<String[],String>>();
l.add(new Tuple2<String[],String>(keys, value));
JavaPairRDD<String[],String> R = ctx.parallelizePairs(l);
JavaEsSpark.saveToEs(R,"index/type");