Застрял с подсказками типа в clojure для общего класса

Я пытаюсь получить небольшой пример от Apache Flink, работающего в clojure, но сейчас я застрял, из-за типа, намекающего на clojure, и какой-то странной причуды в Flink.

Вот мой код:

(ns pipeline.core
 (:import
 (org.apache.flink.api.java ExecutionEnvironment)
 (org.apache.flink.api.common.functions FlatMapFunction)
 (org.apache.flink.api.java.tuple Tuple2)
 (org.apache.flink.util Collector)
 (java.lang String)))

(def flink-env (ExecutionEnvironment/createLocalEnvironment))

(def dataset (.fromElements flink-env (to-array ["please test me"])))

(defn tokenizer [] (reify FlatMapFunction
                 ( flatMap [this value collector] 
                   (println value))))

(.flatMap dataset (tokenizer))

Если я не предоставляю подсказки типа, я получаю сообщение об ошибке из интерфейса flink:

Caused by: java.lang.IllegalArgumentException: The types of the interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic types is limited at this point.
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:662)

Если я предоставлю подсказки типа:

(defn tokenizer [] (reify FlatMapFunction
                 ( ^void flatMap [this ^String value ^Collector collector] 
                   (println value))))

Я получаю сообщение об ошибке от компилятора clojure:

Caused by: java.lang.IllegalArgumentException: Can't find matching method: flatMap, leave off hints for auto match.
at clojure.lang.Compiler$NewInstanceMethod.parse(Compiler.java:8065) 

Есть ли способ добавить подсказки типов в clojure с универсальными классами? Это должно быть что-то вроде этого:

(defn tokenizer [] (reify FlatMapFunction
                 ( ^void flatMap [this ^String value ^Collector<Tuple2<String, Integer>> collector] 
                   (println value))))

Но это не работает. Есть идеи?

Конфигурация lein выглядит следующим образом:

(defproject pipeline "0.1.0-SNAPSHOT"
 :description "FIXME: write description"
 :url "http://example.com/FIXME"
 :license {:name "Eclipse Public License"
        :url "http://www.eclipse.org/legal/epl-v10.html"}
 :dependencies [[org.clojure/clojure "1.7.0"]               
             [org.apache.flink/flink-java "0.9.0"]              
             ]
  :aot :all)

2 ответа

Решение

Clojure не может обрабатывать отражения, поэтому вам нужно указать тип возврата вручную с помощью метода Flink returns,

(.returns (.flatMap dataset (tokenizer)) String)

Кроме того, вам нужно использовать deftype определить tokenizer и создать новый объект при его использовании, потому что Flink не может обрабатывать анонимные классы:

(deftype tokenizer [] FlatMapFunction
                      (flatMap [this value collector] 
                        (println value)))

(.flatMap dataset (tokenizer.))

Вот полный "Пример подсчета слов", который можно упаковать в банку и выполнить.

Обратите внимание на тип подсказок и приведений. За tokenizer выход (int 1) требуется, в противном случае Long будет вторым типом Tuple2, Кроме того, мы используем строку для объявления типа вывода для tokenizer (тип класса не является достаточным, потому что типы отражения также должны быть указаны). Наконец, нам нужно ввести подсказку (int-array [0]) чтобы решить перегрузку groupBy (без него метод неоднозначен с компилятором Clojure).

(ns org.apache.flink.flink-clojure.WordCount
 (:import
 (org.apache.flink.api.common.functions FlatMapFunction)
 (org.apache.flink.api.java DataSet)
 (org.apache.flink.api.java ExecutionEnvironment)
 (org.apache.flink.api.java.tuple Tuple2)
 (org.apache.flink.util Collector)
 (java.lang String))
 (:require [clojure.string :as str])
 (:gen-class))

(def flink-env (ExecutionEnvironment/createLocalEnvironment))

(def text (.fromElements flink-env (to-array ["please test me and me too"])))

(deftype tokenizer [] FlatMapFunction
                      (flatMap [this value collector]
                        (doseq [v (str/split value #"\s")]
                          (.collect collector (Tuple2. v (int 1))))))

(def tokens (.returns (.flatMap text (tokenizer.)) "Tuple2<String,Integer>"))

(def counts (.sum (.groupBy tokens (int-array [0])) 1))

(defn -main []
  (.print counts)
)

Как продолжение этого комментария Застрял с подсказками типа в clojure для универсального класса

В последних версиях flink (протестировано на 1.6.1) вам нужно определить собственный класс, иначе вы получите ошибку вроде:

Exception in thread "main" java.lang.IllegalArgumentException: No matching method found: returns for class org.apache.flink.api.java.operators.FlatMapOperator, compiling:(WordCount.clj:69:13)

пользовательский класс:

package org.apache.flink.java;

import org.apache.flink.api.java.tuple.Tuple2;


public class WordCountTuple extends Tuple2<String, Integer> {

}

закрытый код

(ns org.apache.flink.clojure.WordCount
  (:import
   (org.apache.flink.api.common.functions FlatMapFunction)
   (org.apache.flink.api.java DataSet)
   (org.apache.flink.api.java ExecutionEnvironment)
   (org.apache.flink.api.java.tuple Tuple2)
   (org.apache.flink.java WordCountTuple)
   (org.apache.flink.util Collector)
   (java.lang String))
  (:require [clojure.string :as str])
  (:gen-class))

(def flink-env (ExecutionEnvironment/getExecutionEnvironment))

(def text (.fromElements flink-env (to-array ["please test me and me too"])))

(deftype tokenizer [] FlatMapFunction
         (flatMap [this value collector]
           (doseq [v (str/split value #"\s")]
             (.collect collector (Tuple2. v (int 1))))))

(def tokens (.returns (.flatMap text (tokenizer.)) WordCountTuple))

(def counts (.sum (.groupBy tokens (int-array [0])) 1))

(defn -main []
  (.print counts))

рабочий пример ветки здесь https://github.com/guillaume/flink-external

Другие вопросы по тегам