AWS Athena 创建 table 和分区
AWS Athena create table and partition
我将传感器数据存储在 S3 中(每 5 分钟写入一次数据):
farm_iot/sensor_data/farm/farm0001/sensor01/1541252701443
1541252701443 是一个 json 文件,其中包含测量值:
{ "temperature": 14.78, "pressure": 961.70, "humidity": 68.32}
我肯定缺少一些蜂巢技能。不幸的是,我没有找到一个提取时间序列 json 数据的示例来帮助我入门。我也不确定 Hive / Athena 是否支持这种数据提取。
我正在努力为这个数据创建 Athena table...
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
device string,
sensor string,
data_point string,
value double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://farm-iot/sensor_data/farm/farm0001/sensor01/'
PARTITIONED BY (timestamp string)
TBLPROPERTIES ('has_encrypted_data'='false')
我正在考虑的另一条路是将数据存储在更易于处理的结构中/也许我对数据的分区不够??!
所以也许我应该将 dt 添加到这样的结构中:
farm_iot/sensor_data/2018-11-03-02-45-02/farm/farm0001/sensor01/1541252701443
仍然无法让我到达我想去的地方:
+---------------+----------+----------+-------------+--------+
| timestamp | device | sensor | data_point | value |
+---------------+----------+----------+-------------+--------+
| 1541252701443 | farm0001 | sensor01 | temperature | 14.78 |
+---------------+----------+----------+-------------+--------+
| 1541252701443 | farm0001 | sensor01 | humidity | 68.32 |
+---------------+----------+----------+-------------+--------+
| 1541252701443 | farm0001 | sensor01 | pressure | 961.70 |
+---------------+----------+----------+-------------+--------+
任何指向此目标的指示都将不胜感激。谢谢!
请注意:我不想使用胶水,想了解如何手动操作。除了昨天已经创建的胶水 ~16.000 tables :)
让我试着解释一下我在前面看到的几个问题。
- 看起来你想要的输出需要一些数据,这些数据是路径文件位置、设备和传感器的一部分,但是它没有被定义为你的 table 定义的一部分,只有 [=87] 中的列=] 定义或 virtual columns 将可用。
- 几个小文件可能会影响您的查询性能(但这不会影响您想要的结果)
- Hive 分区用于提高查询的性能,避免扫描
所有的数据。分区指向文件夹,在这种情况下,您正在尝试访问特定文件
- 你想要的输出基本上是在几条记录中分解 1 条记录,这不应该在 table 定义中处理,可以通过你的 select 语句
来完成
- Hive 分区的命名约定为
partitionname=partitionvalue
,这不是强制性的,但如果您想提前使用命令根据您的文件夹结构自动添加分区,这很有用。
如果您主要通过传感器或设备进行查询,我会这样解决您的问题
更改数据结构
理想情况下,您的文件夹结构应该来自
farm_iot/sensor_data/farm/farm0001/sensor01/1541252701443
到farm_iot/sensor_data/farm/device=farm0001/sensor=sensor01/1541252701443
更改您的 table 定义
您的 table 定义应该包含您的分区位置,以便能够 select 它没有正则表达式并利用它的性能改进(我猜一个常见的查询将按设备或传感器。除此之外,您还需要添加属于文件
的所有 json 列
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
temperature double,
preassure double,
humidity double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://farm-iot/sensor_data/farm/'
PARTITIONED BY (device string, sensor string)
TBLPROPERTIES ('has_encrypted_data'='false')
正在查询您的数据
我们缺少时间戳,它本质上是您输入 json 文件名的一部分。我们可以使用虚拟列 INPUT__FILE__NAME
在 select 语句中包含文件名,如下所示
select device, sensor, temperature, preassure, humidity, INPUT__FILE__NAME as mytimestamp from farm.sensor_data
如果你想要压力、温度和湿度以及不同的行,我建议用这三个创建一个数组并将其展开,运行 3 个查询使用 UNION ALL 附加结果应该会更有效
添加新分区
如果您遵循 Hive 约定,您可以利用命令 msck repair table 在包含新 devices/sensors 后自动添加新分区。在最坏的情况下,如果你想保留你的文件夹结构,你可以添加分区如下
ALTER TABLE test ADD PARTITION (device='farm0001', sensor='sensor01') location 's3://farm_iot/sensor_data/farm/farm0001/sensor01'
注意:不会自动添加新分区,您始终需要添加它们
我尝试添加尽可能多的细节。如果有什么不清楚,请告诉我。
编辑:
如果您的查询主要基于时间序列(例如日期范围),我建议在日级别(不小于此)添加一个分区以提高查询的性能。所以你的 table 定义看起来像
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
temperature double,
preassure double,
humidity double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://farm-iot/sensor_data/farm/'
PARTITIONED BY (dt=long, device string, sensor string)
TBLPROPERTIES ('has_encrypted_data'='false')
你的文件夹结构看起来像
farm_iot/sensor_data/farm/dt=20191204/设备=farm0001/sensor=sensor01/1541252701443
作为说明,您不需要为每个新分区修改 table,只需将此分区添加到 table,这实际上是 Hive 知道新分区已创建的方式.如果你决定使用分区,这是唯一的方法,如果你不这样做(这会影响性能),还有一些其他的选择可以让它工作
编辑 2:
如果你想保持你的数据结构不变,不使用分区,可以获得如下预期结果
CREATE EXTERNAL TABLE IF NOT EXISTS yourdb.sensordata (
temperature double,
pressure double,
humidity double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
)
LOCATION 's3://farm-iot/sensor_data/farm/'
TBLPROPERTIES ('has_encrypted_data'='false');
SET hive.mapred.supports.subdirectories=TRUE;
SET mapred.input.dir.recursive=TRUE;
select * from yourdb.sensordata;
select
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'temperature' as data_point,
temperature as value
from yourdb.sensordata
union all
select
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'pressure' as data_point,
pressure as value
from yourdb.sensordata
union all
select
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'humidity' as data_point,
humidity as value
from yourdb.sensordata;
如您所见,我从文件路径中获取了大部分信息,但是需要设置一些标志以递归地告诉 Hive 读取文件夹
ts,device,sensor,_data_point,value
1541252701443,farm0001,sensor01,temperature,14.78
1541252701443,farm0001,sensor01,pressure,961.7
1541252701443,farm0001,sensor01,humidity,68.32
首先非常感谢@hlagos 的帮助。
AWS Athena 无法按照我需要的方式转换 json 传感器数据(我们在对@hlagos 回答的评论中讨论了这个问题)。因此 "simplest" 处理这种情况的方法是将数据格式从 json 更改为 CSV 以更接近我需要的格式。
我现在将传感器数据以 CSV 格式存储在 S3 中(每 5 分钟写入一次数据),另外我还添加了我们讨论过的日期和设备分区。
生成的文件夹结构:
farm_iot/sensor_data/farm/day=20181129/device=farm0001/1543535738493
CSV文件的数据内容:
sensor01,temperature,2.82
sensor01,pressure,952.83
sensor01,humidity,83.64
sensor02,temperature,2.61
sensor02,pressure,952.74
sensor02,humidity,82.41
AWS Athena table 定义:
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
`sensor` string,
`data_point` string,
`value` double
)
PARTITIONED BY (day string, device string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
ESCAPED BY '\'
LINES TERMINATED BY '\n'
LOCATION 's3://farm-iot/sensor_data/farm/'
TBLPROPERTIES ('has_encrypted_data'='false');
我添加的分区是这样的(后面会有脚本提前创建分区):
msck repair table farm.sensor_data
现在可以查询数据了:
select regexp_extract("$path", '[^/]+$') as timestamp, device, sensor,
data_point, value from farm.sensor_data where day='20181104'
Results
timestamp device sensor data_point value
1 1541310040278 farm0001 sensor01 temperature 21.61
2 1541310040278 farm0001 sensor01 pressure 643.65
3 1541310040278 farm0001 sensor01 humidity 74.84
4 1541310040278 farm0001 sensor02 temperature 9.14
5 1541310040278 farm0001 sensor02 pressure 956.04
6 1541310040278 farm0001 sensor02 humidity 88.01
7 1541311840309 farm0001 sensor01 temperature 21.61
8 ...
我将传感器数据存储在 S3 中(每 5 分钟写入一次数据):
farm_iot/sensor_data/farm/farm0001/sensor01/1541252701443
1541252701443 是一个 json 文件,其中包含测量值:
{ "temperature": 14.78, "pressure": 961.70, "humidity": 68.32}
我肯定缺少一些蜂巢技能。不幸的是,我没有找到一个提取时间序列 json 数据的示例来帮助我入门。我也不确定 Hive / Athena 是否支持这种数据提取。
我正在努力为这个数据创建 Athena table...
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
device string,
sensor string,
data_point string,
value double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://farm-iot/sensor_data/farm/farm0001/sensor01/'
PARTITIONED BY (timestamp string)
TBLPROPERTIES ('has_encrypted_data'='false')
我正在考虑的另一条路是将数据存储在更易于处理的结构中/也许我对数据的分区不够??!
所以也许我应该将 dt 添加到这样的结构中:
farm_iot/sensor_data/2018-11-03-02-45-02/farm/farm0001/sensor01/1541252701443
仍然无法让我到达我想去的地方:
+---------------+----------+----------+-------------+--------+
| timestamp | device | sensor | data_point | value |
+---------------+----------+----------+-------------+--------+
| 1541252701443 | farm0001 | sensor01 | temperature | 14.78 |
+---------------+----------+----------+-------------+--------+
| 1541252701443 | farm0001 | sensor01 | humidity | 68.32 |
+---------------+----------+----------+-------------+--------+
| 1541252701443 | farm0001 | sensor01 | pressure | 961.70 |
+---------------+----------+----------+-------------+--------+
任何指向此目标的指示都将不胜感激。谢谢!
请注意:我不想使用胶水,想了解如何手动操作。除了昨天已经创建的胶水 ~16.000 tables :)
让我试着解释一下我在前面看到的几个问题。
- 看起来你想要的输出需要一些数据,这些数据是路径文件位置、设备和传感器的一部分,但是它没有被定义为你的 table 定义的一部分,只有 [=87] 中的列=] 定义或 virtual columns 将可用。
- 几个小文件可能会影响您的查询性能(但这不会影响您想要的结果)
- Hive 分区用于提高查询的性能,避免扫描 所有的数据。分区指向文件夹,在这种情况下,您正在尝试访问特定文件
- 你想要的输出基本上是在几条记录中分解 1 条记录,这不应该在 table 定义中处理,可以通过你的 select 语句 来完成
- Hive 分区的命名约定为
partitionname=partitionvalue
,这不是强制性的,但如果您想提前使用命令根据您的文件夹结构自动添加分区,这很有用。
如果您主要通过传感器或设备进行查询,我会这样解决您的问题
更改数据结构
理想情况下,您的文件夹结构应该来自
farm_iot/sensor_data/farm/farm0001/sensor01/1541252701443
到farm_iot/sensor_data/farm/device=farm0001/sensor=sensor01/1541252701443
更改您的 table 定义
您的 table 定义应该包含您的分区位置,以便能够 select 它没有正则表达式并利用它的性能改进(我猜一个常见的查询将按设备或传感器。除此之外,您还需要添加属于文件
的所有 json 列CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
temperature double,
preassure double,
humidity double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://farm-iot/sensor_data/farm/'
PARTITIONED BY (device string, sensor string)
TBLPROPERTIES ('has_encrypted_data'='false')
正在查询您的数据
我们缺少时间戳,它本质上是您输入 json 文件名的一部分。我们可以使用虚拟列 INPUT__FILE__NAME
在 select 语句中包含文件名,如下所示
select device, sensor, temperature, preassure, humidity, INPUT__FILE__NAME as mytimestamp from farm.sensor_data
如果你想要压力、温度和湿度以及不同的行,我建议用这三个创建一个数组并将其展开,运行 3 个查询使用 UNION ALL 附加结果应该会更有效
添加新分区
如果您遵循 Hive 约定,您可以利用命令 msck repair table 在包含新 devices/sensors 后自动添加新分区。在最坏的情况下,如果你想保留你的文件夹结构,你可以添加分区如下
ALTER TABLE test ADD PARTITION (device='farm0001', sensor='sensor01') location 's3://farm_iot/sensor_data/farm/farm0001/sensor01'
注意:不会自动添加新分区,您始终需要添加它们
我尝试添加尽可能多的细节。如果有什么不清楚,请告诉我。
编辑: 如果您的查询主要基于时间序列(例如日期范围),我建议在日级别(不小于此)添加一个分区以提高查询的性能。所以你的 table 定义看起来像
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
temperature double,
preassure double,
humidity double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://farm-iot/sensor_data/farm/'
PARTITIONED BY (dt=long, device string, sensor string)
TBLPROPERTIES ('has_encrypted_data'='false')
你的文件夹结构看起来像
farm_iot/sensor_data/farm/dt=20191204/设备=farm0001/sensor=sensor01/1541252701443
作为说明,您不需要为每个新分区修改 table,只需将此分区添加到 table,这实际上是 Hive 知道新分区已创建的方式.如果你决定使用分区,这是唯一的方法,如果你不这样做(这会影响性能),还有一些其他的选择可以让它工作
编辑 2:
如果你想保持你的数据结构不变,不使用分区,可以获得如下预期结果
CREATE EXTERNAL TABLE IF NOT EXISTS yourdb.sensordata (
temperature double,
pressure double,
humidity double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
)
LOCATION 's3://farm-iot/sensor_data/farm/'
TBLPROPERTIES ('has_encrypted_data'='false');
SET hive.mapred.supports.subdirectories=TRUE;
SET mapred.input.dir.recursive=TRUE;
select * from yourdb.sensordata;
select
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'temperature' as data_point,
temperature as value
from yourdb.sensordata
union all
select
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'pressure' as data_point,
pressure as value
from yourdb.sensordata
union all
select
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'humidity' as data_point,
humidity as value
from yourdb.sensordata;
如您所见,我从文件路径中获取了大部分信息,但是需要设置一些标志以递归地告诉 Hive 读取文件夹
ts,device,sensor,_data_point,value
1541252701443,farm0001,sensor01,temperature,14.78
1541252701443,farm0001,sensor01,pressure,961.7
1541252701443,farm0001,sensor01,humidity,68.32
首先非常感谢@hlagos 的帮助。
AWS Athena 无法按照我需要的方式转换 json 传感器数据(我们在对@hlagos 回答的评论中讨论了这个问题)。因此 "simplest" 处理这种情况的方法是将数据格式从 json 更改为 CSV 以更接近我需要的格式。
我现在将传感器数据以 CSV 格式存储在 S3 中(每 5 分钟写入一次数据),另外我还添加了我们讨论过的日期和设备分区。
生成的文件夹结构:
farm_iot/sensor_data/farm/day=20181129/device=farm0001/1543535738493
CSV文件的数据内容:
sensor01,temperature,2.82
sensor01,pressure,952.83
sensor01,humidity,83.64
sensor02,temperature,2.61
sensor02,pressure,952.74
sensor02,humidity,82.41
AWS Athena table 定义:
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
`sensor` string,
`data_point` string,
`value` double
)
PARTITIONED BY (day string, device string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
ESCAPED BY '\'
LINES TERMINATED BY '\n'
LOCATION 's3://farm-iot/sensor_data/farm/'
TBLPROPERTIES ('has_encrypted_data'='false');
我添加的分区是这样的(后面会有脚本提前创建分区):
msck repair table farm.sensor_data
现在可以查询数据了:
select regexp_extract("$path", '[^/]+$') as timestamp, device, sensor,
data_point, value from farm.sensor_data where day='20181104'
Results
timestamp device sensor data_point value
1 1541310040278 farm0001 sensor01 temperature 21.61
2 1541310040278 farm0001 sensor01 pressure 643.65
3 1541310040278 farm0001 sensor01 humidity 74.84
4 1541310040278 farm0001 sensor02 temperature 9.14
5 1541310040278 farm0001 sensor02 pressure 956.04
6 1541310040278 farm0001 sensor02 humidity 88.01
7 1541311840309 farm0001 sensor01 temperature 21.61
8 ...