大数据信号分析:存储和查询信号数据的更好方式
Big data signal analysis: better way to store and query signal data
我正准备使用 Hadoop/Spark 进行一些信号分析,我需要有关如何构建整个过程的帮助。
信号现在存储在数据库中,我们将使用 Sqoop 读取该信号并将其转换为 HDFS 上的文件,其架构类似于:
<Measure ID> <Source ID> <Measure timestamp> <Signal values>
其中信号值只是由浮点逗号分隔的数字组成的字符串。
000123 S001 2015/04/22T10:00:00.000Z 0.0,1.0,200.0,30.0 ... 100.0
000124 S001 2015/04/22T10:05:23.245Z 0.0,4.0,250.0,35.0 ... 10.0
...
000126 S003 2015/04/22T16:00:00.034Z 0.0,0.0,200.0,00.0 ... 600.0
我们想将 interactive/batch 个查询写入:
对信号值应用聚合函数
SELECT *
FROM SIGNALS
WHERE MAX(VALUES) > 1000.0
到 select 峰值超过 1000.0 的信号。
在聚合上应用聚合
SELECT SOURCEID, MAX(VALUES)
FROM SIGNALS
GROUP BY SOURCEID
HAVING MAX(MAX(VALUES)) > 1500.0
至 select 个至少有一个信号超过 1500.0 的来源。
对样本应用用户定义的函数
SELECT *
FROM SIGNALS
WHERE MAX(LOW_BAND_FILTER("5.0 KHz", VALUES)) > 100.0)
到select表示经过5.0KHz滤波后的值至少大于100.0。
我们需要一些帮助才能:
- 找到正确的文件格式以将信号数据写入 HDFS。我想到了 Apache Parquet。您将如何构建数据?
- 了解正确的数据分析方法:是创建不同的数据集(例如,使用 Spark 处理数据并将结果保存在 HDFS 上)还是尝试在查询时从原始数据集做所有事情更好?
- Hive 是一个很好的工具来查询我写的那些吗?我们在 Cloudera Enterprise Hadoop 上 运行,所以我们也可以使用 Impala.
- 如果我们生成与原始数据集不同的派生数据集,我们如何跟踪数据的沿袭,即知道数据是如何从原始版本生成的?
非常感谢!
1) Parquet 作为柱状格式非常适合 OLAP。 Parquet 的 Spark 支持已经足够成熟,可以用于生产。我建议将表示信号值的字符串解析为以下数据结构(简化):
case class Data(id: Long, signals: Array[Double])
val df = sqlContext.createDataFrame(Seq(Data(1L, Array(1.0, 1.0, 2.0)), Data(2L, Array(3.0, 5.0)), Data(2L, Array(1.5, 7.0, 8.0))))
保留 double 数组允许像这样定义和使用 UDF:
def maxV(arr: mutable.WrappedArray[Double]) = arr.max
sqlContext.udf.register("maxVal", maxV _)
df.registerTempTable("table")
sqlContext.sql("select * from table where maxVal(signals) > 2.1").show()
+---+---------------+
| id| signals|
+---+---------------+
| 2| [3.0, 5.0]|
| 2|[1.5, 7.0, 8.0]|
+---+---------------+
sqlContext.sql("select id, max(maxVal(signals)) as maxSignal from table group by id having maxSignal > 1.5").show()
+---+---------+
| id|maxSignal|
+---+---------+
| 1| 2.0|
| 2| 8.0|
+---+---------+
或者,如果你想要一些类型安全,使用 Scala DSL:
import org.apache.spark.sql.functions._
val maxVal = udf(maxV _)
df.select("*").where(maxVal($"signals") > 2.1).show()
df.select($"id", maxVal($"signals") as "maxSignal").groupBy($"id").agg(max($"maxSignal")).where(max($"maxSignal") > 2.1).show()
+---+--------------+
| id|max(maxSignal)|
+---+--------------+
| 2| 8.0|
+---+--------------+
2) 这取决于:如果您的数据大小允许以合理的延迟在查询时间内完成所有处理 - 那就去做吧。您可以从这种方法开始,稍后为 slow/popular 查询构建优化结构
3) Hive 很慢,Impala 和 Spark SQL 已经过时了。有时选择并不容易,我们使用经验法则:如果所有数据都存储在 HDFS/Hive,Impala 适用于没有连接的查询,Spark 具有更大的延迟但连接可靠,它支持更多数据源并且具有丰富的非SQL处理能力(如MLlib和GraphX)
4) 保持简单:存储原始数据(主数据集)去重和分区(我们使用基于时间的分区)。如果新数据到达分区并且您已经生成了下游数据集 - 重新启动该分区的管道。
希望对您有所帮助
首先,我认为Vitaliy的方法在各个方面都非常好。 (我完全支持 Spark)
不过,我想提出另一种方法。原因是:
- 我们想进行交互式查询(+ 我们有 CDH)
- 数据已经结构化
- 需要的是'analyze'而不是'processing'的数据。如果 (a) 数据结构化,我们可以更快地形成 sql 查询,并且 (b) 我们不想每次想 运行 查询时都编写程序
以下是我想要执行的步骤:
- 使用 sqoop 摄取到 HDFS:[可选] 使用 --as-parquetfile
- 根据需要创建外部 Impala table 或内部 table。如果您尚未将文件作为 parquet 文件传输,则可以在此步骤中执行此操作。分区依据,最好是源 ID,因为我们的分组将发生在该列上。
所以,基本上,一旦我们传输了数据,我们需要做的就是创建一个 Impala table,最好是镶木地板格式,并按我们想要的列进行分区将用于分组。记得加载后做计算统计帮助Impala运行它更快。
移动数据:
- 如果我们需要从结果中生成提要,请创建一个单独的文件
- 如果另一个系统要更新现有数据,则在创建时将数据移动到不同的位置->加载 table
- 如果只是查询、分析和获取报告(即外部 table 就足够了),我们不需要不必要地移动数据
- 我们可以在相同数据之上创建一个外部配置单元 table。如果我们需要运行long-运行ning批量查询,可以使用Hive。不过,对于交互式查询来说,这是一个禁忌。如果我们从查询中创建任何派生的 tables 并希望通过 Impala 使用,请记住在 运行ning impala 查询之前 运行 'invalidate metadata'在配置单元生成的 tables
血统 - 我没有深入研究,这里是 link on Impala lineage using Cloudera Navigator
我正准备使用 Hadoop/Spark 进行一些信号分析,我需要有关如何构建整个过程的帮助。
信号现在存储在数据库中,我们将使用 Sqoop 读取该信号并将其转换为 HDFS 上的文件,其架构类似于:
<Measure ID> <Source ID> <Measure timestamp> <Signal values>
其中信号值只是由浮点逗号分隔的数字组成的字符串。
000123 S001 2015/04/22T10:00:00.000Z 0.0,1.0,200.0,30.0 ... 100.0
000124 S001 2015/04/22T10:05:23.245Z 0.0,4.0,250.0,35.0 ... 10.0
...
000126 S003 2015/04/22T16:00:00.034Z 0.0,0.0,200.0,00.0 ... 600.0
我们想将 interactive/batch 个查询写入:
对信号值应用聚合函数
SELECT *
FROM SIGNALS
WHERE MAX(VALUES) > 1000.0
到 select 峰值超过 1000.0 的信号。
在聚合上应用聚合
SELECT SOURCEID, MAX(VALUES)
FROM SIGNALS
GROUP BY SOURCEID
HAVING MAX(MAX(VALUES)) > 1500.0
至 select 个至少有一个信号超过 1500.0 的来源。
对样本应用用户定义的函数
SELECT *
FROM SIGNALS
WHERE MAX(LOW_BAND_FILTER("5.0 KHz", VALUES)) > 100.0)
到select表示经过5.0KHz滤波后的值至少大于100.0。
我们需要一些帮助才能:
- 找到正确的文件格式以将信号数据写入 HDFS。我想到了 Apache Parquet。您将如何构建数据?
- 了解正确的数据分析方法:是创建不同的数据集(例如,使用 Spark 处理数据并将结果保存在 HDFS 上)还是尝试在查询时从原始数据集做所有事情更好?
- Hive 是一个很好的工具来查询我写的那些吗?我们在 Cloudera Enterprise Hadoop 上 运行,所以我们也可以使用 Impala.
- 如果我们生成与原始数据集不同的派生数据集,我们如何跟踪数据的沿袭,即知道数据是如何从原始版本生成的?
非常感谢!
1) Parquet 作为柱状格式非常适合 OLAP。 Parquet 的 Spark 支持已经足够成熟,可以用于生产。我建议将表示信号值的字符串解析为以下数据结构(简化):
case class Data(id: Long, signals: Array[Double])
val df = sqlContext.createDataFrame(Seq(Data(1L, Array(1.0, 1.0, 2.0)), Data(2L, Array(3.0, 5.0)), Data(2L, Array(1.5, 7.0, 8.0))))
保留 double 数组允许像这样定义和使用 UDF:
def maxV(arr: mutable.WrappedArray[Double]) = arr.max
sqlContext.udf.register("maxVal", maxV _)
df.registerTempTable("table")
sqlContext.sql("select * from table where maxVal(signals) > 2.1").show()
+---+---------------+
| id| signals|
+---+---------------+
| 2| [3.0, 5.0]|
| 2|[1.5, 7.0, 8.0]|
+---+---------------+
sqlContext.sql("select id, max(maxVal(signals)) as maxSignal from table group by id having maxSignal > 1.5").show()
+---+---------+
| id|maxSignal|
+---+---------+
| 1| 2.0|
| 2| 8.0|
+---+---------+
或者,如果你想要一些类型安全,使用 Scala DSL:
import org.apache.spark.sql.functions._
val maxVal = udf(maxV _)
df.select("*").where(maxVal($"signals") > 2.1).show()
df.select($"id", maxVal($"signals") as "maxSignal").groupBy($"id").agg(max($"maxSignal")).where(max($"maxSignal") > 2.1).show()
+---+--------------+
| id|max(maxSignal)|
+---+--------------+
| 2| 8.0|
+---+--------------+
2) 这取决于:如果您的数据大小允许以合理的延迟在查询时间内完成所有处理 - 那就去做吧。您可以从这种方法开始,稍后为 slow/popular 查询构建优化结构
3) Hive 很慢,Impala 和 Spark SQL 已经过时了。有时选择并不容易,我们使用经验法则:如果所有数据都存储在 HDFS/Hive,Impala 适用于没有连接的查询,Spark 具有更大的延迟但连接可靠,它支持更多数据源并且具有丰富的非SQL处理能力(如MLlib和GraphX)
4) 保持简单:存储原始数据(主数据集)去重和分区(我们使用基于时间的分区)。如果新数据到达分区并且您已经生成了下游数据集 - 重新启动该分区的管道。
希望对您有所帮助
首先,我认为Vitaliy的方法在各个方面都非常好。 (我完全支持 Spark)
不过,我想提出另一种方法。原因是:
- 我们想进行交互式查询(+ 我们有 CDH)
- 数据已经结构化
- 需要的是'analyze'而不是'processing'的数据。如果 (a) 数据结构化,我们可以更快地形成 sql 查询,并且 (b) 我们不想每次想 运行 查询时都编写程序
以下是我想要执行的步骤:
- 使用 sqoop 摄取到 HDFS:[可选] 使用 --as-parquetfile
- 根据需要创建外部 Impala table 或内部 table。如果您尚未将文件作为 parquet 文件传输,则可以在此步骤中执行此操作。分区依据,最好是源 ID,因为我们的分组将发生在该列上。
所以,基本上,一旦我们传输了数据,我们需要做的就是创建一个 Impala table,最好是镶木地板格式,并按我们想要的列进行分区将用于分组。记得加载后做计算统计帮助Impala运行它更快。
移动数据: - 如果我们需要从结果中生成提要,请创建一个单独的文件 - 如果另一个系统要更新现有数据,则在创建时将数据移动到不同的位置->加载 table - 如果只是查询、分析和获取报告(即外部 table 就足够了),我们不需要不必要地移动数据 - 我们可以在相同数据之上创建一个外部配置单元 table。如果我们需要运行long-运行ning批量查询,可以使用Hive。不过,对于交互式查询来说,这是一个禁忌。如果我们从查询中创建任何派生的 tables 并希望通过 Impala 使用,请记住在 运行ning impala 查询之前 运行 'invalidate metadata'在配置单元生成的 tables
血统 - 我没有深入研究,这里是 link on Impala lineage using Cloudera Navigator