Stuck with type hints in Clojure for a generic class
I'm trying to get a small example from Apache Flink running in Clojure, but I'm stuck because of type hinting in Clojure and some odd quirks in Flink.
Here is my code:
(ns pipeline.core
  (:import
    (org.apache.flink.api.java ExecutionEnvironment)
    (org.apache.flink.api.common.functions FlatMapFunction)
    (org.apache.flink.api.java.tuple Tuple2)
    (org.apache.flink.util Collector)
    (java.lang String)))

(def flink-env (ExecutionEnvironment/createLocalEnvironment))

(def dataset (.fromElements flink-env (to-array ["please test me"])))

(defn tokenizer []
  (reify FlatMapFunction
    (flatMap [this value collector]
      (println value))))

(.flatMap dataset (tokenizer))
If I don't provide type hints, I get an error from the Flink API:
Caused by: java.lang.IllegalArgumentException: The types of the interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic types is limited at this point.
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:662)
If I provide type hints:
(defn tokenizer []
  (reify FlatMapFunction
    (^void flatMap [this ^String value ^Collector collector]
      (println value))))
I get an error from the Clojure compiler:
Caused by: java.lang.IllegalArgumentException: Can't find matching method: flatMap, leave off hints for auto match.
at clojure.lang.Compiler$NewInstanceMethod.parse(Compiler.java:8065)
Is there any way to add type hints with generic classes in Clojure?
It should look something like this:
(defn tokenizer []
  (reify FlatMapFunction
    (^void flatMap [this ^String value ^Collector<Tuple2<String, Integer>> collector]
      (println value))))
But that doesn't work. Any ideas?
The lein config looks like this:
(defproject pipeline "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.7.0"]
                 [org.apache.flink/flink-java "0.9.0"]]
  :aot :all)
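Clojure does not give Flink the generic type information that Flink tries to read via reflection, so you need to specify the return type manually via Flink's returns method.
(.returns (.flatMap dataset (tokenizer)) String)
Also, Flink cannot handle anonymous classes, so you need to define tokenizer with deftype and instantiate a new object:
(deftype tokenizer [] FlatMapFunction
  (flatMap [this value collector]
    (println value)))

(.flatMap dataset (tokenizer.))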
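To see why a reflection-based type extractor can fail with "could not be inferred", here is a minimal Java sketch. It is not Flink's TypeExtractor; Mapper, RawMapper, TypedMapper and typeArguments are hypothetical names used only for illustration. The assumption is that a class generated by Clojure looks, to reflection, like an implementation of the raw interface, so no type arguments are recorded for it.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

public class GenericInfoDemo {
    // Hypothetical stand-in for a generic function interface such as FlatMapFunction<T, O>.
    interface Mapper<T, O> {
        O apply(T value);
    }

    // Implements the raw interface: no type arguments are recorded in the class file.
    @SuppressWarnings("rawtypes")
    static class RawMapper implements Mapper {
        public Object apply(Object value) {
            return value;
        }
    }

    // Implements the interface with concrete type arguments, as javac-compiled code usually does.
    static class TypedMapper implements Mapper<String, Integer> {
        public Integer apply(String value) {
            return value.length();
        }
    }

    // Returns the type arguments recorded for the first implemented interface,
    // or an empty array when only the raw interface is recorded.
    static Type[] typeArguments(Class<?> cls) {
        Type iface = cls.getGenericInterfaces()[0];
        if (iface instanceof ParameterizedType) {
            return ((ParameterizedType) iface).getActualTypeArguments();
        }
        return new Type[0];
    }

    public static void main(String[] args) {
        System.out.println("raw:   " + Arrays.toString(typeArguments(RawMapper.class)));
        System.out.println("typed: " + Arrays.toString(typeArguments(TypedMapper.class)));
    }
}
```

With nothing to read from the class itself, the output type has to be supplied some other way, which is the role returns plays in the answer above.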
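Here is a complete word-count example that can be packaged into a jar and executed.
Note the type hints and casts. In the tokenizer output, (int 1) is required; otherwise the second field of the Tuple2 would be a Long. We also use a String to declare the tokenizer's output type (a Class is not enough, because the tuple's field types must be specified as well). Finally, we need the type hint (int-array [0]) to resolve the overloaded groupBy (without it, the method is ambiguous to the Clojure compiler).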
(ns org.apache.flink.flink-clojure.WordCount
  (:import
    (org.apache.flink.api.common.functions FlatMapFunction)
    (org.apache.flink.api.java DataSet)
    (org.apache.flink.api.java ExecutionEnvironment)
    (org.apache.flink.api.java.tuple Tuple2)
    (org.apache.flink.util Collector)
    (java.lang String))
  (:require [clojure.string :as str])
  (:gen-class))

(def flink-env (ExecutionEnvironment/createLocalEnvironment))

(def text (.fromElements flink-env (to-array ["please test me and me too"])))

(deftype tokenizer [] FlatMapFunction
  (flatMap [this value collector]
    (doseq [v (str/split value #"\s")]
      (.collect collector (Tuple2. v (int 1))))))

(def tokens (.returns (.flatMap text (tokenizer.)) "Tuple2<String,Integer>"))

(def counts (.sum (.groupBy tokens (int-array [0])) 1))

(defn -main []
  (.print counts))
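The (int 1) coercion in the tokenizer matters because Clojure reads the literal 1 as a long, which boxes to java.lang.Long rather than java.lang.Integer. The Java sketch below shows the mismatch on its own, without Flink. BoxedNumberDemo, readAsInteger and readableAsInteger are hypothetical names, and how exactly Flink reacts to a Long in that field is not shown in the source.

```java
public class BoxedNumberDemo {
    // Mimics code that expects the Integer field of a Tuple2<String, Integer>.
    static int readAsInteger(Object boxed) {
        return (Integer) boxed; // throws ClassCastException for a java.lang.Long
    }

    // Returns true when the value can be read as an Integer without a ClassCastException.
    static boolean readableAsInteger(Object boxed) {
        try {
            readAsInteger(boxed);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Object uncoerced = 1L;     // what the Clojure literal 1 boxes to
        Object coerced = (int) 1L; // Java analogue of Clojure's (int 1)

        System.out.println(uncoerced.getClass().getName() + " readable as Integer: "
                + readableAsInteger(uncoerced));
        System.out.println(coerced.getClass().getName() + " readable as Integer: "
                + readableAsInteger(coerced));
    }
}
```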
As a follow-up: with newer Flink versions (tested on 1.6.1), you need to define a custom class; otherwise you get an error like this:
Exception in thread "main" java.lang.IllegalArgumentException: No matching method found: returns for class org.apache.flink.api.java.operators.FlatMapOperator, compiling:(WordCount.clj:69:13)
Custom class:
package org.apache.flink.java;

import org.apache.flink.api.java.tuple.Tuple2;

public class WordCountTuple extends Tuple2<String, Integer> {
}
Clojure code:
(ns org.apache.flink.clojure.WordCount
  (:import
    (org.apache.flink.api.common.functions FlatMapFunction)
    (org.apache.flink.api.java DataSet)
    (org.apache.flink.api.java ExecutionEnvironment)
    (org.apache.flink.api.java.tuple Tuple2)
    (org.apache.flink.java WordCountTuple)
    (org.apache.flink.util Collector)
    (java.lang String))
  (:require [clojure.string :as str])
  (:gen-class))

(def flink-env (ExecutionEnvironment/getExecutionEnvironment))

(def text (.fromElements flink-env (to-array ["please test me and me too"])))

(deftype tokenizer [] FlatMapFunction
  (flatMap [this value collector]
    (doseq [v (str/split value #"\s")]
      (.collect collector (Tuple2. v (int 1))))))

(def tokens (.returns (.flatMap text (tokenizer.)) WordCountTuple))

(def counts (.sum (.groupBy tokens (int-array [0])) 1))

(defn -main []
  (.print counts))
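One plausible reason the WordCountTuple subclass helps is that a class extending a generic class with concrete type arguments keeps those arguments in its class file, where reflection can read them. The Java sketch below illustrates that mechanism only. Pair and WordCountPair are hypothetical stand-ins for Tuple2 and WordCountTuple, and this is an assumption about why the workaround succeeds, not a description of Flink internals.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

public class SubclassTypeDemo {
    // Hypothetical stand-in for Flink's Tuple2<T0, T1>.
    static class Pair<A, B> {
        A first;
        B second;
    }

    // Mirrors the shape of WordCountTuple: a subclass that fixes the type arguments.
    static class WordCountPair extends Pair<String, Integer> {
    }

    // Returns the type arguments recorded for the superclass,
    // or an empty array when the superclass is not parameterized.
    static Type[] superclassTypeArguments(Class<?> cls) {
        Type parent = cls.getGenericSuperclass();
        if (parent instanceof ParameterizedType) {
            return ((ParameterizedType) parent).getActualTypeArguments();
        }
        return new Type[0];
    }

    public static void main(String[] args) {
        // Pair.class by itself says nothing about String or Integer.
        System.out.println("Pair:          " + Arrays.toString(superclassTypeArguments(Pair.class)));
        // The subclass carries the concrete field types.
        System.out.println("WordCountPair: " + Arrays.toString(superclassTypeArguments(WordCountPair.class)));
    }
}
```

The trade-off of this approach is one small Java class per tuple shape, compiled alongside the Clojure code.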