从流文件内容中提取多行内容
Extracting multiline content from flow file content
我正在从 MySQL table 导入数据(仅针对选定的列)并将其放入 HDFS。完成后,我想在 Hive 中创建一个 table。
为此,我有一个 schema.sql
文件,其中包含整个 table 的 CREATE TABLE 语句,并且我只想生成新的 CREATE TABLE 语句我导入的列。
与我在下面的示例中使用 grep
所做的类似。
我将 FetchFile
与 ExtractText
一起使用,但无法正常工作。如果我将整个模式放入一个属性中,我如何使用 NiFi 处理器甚至表达式语言来实现这一点?
或者是否有更好的方法在导入的数据上创建 table?
NiFi可以根据流文件内容
生成Create table语句[s]
1.Creating ORC tables by using ConvertAvroToORC processor:
如果您将 avro 数据转换为 ORC 格式然后存储到 HDFS,则 ConvertAvroToORC 处理器将 hive.ddl
属性添加到流文件。
PutHDFS 处理器将 absolute.hdfs.path
属性添加到流文件。
我们可以使用这个hive.ddl,absolute.hdfs.path属性来创建orc table 动态地在 HDFS 目录之上。
流量:
Pull data from source(ExecuteSQL...etc)
-> ConvertAvroToORC //add Hive DbName,TableName in HiveTableName property value-->
-> PutHDFS //store the orc file into HDFS location -->
-> ReplaceText //Replace the flowfile content with ${hive.ddl} Location '${absolute.hdfs.path}'-->
-> PutHiveQL //execute the create table statement
请参阅 this link 以获取更多详细信息重新生成上述流程。
2.Creating Avro tables by using ExtractAvroMetaData processor:
在 NiFi 中,一旦我们使用 QueryDatabaseTable 提取数据,ExecuteSQL 处理器,数据格式为 AVRO.
我们可以创建 Avro tables 基于 avro 模式(.avsc 文件) 和通过使用 ExtractAvroMetaData 处理器,我们可以提取架构并保留为流文件属性,然后通过使用此架构,我们可以动态创建 AvroTables。
流量:
ExecuteSQL (success)|-> PutHDFS //store data into HDFS
(success)|-> ExtractAvroMetadata //configure Metadata Keys as avro.schema
-> ReplaceText //replace flowfile content with avro.schema
-> PutHDFS //store the avsc file into schema directory
-> ReplaceText //create avro table on top of schema directory
-> PutHiveQL //execute the hive.ddl
示例 AVRO 创建 table 语句:
CREATE TABLE as_avro
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED as INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES (
'avro.schema.url'='/path/to/the/schema/test_serializer.avsc');
我们将在上述流程中使用 ReplaceText 处理器更改 架构 url 的路径。
使用 ExecuteSQL 处理器获取所有 create table 语句(或)列信息的另一种方法从 (sys.tables/INFORMATION_SCHEMA.COLUMNS ..etc) 来自源(如果源系统允许)并将脚本写入 map the data types
到 hive appropriate types
然后将它们存储在您的 desired format
在 Hive 中。
编辑:
要运行 grep
command on the flowfile content 我们需要使用ExecuteStreamCommand processor
电调配置:
然后将 output stream
关系提供给 ExtractText 处理器
ET 配置:
添加新的属性为
内容
(?s)(.*)
然后 content attribute
添加到流文件中,您可以使用该属性并准备创建 table 语句。
我正在从 MySQL table 导入数据(仅针对选定的列)并将其放入 HDFS。完成后,我想在 Hive 中创建一个 table。
为此,我有一个 schema.sql
文件,其中包含整个 table 的 CREATE TABLE 语句,并且我只想生成新的 CREATE TABLE 语句我导入的列。
与我在下面的示例中使用 grep
所做的类似。
我将 FetchFile
与 ExtractText
一起使用,但无法正常工作。如果我将整个模式放入一个属性中,我如何使用 NiFi 处理器甚至表达式语言来实现这一点?
或者是否有更好的方法在导入的数据上创建 table?
NiFi可以根据流文件内容
生成Create table语句[s]1.Creating ORC tables by using ConvertAvroToORC processor:
如果您将 avro 数据转换为 ORC 格式然后存储到 HDFS,则 ConvertAvroToORC 处理器将
hive.ddl
属性添加到流文件。PutHDFS 处理器将
absolute.hdfs.path
属性添加到流文件。我们可以使用这个hive.ddl,absolute.hdfs.path属性来创建orc table 动态地在 HDFS 目录之上。
流量:
Pull data from source(ExecuteSQL...etc)
-> ConvertAvroToORC //add Hive DbName,TableName in HiveTableName property value-->
-> PutHDFS //store the orc file into HDFS location -->
-> ReplaceText //Replace the flowfile content with ${hive.ddl} Location '${absolute.hdfs.path}'-->
-> PutHiveQL //execute the create table statement
请参阅 this link 以获取更多详细信息重新生成上述流程。
2.Creating Avro tables by using ExtractAvroMetaData processor:
在 NiFi 中,一旦我们使用 QueryDatabaseTable 提取数据,ExecuteSQL 处理器,数据格式为 AVRO.
我们可以创建 Avro tables 基于 avro 模式(.avsc 文件) 和通过使用 ExtractAvroMetaData 处理器,我们可以提取架构并保留为流文件属性,然后通过使用此架构,我们可以动态创建 AvroTables。
流量:
ExecuteSQL (success)|-> PutHDFS //store data into HDFS
(success)|-> ExtractAvroMetadata //configure Metadata Keys as avro.schema
-> ReplaceText //replace flowfile content with avro.schema
-> PutHDFS //store the avsc file into schema directory
-> ReplaceText //create avro table on top of schema directory
-> PutHiveQL //execute the hive.ddl
示例 AVRO 创建 table 语句:
CREATE TABLE as_avro
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED as INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES (
'avro.schema.url'='/path/to/the/schema/test_serializer.avsc');
我们将在上述流程中使用 ReplaceText 处理器更改 架构 url 的路径。
使用 ExecuteSQL 处理器获取所有 create table 语句(或)列信息的另一种方法从 (sys.tables/INFORMATION_SCHEMA.COLUMNS ..etc) 来自源(如果源系统允许)并将脚本写入 map the data types
到 hive appropriate types
然后将它们存储在您的 desired format
在 Hive 中。
编辑:
要运行 grep
command on the flowfile content 我们需要使用ExecuteStreamCommand processor
电调配置:
然后将 output stream
关系提供给 ExtractText 处理器
ET 配置:
添加新的属性为
内容
(?s)(.*)
然后 content attribute
添加到流文件中,您可以使用该属性并准备创建 table 语句。