Застрял с подсказками типа в clojure для общего класса
Я пытаюсь получить небольшой пример от Apache Flink, работающего в clojure, но сейчас я застрял, из-за типа, намекающего на clojure, и какой-то странной причуды в Flink.
Вот мой код:
(ns pipeline.core
(:import
(org.apache.flink.api.java ExecutionEnvironment)
(org.apache.flink.api.common.functions FlatMapFunction)
(org.apache.flink.api.java.tuple Tuple2)
(org.apache.flink.util Collector)
(java.lang String)))
(def flink-env (ExecutionEnvironment/createLocalEnvironment))
(def dataset (.fromElements flink-env (to-array ["please test me"])))
(defn tokenizer [] (reify FlatMapFunction
( flatMap [this value collector]
(println value))))
(.flatMap dataset (tokenizer))
Если я не предоставляю подсказки типа, я получаю сообщение об ошибке из интерфейса flink:
Caused by: java.lang.IllegalArgumentException: The types of the interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic types is limited at this point.
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:662)
Если я предоставлю подсказки типа:
(defn tokenizer [] (reify FlatMapFunction
( ^void flatMap [this ^String value ^Collector collector]
(println value))))
Я получаю сообщение об ошибке от компилятора clojure:
Caused by: java.lang.IllegalArgumentException: Can't find matching method: flatMap, leave off hints for auto match.
at clojure.lang.Compiler$NewInstanceMethod.parse(Compiler.java:8065)
Есть ли способ добавить подсказки типов в clojure с универсальными классами? Это должно быть что-то вроде этого:
(defn tokenizer [] (reify FlatMapFunction
( ^void flatMap [this ^String value ^Collector<Tuple2<String, Integer>> collector]
(println value))))
Но это не работает. Есть идеи?
Конфигурация lein выглядит следующим образом:
(defproject pipeline "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.7.0"]
[org.apache.flink/flink-java "0.9.0"]
]
:aot :all)
2 ответа
Clojure не может обрабатывать отражения, поэтому вам нужно указать тип возврата вручную с помощью метода Flink returns
,
(.returns (.flatMap dataset (tokenizer)) String)
Кроме того, вам нужно использовать deftype
определить tokenizer
и создать новый объект при его использовании, потому что Flink не может обрабатывать анонимные классы:
(deftype tokenizer [] FlatMapFunction
(flatMap [this value collector]
(println value)))
(.flatMap dataset (tokenizer.))
Вот полный "Пример подсчета слов", который можно упаковать в банку и выполнить.
Обратите внимание на тип подсказок и приведений. За tokenizer
выход (int 1)
требуется, в противном случае Long
будет вторым типом Tuple2
, Кроме того, мы используем строку для объявления типа вывода для tokenizer
(тип класса не является достаточным, потому что типы отражения также должны быть указаны). Наконец, нам нужно ввести подсказку (int-array [0])
чтобы решить перегрузку groupBy
(без него метод неоднозначен с компилятором Clojure).
(ns org.apache.flink.flink-clojure.WordCount
(:import
(org.apache.flink.api.common.functions FlatMapFunction)
(org.apache.flink.api.java DataSet)
(org.apache.flink.api.java ExecutionEnvironment)
(org.apache.flink.api.java.tuple Tuple2)
(org.apache.flink.util Collector)
(java.lang String))
(:require [clojure.string :as str])
(:gen-class))
(def flink-env (ExecutionEnvironment/createLocalEnvironment))
(def text (.fromElements flink-env (to-array ["please test me and me too"])))
(deftype tokenizer [] FlatMapFunction
(flatMap [this value collector]
(doseq [v (str/split value #"\s")]
(.collect collector (Tuple2. v (int 1))))))
(def tokens (.returns (.flatMap text (tokenizer.)) "Tuple2<String,Integer>"))
(def counts (.sum (.groupBy tokens (int-array [0])) 1))
(defn -main []
(.print counts)
)
Как продолжение этого комментария Застрял с подсказками типа в clojure для универсального класса
В последних версиях flink (протестировано на 1.6.1) вам нужно определить собственный класс, иначе вы получите ошибку вроде:
Exception in thread "main" java.lang.IllegalArgumentException: No matching method found: returns for class org.apache.flink.api.java.operators.FlatMapOperator, compiling:(WordCount.clj:69:13)
пользовательский класс:
package org.apache.flink.java;
import org.apache.flink.api.java.tuple.Tuple2;
public class WordCountTuple extends Tuple2<String, Integer> {
}
закрытый код
(ns org.apache.flink.clojure.WordCount
(:import
(org.apache.flink.api.common.functions FlatMapFunction)
(org.apache.flink.api.java DataSet)
(org.apache.flink.api.java ExecutionEnvironment)
(org.apache.flink.api.java.tuple Tuple2)
(org.apache.flink.java WordCountTuple)
(org.apache.flink.util Collector)
(java.lang String))
(:require [clojure.string :as str])
(:gen-class))
(def flink-env (ExecutionEnvironment/getExecutionEnvironment))
(def text (.fromElements flink-env (to-array ["please test me and me too"])))
(deftype tokenizer [] FlatMapFunction
(flatMap [this value collector]
(doseq [v (str/split value #"\s")]
(.collect collector (Tuple2. v (int 1))))))
(def tokens (.returns (.flatMap text (tokenizer.)) WordCountTuple))
(def counts (.sum (.groupBy tokens (int-array [0])) 1))
(defn -main []
(.print counts))
рабочий пример ветки здесь https://github.com/guillaume/flink-external