Сохранить Spark Dataframe в Elasticsearch - Не удается обработать исключение типа
Я разработал простую работу для чтения данных из MySQL и сохранения их в Elasticsearch с помощью Spark.
Вот код:
JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName("MySQLtoEs")
.set("es.index.auto.create", "true")
.set("es.nodes", "127.0.0.1:9200")
.set("es.mapping.id", "id")
.set("spark.serializer", KryoSerializer.class.getName()));
SQLContext sqlContext = new SQLContext(sc);
// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");
// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
"merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");
Вы можете увидеть код очень просто. Он считывает данные в DataFrame, выбирает несколько столбцов и затем выполняет count
в качестве основного действия на Dataframe. До этого момента все работало нормально.
Затем он пытается сохранить данные в Elasticsearch, но не удается, потому что он не может обработать некоторый тип. Вы можете увидеть журнал ошибок здесь.
Я не уверен, почему он не может справиться с этим типом. Кто-нибудь знает, почему это происходит?
Я использую Apache Spark 1.5.0, Elasticsearch 1.4.4 и elaticsearch-hadoop 2.1.1
РЕДАКТИРОВАТЬ:
- Я обновил ссылку на gist с примером набора данных вместе с исходным кодом.
- Я также пытался использовать сборки dev, как указано @costin в списке рассылки.
1 ответ
Ответ на этот вопрос был хитрым, но благодаря samklr мне удалось выяснить, в чем проблема.
Тем не менее, решение не является простым и может учитывать некоторые "ненужные" преобразования.
Сначала поговорим о сериализации.
В Spark необходимо рассмотреть два аспекта сериализации и сериализации функций. В данном случае речь идет о сериализации данных и, следовательно, десериализации.
С точки зрения Spark, единственное, что требуется, это настроить сериализацию - Spark по умолчанию полагается на сериализацию Java, которая удобна, но довольно неэффективна. Именно поэтому Hadoop сам представил свой собственный механизм сериализации и свои собственные типы, а именно: Writables
, В качестве таких, InputFormat
а также OutputFormats
обязаны вернуться Writables
который, из коробки, Спарк не понимает.
С помощью разъема эластичного поиска-искры необходимо включить другую сериализацию (Kryo), которая автоматически выполняет преобразование, а также делает это довольно эффективно.
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
Несмотря на то, что Kryo не требует, чтобы класс реализовывал определенный интерфейс для сериализации, это означает, что POJO могут использоваться в RDD без какой-либо дополнительной работы, кроме включения сериализации Kryo.
Тем не менее, @samklr указал мне, что Kryo необходимо зарегистрировать классы перед их использованием.
Это потому, что Kryo записывает ссылку на класс сериализуемого объекта (одна запись записывается для каждого записанного объекта), который является просто целочисленным идентификатором, если класс был зарегистрирован, но в противном случае является полным именем класса. Spark регистрирует классы Scala и многие другие базовые классы (например, классы Avro Generic или Thrift) от вашего имени.
Регистрация классов с Kryo проста. Создайте подкласс KryoRegistrator и переопределите registerClasses()
метод:
public class MyKryoRegistrator implements KryoRegistrator, Serializable {
@Override
public void registerClasses(Kryo kryo) {
// Product POJO associated to a product Row from the DataFrame
kryo.register(Product.class);
}
}
Наконец, в вашей программе драйвера установите для свойства spark.kryo.registrator полное имя класса вашей реализации KryoRegistrator:
conf.set("spark.kryo.registrator", "MyKryoRegistrator")
Во-вторых, даже думал, что сериализатор Kryo установлен и класс зарегистрирован, с изменениями, внесенными в Spark 1.5, и по какой-то причине Elasticsearch не смог десериализовать Dataframe, потому что он не может вывести SchemaType
Датафрейма в разъем.
Поэтому мне пришлось преобразовать Dataframe в JavaRDD
JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
public Product call(Row row) throws Exception {
long id = row.getLong(0);
String title = row.getString(1);
String description = row.getString(2);
int merchantId = row.getInt(3);
double price = row.getDecimal(4).doubleValue();
String keywords = row.getString(5);
long brandId = row.getLong(6);
int categoryId = row.getInt(7);
return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
}
});
Теперь данные готовы для записи в эластичный поиск:
JavaEsSpark.saveToEs(products, "test/test");
Рекомендации:
- Документация по поддержке Apache Spark от Elasticsearch.
- Руководство по Hadoop, глава 19. Spark, изд. 4 - Том Уайт.
- Пользователь samklr.