Spring 云数据流 - http |卡夫卡和卡夫卡 | hdfs - 在 HDFS 中获取原始消息
Spring Cloud Dataflow - http | kafka and kafka | hdfs - Getting Raw message in HDFS
我正在 SCDF(本地服务器 1.7.3)中创建一个基本流,其中我正在配置 2 个流。
1. HTTP -> Kafka主题
2. Kafka 主题 -> HDFS
流:
stream create --name ingest_from_http --definition "http --port=8000 --path-pattern=/test > :streamtest1"
stream deploy --name ingest_from_http --properties "app.http.spring.cloud.stream.bindings.output.producer.headerMode=raw"
stream create --name ingest_to_hdfs --definition ":streamtest1 > hdfs --fs-uri=hdfs://<host>:8020 --directory=/tmp/hive/sensedev/streamdemo/ --file-extension=xml --spring.cloud.stream.bindings.input.consumer.headerMode=raw"
我在 /tmp/hive/sensedev/streamdemo/
位置创建了一个管理 table 的 Hive
DROP TABLE IF EXISTS gwdemo.xml_test;
CREATE TABLE gwdemo.xml_test(
id int,
name string
)
ROW FORMAT SERDE 'com.ibm.spss.hive.serde2.xml.XmlSerDe'
WITH SERDEPROPERTIES (
"column.xpath.id"="/body/id/text()",
"column.xpath.name"="/body/name/text()"
)
STORED AS
INPUTFORMAT 'com.ibm.spss.hive.serde2.xml.XmlInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION '/tmp/hive/sensedev/streamdemo'
TBLPROPERTIES (
"xmlinput.start"="<body>",
"xmlinput.end"="</body>")
;
测试:
- Hive 是否能够读取 XML : 在该位置放置一个 xml 文件
/tmp/hive/sensedev/streamdemo.
文件内容:<body><id>1</id><name>Test1</name></body>
在 运行 SELECT 命令上 table,它正确显示了上述记录。
当 post 使用 http post 在 SCDF 中记录时,我得到了正确的数据
在 Kafka Consumer 中,但是当我检查 HDFS 时,xml 文件是
正在创建,但我在这些文件中收到原始消息。
示例:
数据流>http post --目标http:///test
--data "<body><id>2</id><name>Test2</name></body>
" --contentType application/xml
在 Kafka Console Consumer 中,我能够读取正确的 XML 消息:<body><id>2</id><name>Test2</name></body>
$ hdfs dfs -cat /tmp/hive/sensedev/streamdemo/hdfs-sink-2.xml
[B@31d94539
问题:
1.我错过了什么?如何在 HDFS 中新创建的 XML 文件中获取正确的 XML 记录?
HDFS 接收器需要一个 Java 序列化对象。
我正在 SCDF(本地服务器 1.7.3)中创建一个基本流,其中我正在配置 2 个流。 1. HTTP -> Kafka主题 2. Kafka 主题 -> HDFS
流:
stream create --name ingest_from_http --definition "http --port=8000 --path-pattern=/test > :streamtest1"
stream deploy --name ingest_from_http --properties "app.http.spring.cloud.stream.bindings.output.producer.headerMode=raw"
stream create --name ingest_to_hdfs --definition ":streamtest1 > hdfs --fs-uri=hdfs://<host>:8020 --directory=/tmp/hive/sensedev/streamdemo/ --file-extension=xml --spring.cloud.stream.bindings.input.consumer.headerMode=raw"
我在 /tmp/hive/sensedev/streamdemo/
位置创建了一个管理 table 的 HiveDROP TABLE IF EXISTS gwdemo.xml_test;
CREATE TABLE gwdemo.xml_test(
id int,
name string
)
ROW FORMAT SERDE 'com.ibm.spss.hive.serde2.xml.XmlSerDe'
WITH SERDEPROPERTIES (
"column.xpath.id"="/body/id/text()",
"column.xpath.name"="/body/name/text()"
)
STORED AS
INPUTFORMAT 'com.ibm.spss.hive.serde2.xml.XmlInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION '/tmp/hive/sensedev/streamdemo'
TBLPROPERTIES (
"xmlinput.start"="<body>",
"xmlinput.end"="</body>")
;
测试:
- Hive 是否能够读取 XML : 在该位置放置一个 xml 文件 /tmp/hive/sensedev/streamdemo.
文件内容:<body><id>1</id><name>Test1</name></body>
在 运行 SELECT 命令上 table,它正确显示了上述记录。
当 post 使用 http post 在 SCDF 中记录时,我得到了正确的数据 在 Kafka Consumer 中,但是当我检查 HDFS 时,xml 文件是 正在创建,但我在这些文件中收到原始消息。 示例:
数据流>http post --目标http:///test --data "
<body><id>2</id><name>Test2</name></body>
" --contentType application/xml
在 Kafka Console Consumer 中,我能够读取正确的 XML 消息:<body><id>2</id><name>Test2</name></body>
$ hdfs dfs -cat /tmp/hive/sensedev/streamdemo/hdfs-sink-2.xml [B@31d94539
问题: 1.我错过了什么?如何在 HDFS 中新创建的 XML 文件中获取正确的 XML 记录?
HDFS 接收器需要一个 Java 序列化对象。