Write the result of a SQL query to a file with Apache Flink

I have the following task:

  1. Create a job with a SQL query against a Hive table;
  2. Run this job on a remote Flink cluster;
  3. Collect the result of this job into a file (preferably on HDFS).

Note

Because this job has to run on a remote Flink cluster, I cannot simply use TableEnvironment. This problem is mentioned in this ticket: https://issues.apache.org/jira/browse/FLINK-18095. For the current solution I use the advice from http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-Environment-for-Remote-Execution-td35691.html.
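For context, the straightforward setup below is what FLINK-18095 rules out: a plain TableEnvironment offers no programmatic way to target a remote cluster (a minimal sketch for contrast; nothing here is specific to my setup):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Works only where the job is executed directly: there is no host/port to
// point this at a remote cluster, which is the limitation in FLINK-18095.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inBatchMode()
    .build();
TableEnvironment localEnv = TableEnvironment.create(settings);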

Code

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.planner.delegation.BatchExecutor;
import org.apache.flink.table.planner.delegation.BatchPlanner;

EnvironmentSettings batchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
// create the remote environment (host, port, and the jar that contains this job's classes)
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "/path/to/my/jar");
// create StreamTableEnvironment
TableConfig tableConfig = new TableConfig();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
CatalogManager catalogManager = CatalogManager.newBuilder()
                                              .classLoader(classLoader)
                                              .config(tableConfig.getConfiguration())
                                              .defaultCatalog(
                                                  batchSettings.getBuiltInCatalogName(),
                                                  new GenericInMemoryCatalog(
                                                      batchSettings.getBuiltInCatalogName(),
                                                      batchSettings.getBuiltInDatabaseName()))
                                              .executionConfig(
                                                  streamExecutionEnvironment.getConfig())
                                              .build();
ModuleManager moduleManager = new ModuleManager();
BatchExecutor batchExecutor = new BatchExecutor(streamExecutionEnvironment);
FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
StreamTableEnvironmentImpl tableEnv = new StreamTableEnvironmentImpl(
    catalogManager,
    moduleManager,
    functionCatalog,
    tableConfig,
    streamExecutionEnvironment,
    new BatchPlanner(batchExecutor, tableConfig, functionCatalog, catalogManager),
    batchExecutor,
    false);
// configure HiveCatalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive/conf"; // a local path
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");
// request to Hive
Table table = tableEnv.sqlQuery("select * from myhive.`default`.test");

Question

At this point I can call the table.execute() method and then get a CloseableIterator via the collect() method. But in my case the query may return a large number of rows, so it would be ideal to collect them into a file (ORC on HDFS).
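For completeness, this is what the collect() route looks like; every row is streamed back to the client, which is why it does not suit large results (a sketch; the enclosing method would need to declare throws Exception for the iterator's close()):

import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

// Pulls every result row back to the client over the network:
// fine for debugging, impractical for large result sets.
TableResult result = table.execute();
try (CloseableIterator<Row> it = result.collect()) {
    while (it.hasNext()) {
        Row row = it.next();
        System.out.println(row);
    }
}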

How can I achieve my goal?

Table.execute().collect() returns the result of the view to your client for interactive purposes. In your case, you can use the filesystem connector and write the view to a file with INSERT INTO. For example:

// create a filesystem table
tableEnvironment.executeSql("CREATE TABLE MyUserTable (\n" +
    "  column_name1 INT,\n" +
    "  column_name2 STRING,\n" +
    "  ..." +
    " \n" +
    ") WITH (\n" +
    "  'connector' = 'filesystem',\n" +
    "  'path' = 'hdfs://path/to/your/file',\n" +
    "  'format' = 'orc' \n" +
    ")");

// submit the job
tableEnvironment.executeSql("insert into MyUserTable select * from myhive.`default`.test");

See the filesystem connector documentation for more information: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html