spring云数据流-http | kafka和kafka | hdfs-在hdfs中获取原始消息

ykejflvf  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(392)

我正在scdf(localserver1.7.3)中创建一个基本流,其中配置了2个流。1http->kafka主题2。Kafka主题->hdfs
流:

stream create --name ingest_from_http --definition "http --port=8000 --path-pattern=/test > :streamtest1"
stream deploy --name ingest_from_http --properties "app.http.spring.cloud.stream.bindings.output.producer.headerMode=raw"

stream create --name ingest_to_hdfs --definition ":streamtest1 > hdfs --fs-uri=hdfs://<host>:8020 --directory=/tmp/hive/sensedev/streamdemo/ --file-extension=xml --spring.cloud.stream.bindings.input.consumer.headerMode=raw"

我已经在location/tmp/hive/sensedev/streamdemo上创建了一个配置单元管理的表/

DROP TABLE IF EXISTS gwdemo.xml_test;
CREATE TABLE gwdemo.xml_test(

id int,

name string

 )

ROW FORMAT SERDE 'com.ibm.spss.hive.serde2.xml.XmlSerDe'

WITH SERDEPROPERTIES (

"column.xpath.id"="/body/id/text()",

"column.xpath.name"="/body/name/text()"

)

STORED AS

INPUTFORMAT 'com.ibm.spss.hive.serde2.xml.XmlInputFormat'

OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'

LOCATION '/tmp/hive/sensedev/streamdemo'

TBLPROPERTIES (

"xmlinput.start"="<body>",

"xmlinput.end"="</body>")

;

测试:
hive是否能够读取xml:将xml文件放在/tmp/hive/sensedev/streamdemo位置。
文件内容: <body><id>1</id><name>Test1</name></body> 在表上运行select命令时,它正确地显示了上面的记录。
当用httppost在scdf中发布记录时,我在kafka consumer中获得了正确的数据,但是当我检查hdfs时,正在创建xml文件,但是我在这些文件中接收原始消息。例子:
dataflow>httppost--target http:///test--data“ <body><id>2</id><name>Test2</name></body> “--contenttype应用程序/xml
在kafka console consumer中,我能够读取正确的xml消息: <body><id>2</id><name>Test2</name></body> ```
$ hdfs dfs -cat /tmp/hive/sensedev/streamdemo/hdfs-sink-2.xml
[B@31d94539

问题:1。我错过了什么?如何在hdfs中新创建的xml文件中获得正确的xml记录?
um6iljoc

um6iljoc1#

hdfs接收器需要一个java序列化对象。

相关问题