从流文件内容中提取多行内容

Extracting multiline content from flow file content

我正在从 MySQL table 导入数据(仅针对选定的列)并将其放入 HDFS。完成后,我想在 Hive 中创建一个 table。

为此,我有一个 schema.sql 文件,其中包含整个 table 的 CREATE TABLE 语句,并且我只想生成新的 CREATE TABLE 语句我导入的列。

与我在下面的示例中使用 grep 所做的类似。

我将 FetchFileExtractText 一起使用,但无法正常工作。如果我将整个模式放入一个属性中,我如何使用 NiFi 处理器甚至表达式语言来实现这一点?

或者是否有更好的方法在导入的数据上创建 table?

NiFi可以根据流文件内容

生成Create table语句[s]

1.Creating ORC tables by using ConvertAvroToORC processor:

  • 如果您将 avro 数据转换为 ORC 格式然后存储到 HDFS,则 ConvertAvroToORC 处理器将 hive.ddl 属性添加到流文件。

  • PutHDFS 处理器将 absolute.hdfs.path 属性添加到流文件。

  • 我们可以使用这个hive.ddl,absolute.hdfs.path属性来创建orc table 动态地在 HDFS 目录之上。

流量:

 Pull data from source(ExecuteSQL...etc)
  -> ConvertAvroToORC //add Hive DbName,TableName in HiveTableName property value--> 
  -> PutHDFS //store the orc file into HDFS location --> 
  -> ReplaceText //Replace the flowfile content with ${hive.ddl} Location '${absolute.hdfs.path}'--> 
  -> PutHiveQL //execute the create table statement

请参阅 this link 以获取更多详细信息重新生成上述流程。

2.Creating Avro tables by using ExtractAvroMetaData processor:

  • 在 NiFi 中,一旦我们使用 QueryDatabaseTable 提取数据,ExecuteSQL 处理器,数据格式为 AVRO.

  • 我们可以创建 Avro tables 基于 avro 模式(.avsc 文件) 和通过使用 ExtractAvroMetaData 处理器,我们可以提取架构并保留为流文件属性,然后通过使用此架构,我们可以动态创建 AvroTables。

流量:

ExecuteSQL (success)|-> PutHDFS //store data into HDFS
           (success)|-> ExtractAvroMetadata //configure Metadata Keys as avro.schema 
                     -> ReplaceText //replace flowfile content with avro.schema
                     -> PutHDFS //store the avsc file into schema directory
                     -> ReplaceText //create avro table on top of schema directory
                     -> PutHiveQL //execute the hive.ddl

示例 AVRO 创建 table 语句:

CREATE TABLE as_avro
  ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
  STORED as INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
  TBLPROPERTIES (
    'avro.schema.url'='/path/to/the/schema/test_serializer.avsc');

我们将在上述流程中使用 ReplaceText 处理器更改 架构 url 的路径。

使用 ExecuteSQL 处理器获取所有 create table 语句(或)列信息的另一种方法从 (sys.tables/INFORMATION_SCHEMA.COLUMNS ..etc) 来自源(如果源系统允许)并将脚本写入 map the data typeshive appropriate types 然后将它们存储在您的 desired format 在 Hive 中。

编辑:

要运行 grep command on the flowfile content 我们需要使用ExecuteStreamCommand processor

电调配置:

然后将 output stream 关系提供给 ExtractText 处理器

ET 配置:

添加新的属性为

内容

(?s)(.*)

然后 content attribute 添加到流文件中,您可以使用该属性并准备创建 table 语句。