在 Clojure 中使用 Amazonica 时,我应该如何格式化 Kinesis 事件的数据?

How should I format data for a Kinesis event when using Amazonica in Clojure?

当我使用 AWS CLI 将事件放入流中时,我可以传入 JSON 并在从 base64 解码后将其取回。当我尝试使用来自 Clojure 的 Amazonica 放置事件时,我很难正确格式化事件数据参数。

(kinesis/put-record "ad-stream" {:ad-id "some-id"} "parition-key"))

创建一个具有 base64 编码数据字段 "TlBZCAAAABXwBhsAAAACagVhZC1pZGkHc29tZS1pZA==" 的事件,该事件解码为

NP�jad-idisome-id

如果我JSON先对数据进行编码:

 (kinesis/put-record "ad-stream" (json/write-str {:ad-id "some-id-2"}) "parition-key")

然后我得到一个垃圾字符较少的事件,但它仍然不是很完美,不足以在不破坏某些东西的情况下在其他应用程序中阅读:

NPi{"ad-id":"some-id-2"}

在将 Clojure 映射转换为 JSON 时,那个领先的垃圾有什么意义?如何将一个简单的对象传递给 kinesis?

tests 显示了作为 put-record 的数据参数传递的普通地图,我还不明白为什么这对我不起作用:

  (let [data {:name "any data"
              :col  #{"anything" "at" "all"}
              :date now}
        sn (:sequence-number (put-record my-stream data (str (UUID/randomUUID))))]
    (put-record my-stream data (str (UUID/randomUUID)) sn))

  (Thread/sleep 3000)

  (def shard (-> (describe-stream my-stream)
               :stream-description
               :shards
               last
:shard-id))

更新

我很确定这是库(或它使用的序列化程序)中的错误,因此我将在 https://github.com/mcohen01/amazonica/issues/211.

的错误报告中继续调查

传递 JSON 字符串的 ByteBuffer 作为记录数据对我有用。

(kinesis/put-record "ad-stream"
                    (-> {:ad-id "ad-stream"}
                        json/write-str .getBytes ByteBuffer/wrap)
                    "parition-key")

记录数据:"eyJhZC1pZCI6ImFkLXN0cmVhbSJ9",解码为:

{"ad-id":"ad-stream"}

这解决了库中的任何编码问题,因为 Amazonica 在传递 ByteBuffer 时会跳过编码。