Apache Nifi/Cassandra - 如何将 CSV 加载到 Cassandra table
Apache Nifi/Cassandra - how to load CSV into Cassandra table
我每天收到数次各种 CSV 文件,存储来自传感器的时间序列数据,这些传感器是传感器站的一部分。每个 CSV 都以它来自的传感器站和传感器 ID 命名,例如 "station1_sensor2.csv"。目前,数据存储如下:
> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;
我创建了一个 Cassandra table 来存储它们并能够查询它们以获取各种已识别的任务。 Cassandra table 看起来像这样:
cqlsh > CREATE KEYSPACE data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
CREATE TABLE sensor_data (
station_id text, // id of the station
sensor_id text, // id of the sensor
tps timestamp, // timestamp of the measure
val float, // measured value
PRIMARY KEY ((station_id, sensor_id), tps)
);
我想使用 Apache Nifi 将 CSV 中的数据自动存储到此 Cassandra Table,但我找不到正确执行此操作的示例或方案。我曾尝试使用 "PutCassandraQL" 处理器,但我在没有任何明确示例的情况下苦苦挣扎。因此,任何有关如何使用 Apache Nifi 执行 Cassandra put 查询以将数据插入 table 的帮助将不胜感激!
TL;DR 我有一个 NiFi 1.0 模板可以在 Gist and in the NiFi Wiki 上完成此操作。
NiFi 鼓励非常模块化的设计,所以让我们将其分解为更小的任务,我将描述一个可能的流程并根据您的用例解释每个处理器的用途:
读入 CSV 文件。这可以使用 GetFile 来完成,或者最好使用 ListFile -> FetchFile。在我的示例中,我使用脚本处理器在线创建一个流文件,其中包含上面的示例数据。这使我的模板便于其他人使用。
解析文件名以获取站点和传感器字段。这使用 NiFi Expression Language 获取文件名中下划线之前(对于站)和下划线之后(减去 CSV 扩展名)对于传感器的部分。
将单个 CSV 流文件拆分为每行一个流文件。这样做是为了稍后我们可以创建单独的 CQL INSERT 语句。
从每行中提取列值。我为此使用了 ExtractText 和正则表达式,如果您有非常复杂的逻辑,您可能需要查看脚本处理器,例如 ExecuteScript.
更改时间戳。 IIRC,CQL 不接受时间戳文字上的微秒。您可以尝试解析微秒(最好在 ExecuteScript 处理器中完成)或重新格式化时间戳。请注意 "re-formatting",因为无法解析微秒,导致在我的示例中截断所有小数秒。
构建 CQL INSERT 语句。此时数据(无论如何在我的模板中)都在流文件属性中,原始内容可以用 CQL INSERT 语句替换(这是 PutCassandraQL 期望的方式)。您可以将数据保存在属性中(使用 UpdateAttribute 正确命名它们,请参阅 PutCassandraQL 文档)并使用准备好的语句,但恕我直言,编写显式 CQL 语句更简单。在撰写本文时,PutCassandraQL 并未缓存 PreparedStatements,因此现在以这种方式执行操作实际上性能较低。
使用 PutCassandraQL 执行 CQL 语句。
我没有详细说明我的属性名称等,但是当流程到达 ReplaceText 时,我有以下属性:
- station.name: 包含从文件名解析出来的站名
- sensor.name:包含从文件名
解析的传感器名称
- tps:包含更新的时间戳值
- columns.2:包含(大概)传感器读数的值
ReplaceText 将内容设置为以下内容(使用表达式语言填充值):
insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})
希望对您有所帮助,如果您有任何疑问或问题,请告诉我。干杯!
我每天收到数次各种 CSV 文件,存储来自传感器的时间序列数据,这些传感器是传感器站的一部分。每个 CSV 都以它来自的传感器站和传感器 ID 命名,例如 "station1_sensor2.csv"。目前,数据存储如下:
> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;
我创建了一个 Cassandra table 来存储它们并能够查询它们以获取各种已识别的任务。 Cassandra table 看起来像这样:
cqlsh > CREATE KEYSPACE data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
CREATE TABLE sensor_data (
station_id text, // id of the station
sensor_id text, // id of the sensor
tps timestamp, // timestamp of the measure
val float, // measured value
PRIMARY KEY ((station_id, sensor_id), tps)
);
我想使用 Apache Nifi 将 CSV 中的数据自动存储到此 Cassandra Table,但我找不到正确执行此操作的示例或方案。我曾尝试使用 "PutCassandraQL" 处理器,但我在没有任何明确示例的情况下苦苦挣扎。因此,任何有关如何使用 Apache Nifi 执行 Cassandra put 查询以将数据插入 table 的帮助将不胜感激!
TL;DR 我有一个 NiFi 1.0 模板可以在 Gist and in the NiFi Wiki 上完成此操作。
NiFi 鼓励非常模块化的设计,所以让我们将其分解为更小的任务,我将描述一个可能的流程并根据您的用例解释每个处理器的用途:
读入 CSV 文件。这可以使用 GetFile 来完成,或者最好使用 ListFile -> FetchFile。在我的示例中,我使用脚本处理器在线创建一个流文件,其中包含上面的示例数据。这使我的模板便于其他人使用。
解析文件名以获取站点和传感器字段。这使用 NiFi Expression Language 获取文件名中下划线之前(对于站)和下划线之后(减去 CSV 扩展名)对于传感器的部分。
将单个 CSV 流文件拆分为每行一个流文件。这样做是为了稍后我们可以创建单独的 CQL INSERT 语句。
从每行中提取列值。我为此使用了 ExtractText 和正则表达式,如果您有非常复杂的逻辑,您可能需要查看脚本处理器,例如 ExecuteScript.
更改时间戳。 IIRC,CQL 不接受时间戳文字上的微秒。您可以尝试解析微秒(最好在 ExecuteScript 处理器中完成)或重新格式化时间戳。请注意 "re-formatting",因为无法解析微秒,导致在我的示例中截断所有小数秒。
构建 CQL INSERT 语句。此时数据(无论如何在我的模板中)都在流文件属性中,原始内容可以用 CQL INSERT 语句替换(这是 PutCassandraQL 期望的方式)。您可以将数据保存在属性中(使用 UpdateAttribute 正确命名它们,请参阅 PutCassandraQL 文档)并使用准备好的语句,但恕我直言,编写显式 CQL 语句更简单。在撰写本文时,PutCassandraQL 并未缓存 PreparedStatements,因此现在以这种方式执行操作实际上性能较低。
使用 PutCassandraQL 执行 CQL 语句。
我没有详细说明我的属性名称等,但是当流程到达 ReplaceText 时,我有以下属性:
- station.name: 包含从文件名解析出来的站名
- sensor.name:包含从文件名 解析的传感器名称
- tps:包含更新的时间戳值
- columns.2:包含(大概)传感器读数的值
ReplaceText 将内容设置为以下内容(使用表达式语言填充值):
insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})
希望对您有所帮助,如果您有任何疑问或问题,请告诉我。干杯!