在 Spark 的 groupByKey 和 countByKey 中使用 JodaTime
Using JodaTime in Spark's groupByKey and countByKey
我有一个非常简单的 Spark 程序(在 Clojure 中使用 Flambo,但应该很容易理解)。这些都是 JVM 上的对象。我正在 local
实例上进行测试(尽管我猜想 Spark 仍然会序列化和反序列化)。
(let [dt (t/date-time 2014)
input (f/parallelize sc [{:the-date dt :x "A"}
{:the-date dt :x "B"}
{:the-date dt :x "C"}
{:the-date dt :x "D"}])
by-date (f/map input (f/fn [{the-date :the-date x :x}] [the-date x])))
输入是一个包含四个元组的 RDD,每个元组具有相同的日期对象。第一个映射生成日期 => x 的键值 RDD。
input
的内容如预期的那样:
=> (f/foreach input prn)
[#<DateTime 2014-01-01T00:00:00.000Z> "A"]
[#<DateTime 2014-01-01T00:00:00.000Z> "B"]
[#<DateTime 2014-01-01T00:00:00.000Z> "C"]
[#<DateTime 2014-01-01T00:00:00.000Z> "D"]
明确地说,相等和 .hashCode
作用于日期对象:
=> (= dt dt)
true
=> (.hashCode dt)
1260848926
=> (.hashCode dt)
1260848926
它们是 JodaTime DateTime, which implement equals as expected.
的实例
当我尝试 countByKey
时,我得到了预期的结果:
=> (f/count-by-key by-date)
{#<DateTime 2014-01-01T00:00:00.000Z> 4}
可是我groupByKey
的时候好像不行。
=> (f/foreach (f/group-by-key by-date) prn)
[#<DateTime 2014-01-01T00:00:00.000Z> ["A"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["B"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["C"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["D"]]
所有的键都是相同的,所以我希望结果是一个单一的条目,日期作为键,["A", "B", "C", "D"]
作为值。发生了一些事情,因为值都是列表。
不知何故 groupByKey
没有正确地等同键。但是 countByKey
是。两者有什么区别?我怎样才能使它们的行为相同?
有什么想法吗?
我离答案越来越近了。我认为这属于答案部分而不是问题部分。
这个按键分组,变成本地收集,提取第一项(日期)。
=> (def result-dates (map first (f/collect (f/group-by-key by-date))))
=> result-dates
(#<DateTime 2014-01-01T00:00:00.000Z>
#<DateTime 2014-01-01T00:00:00.000Z>
#<DateTime 2014-01-01T00:00:00.000Z>
#<DateTime 2014-01-01T00:00:00.000Z>)
哈希码都一样
=> (map #(.hashCode %) result-dates)
(1260848926
1260848926
1260848926
1260848926)
毫秒都一样:
=> (map #(.getMillis %) result-dates)
(1388534400000
1388534400000
1388534400000
1388534400000)
equals
失败,但 isEquals
成功
=> (.isEqual (first result-dates) (second result-dates))
true
=> (.equals (first result-dates) (second result-dates))
false
documentation for .equals
says:
Compares this object with the specified object for equality based on the millisecond instant and the Chronology
它们的毫秒数都相等,它们的年表似乎是:
=> (map #(.getChronology %) result-dates)
(#<ISOChronology ISOChronology[UTC]>
#<ISOChronology ISOChronology[UTC]>
#<ISOChronology ISOChronology[UTC]>
#<ISOChronology ISOChronology[UTC]>)
但是,年表 不 等同。
=> (def a (first result-dates))
=> (def b (second result-dates))
=> (= (.getChronology a) (.getChronology b))
false
尽管哈希码可以
=> (= (.hashCode (.getChronology a)) (.hashCode (.getChronology b)))
true
但是joda.time.Chronology doesn't provide its own equals method继承自Object,只使用引用相等。
我的理论是,这些日期都是用它们自己的、不同的、构造的 Chronology 对象进行反序列化的,但是 JodaTime 有 its own serializer which probably deals with this. Maybe a custom Kryo 序列化器在这方面会有所帮助。
目前,我在 Spark 中使用 JodaTime 的解决方案是使用 org.joda.time .Instant by calling toInstant
, or a java.util.Date
rather than a org.joda.time.DateTime。
两者都涉及丢弃时区信息,这并不理想,所以如果有人有更多信息,我们将非常欢迎!
我有一个非常简单的 Spark 程序(在 Clojure 中使用 Flambo,但应该很容易理解)。这些都是 JVM 上的对象。我正在 local
实例上进行测试(尽管我猜想 Spark 仍然会序列化和反序列化)。
(let [dt (t/date-time 2014)
input (f/parallelize sc [{:the-date dt :x "A"}
{:the-date dt :x "B"}
{:the-date dt :x "C"}
{:the-date dt :x "D"}])
by-date (f/map input (f/fn [{the-date :the-date x :x}] [the-date x])))
输入是一个包含四个元组的 RDD,每个元组具有相同的日期对象。第一个映射生成日期 => x 的键值 RDD。
input
的内容如预期的那样:
=> (f/foreach input prn)
[#<DateTime 2014-01-01T00:00:00.000Z> "A"]
[#<DateTime 2014-01-01T00:00:00.000Z> "B"]
[#<DateTime 2014-01-01T00:00:00.000Z> "C"]
[#<DateTime 2014-01-01T00:00:00.000Z> "D"]
明确地说,相等和 .hashCode
作用于日期对象:
=> (= dt dt)
true
=> (.hashCode dt)
1260848926
=> (.hashCode dt)
1260848926
它们是 JodaTime DateTime, which implement equals as expected.
的实例当我尝试 countByKey
时,我得到了预期的结果:
=> (f/count-by-key by-date)
{#<DateTime 2014-01-01T00:00:00.000Z> 4}
可是我groupByKey
的时候好像不行。
=> (f/foreach (f/group-by-key by-date) prn)
[#<DateTime 2014-01-01T00:00:00.000Z> ["A"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["B"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["C"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["D"]]
所有的键都是相同的,所以我希望结果是一个单一的条目,日期作为键,["A", "B", "C", "D"]
作为值。发生了一些事情,因为值都是列表。
不知何故 groupByKey
没有正确地等同键。但是 countByKey
是。两者有什么区别?我怎样才能使它们的行为相同?
有什么想法吗?
我离答案越来越近了。我认为这属于答案部分而不是问题部分。
这个按键分组,变成本地收集,提取第一项(日期)。
=> (def result-dates (map first (f/collect (f/group-by-key by-date))))
=> result-dates
(#<DateTime 2014-01-01T00:00:00.000Z>
#<DateTime 2014-01-01T00:00:00.000Z>
#<DateTime 2014-01-01T00:00:00.000Z>
#<DateTime 2014-01-01T00:00:00.000Z>)
哈希码都一样
=> (map #(.hashCode %) result-dates)
(1260848926
1260848926
1260848926
1260848926)
毫秒都一样:
=> (map #(.getMillis %) result-dates)
(1388534400000
1388534400000
1388534400000
1388534400000)
equals
失败,但 isEquals
成功
=> (.isEqual (first result-dates) (second result-dates))
true
=> (.equals (first result-dates) (second result-dates))
false
documentation for .equals
says:
Compares this object with the specified object for equality based on the millisecond instant and the Chronology
它们的毫秒数都相等,它们的年表似乎是:
=> (map #(.getChronology %) result-dates)
(#<ISOChronology ISOChronology[UTC]>
#<ISOChronology ISOChronology[UTC]>
#<ISOChronology ISOChronology[UTC]>
#<ISOChronology ISOChronology[UTC]>)
但是,年表 不 等同。
=> (def a (first result-dates))
=> (def b (second result-dates))
=> (= (.getChronology a) (.getChronology b))
false
尽管哈希码可以
=> (= (.hashCode (.getChronology a)) (.hashCode (.getChronology b)))
true
但是joda.time.Chronology doesn't provide its own equals method继承自Object,只使用引用相等。
我的理论是,这些日期都是用它们自己的、不同的、构造的 Chronology 对象进行反序列化的,但是 JodaTime 有 its own serializer which probably deals with this. Maybe a custom Kryo 序列化器在这方面会有所帮助。
目前,我在 Spark 中使用 JodaTime 的解决方案是使用 org.joda.time .Instant by calling toInstant
, or a java.util.Date
rather than a org.joda.time.DateTime。
两者都涉及丢弃时区信息,这并不理想,所以如果有人有更多信息,我们将非常欢迎!