从 Spark 数据帧的行创建对象的规范方法是什么?
What is the canonical way to create objects from rows of a Spark dataframe?
我正在使用 Apache Zeppelin (0.9.0) 和 Scala (2.11.12)。我想从数据框中提取一些数据并将其存储到 InfluxDB,稍后在 Grafana 中可视化,但无法弄清楚。我正在尝试使用 foreach
循环的天真方法。这个想法是遍历所有行,提取我需要的列,创建一个 Point 对象(来自 this InfluxDB client library),然后将其发送到 InfluxDB 或将其添加到列表中,然后批量发送所有点,之后循环。
数据框如下所示:
+---------+---------+-------------+-----+
|timestamp|sessionId|eventDuration|speed|
+---------+---------+-------------+-----+
| 1| ses1| 0.0| 50|
| 2| ses1| 1.0| 50|
| 3| ses1| 2.0| 50|
我已尝试执行上述操作:
import scala.collection.mutable.ListBuffer
import spark.implicits._
import org.apache.spark.sql._
import com.paulgoldbaum.influxdbclient._
import scala.concurrent.ExecutionContext.Implicits.global
val influxdb = InfluxDB.connect("172.17.0.4", 8086)
val database = influxdb.selectDatabase("test")
var influxData = new ListBuffer[Point]()
dfAnalyseReport.foreach(row =>
{
val point = Point("acceleration")
.addTag("speedBin", row.getLong(3).toString)
.addField("eventDuration", row.getDouble(2))
influxData += point
}
)
val influxDataList = influxData.toList
database.bulkWrite(influxDataList)
我在这里得到的唯一东西是一个神秘的 java.lang.ClassCastException
,没有任何附加信息,既不在笔记本输出中,也不在 Zeppelin Docker 容器的日志中。错误似乎在 foreach 的某处,即使我注释掉最后两行时它也会出现。
我还尝试采用 中的方法 1,对列使用大小写 class,但无济于事。我没有错误地得到它 运行,但结果列表是空的。不幸的是我删除了那个尝试。如有必要,我可以重建它,但我在这上面花了很多时间,我相当肯定我对应该如何完成有一些根本性的误解。
还有一个问题:我还尝试将每个 Point 在构造时写入 DB(而不是批量)。唯一的区别是,我没有附加到 ListBuffer
,而是执行了 database.write(point)
操作。当在循环外使用虚拟点完成时,它会毫无问题地通过 - 数据最终在 InfluxDB 中 - 但在循环内它会导致 org.apache.spark.SparkException: Task not serializable
有人能指出我正确的方法吗?我应该如何解决这个问题?
我会用 RDD 映射方法来完成并将结果收集到列表中:
val influxDataList = dfAnalyseReport.rdd.map(
row => Point("acceleration")
.addTag("speedBin", row.getInt(3).toString)
.addField("eventDuration", row.getDouble(2))
).collect.toList
我正在使用 Apache Zeppelin (0.9.0) 和 Scala (2.11.12)。我想从数据框中提取一些数据并将其存储到 InfluxDB,稍后在 Grafana 中可视化,但无法弄清楚。我正在尝试使用 foreach
循环的天真方法。这个想法是遍历所有行,提取我需要的列,创建一个 Point 对象(来自 this InfluxDB client library),然后将其发送到 InfluxDB 或将其添加到列表中,然后批量发送所有点,之后循环。
数据框如下所示:
+---------+---------+-------------+-----+
|timestamp|sessionId|eventDuration|speed|
+---------+---------+-------------+-----+
| 1| ses1| 0.0| 50|
| 2| ses1| 1.0| 50|
| 3| ses1| 2.0| 50|
我已尝试执行上述操作:
import scala.collection.mutable.ListBuffer
import spark.implicits._
import org.apache.spark.sql._
import com.paulgoldbaum.influxdbclient._
import scala.concurrent.ExecutionContext.Implicits.global
val influxdb = InfluxDB.connect("172.17.0.4", 8086)
val database = influxdb.selectDatabase("test")
var influxData = new ListBuffer[Point]()
dfAnalyseReport.foreach(row =>
{
val point = Point("acceleration")
.addTag("speedBin", row.getLong(3).toString)
.addField("eventDuration", row.getDouble(2))
influxData += point
}
)
val influxDataList = influxData.toList
database.bulkWrite(influxDataList)
我在这里得到的唯一东西是一个神秘的 java.lang.ClassCastException
,没有任何附加信息,既不在笔记本输出中,也不在 Zeppelin Docker 容器的日志中。错误似乎在 foreach 的某处,即使我注释掉最后两行时它也会出现。
我还尝试采用
还有一个问题:我还尝试将每个 Point 在构造时写入 DB(而不是批量)。唯一的区别是,我没有附加到 ListBuffer
,而是执行了 database.write(point)
操作。当在循环外使用虚拟点完成时,它会毫无问题地通过 - 数据最终在 InfluxDB 中 - 但在循环内它会导致 org.apache.spark.SparkException: Task not serializable
有人能指出我正确的方法吗?我应该如何解决这个问题?
我会用 RDD 映射方法来完成并将结果收集到列表中:
val influxDataList = dfAnalyseReport.rdd.map(
row => Point("acceleration")
.addTag("speedBin", row.getInt(3).toString)
.addField("eventDuration", row.getDouble(2))
).collect.toList