将数据从 SQL 服务器加载到 S3 作为 parquet - AWS EMR

Loading data from SQL Server to S3 as parquet - AWS EMR

目前我们的数据在 SQL 服务器中,我们正在尝试将它们作为镶木地板文件移动到我们的 s3 存储桶中。目的是在 AWS EMR(主要是 Spark、Hive 和 Presto)中分析这个 s3 数据。我们不想将数据存储在 HDFS 中。

  1. 这里有哪些选择?据我们所知,似乎我们可以使用 spark 或 sqoop 进行此导入。尽管在这种情况下 sqoop 由于并行性(并行数据库连接)比 Spark 快,但似乎无法将 parquet 文件从 sqoop 写入 s3 - Sqoop + S3 + Parquet results in Wrong FS error 。解决方法是移动到 hdfs,然后移动到 s3。但是,这似乎效率不高。如何使用 SparkSQL 从 SQL 服务器中提取此数据并在 s3 中写入镶木地板?

  2. 一旦我们将此数据加载为这种格式的镶木地板

    s3://mybucket/table_a/day_1/(parquet files 1 ... n).
    s3://mybucket/table_a/day_2/(parquet files 1 ... n).
    s3://mybucket/table_a/day_3/(parquet files 1 ... n).
    

如何将它们合并为一个 table 并使用 Hive 进行查询。我知道我们可以创建指向 s3 的外部配置单元 table,但是我们可以指向多个文件吗?

谢谢。

编辑:按要求添加。

org.apache.hive.service.cli.HiveSQLException:处理语句时出错:失败:执行错误,return 来自 org.apache.hadoop.hive.ql.exec.DDLTask 的代码 1 在 org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:380) org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:257) 在 org.apache.hive.service.cli.operation.SQLOperation.access$800(SQLOperation.java:91) 在 org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork$1.run(SQLOperation.java:348) 在 java.security.AccessController.doPrivileged(本机方法) 在 javax.security.auth.Subject.doAs(Subject.java:422) 在 org.apache.hadoop.security.UserGroupInformation.doAs( UserGroupInformation.java:1698) 在 org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork.run(SQLOperation.java:362) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748)

Spark 是一个非常好的实用工具。您可以轻松 connect to a JDBC data source, and you can write to S3 by specifying credentials and an S3 path (e.g. Pyspark Save dataframe to S3).

如果您使用的是 AWS,那么 Spark、Presto 和 Hive 的最佳选择是使用 AWS Glue Metastore。这是一个数据目录,可将您的 s3 对象注册为数据库中的 tables,并提供 API 用于定位这些对象。

您的问题 2 的答案是肯定的,您可以有一个 table 引用多个文件。如果您有分区数据,您通常希望这样做。

Spark 读取 jdbc 通过多个连接拉取数据。这是 link

def
jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): 

Construct a DataFrame representing the database table accessible via JDBC URL url named table. Partitions of the table will be retrieved in parallel based on the parameters passed to this function.

Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.

url
JDBC database url of the form jdbc:subprotocol:subname.

table
Name of the table in the external database.

columnName
the name of a column of integral type that will be used for partitioning.

lowerBound
the minimum value of columnName used to decide partition stride.

upperBound
the maximum value of columnName used to decide partition stride.

numPartitions
the number of partitions. This, along with lowerBound (inclusive), upperBound (exclusive), form partition strides for generated WHERE clause expressions used to split the column columnName evenly. When the input is less than 1, the number is set to 1.

connectionProperties
JDBC database connection arguments, a list of arbitrary string tag/value. Normally at least a "user" and "password" property should be included. "fetchsize" can be used to control the number of rows per fetch.DataFrame

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader

http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

创建配置单元 table 并将分区列作为日期并保存并指定以下位置

create table table_name (
  id                int,
  dtDontQuery       string,
  name              string
)
partitioned by (date string) Location s3://s3://mybucket/table_name/

在您的数据中添加一个名为 date 的列,并用 sysdate 填充它。如果不需要,则无需添加该列,我们只需填充位置即可。但它也可以是您的分析的审计专栏。 使用火花 dataframe.partitionBy(date).write.parquet.location(s3://mybucket/table_name/)

每日执行 MSCK repair on the hive table 因此新分区被添加到 table。

在非数字列上应用 numPartitions 是通过将该列的哈希函数创建为您想要的连接数并使用该列

您可以按如下方式创建外部配置单元 table

create external table table_a (
 siteid                    string,
 nodeid                    string,
 aggregation_type          string
 )
 PARTITIONED BY (day string)
 STORED AS PARQUET
 LOCATION 's3://mybucket/table_a';

然后就可以运行下面的命令将每天目录下存放的分区文件注册到HiveMatastore

 MSCK REPAIR TABLE table_a;

现在您可以通过配置单元查询访问您的文件。我们在我们的项目中使用了这种方法并且运行良好。上面命令后,可以运行查询

 select * from table_a where day='day_1';

希望对您有所帮助。

-拉维

虽然来晚了一点,但是供以后参考。在我们的项目中,我们正是这样做的,我更喜欢 Sqoop 而不是 Spark。

原因:我使用 Glue 从 Mysql 读取数据到 S3 并且读取不是并行的(AWS Support 是否查看过它,这就是 Glue(使用 Pyspark)的工作方式,但是一旦读取完成其并行)。这效率不高而且很慢。 100GB数据读写S3需要1.5Hr.

所以我在启用了 Glue Catalog 的 EMR 上使用了 Sqoop(因此 hive metastore 在 AWS 上)并且我能够直接从 Sqoop 写入 S3,速度更快 100GB 的数据读取需要 20 分钟。

您必须设置集合 hive.metastore.warehouse.dir=s3://,如果您执行配置单元导入或直接写入,您应该会看到数据被写入 S3。