How to join on a binary field?
In Scala/Spark, I am trying to do the following:
val portCalls_Ports =
  portCalls.join(ports, portCalls("port_id") === ports("id"), "inner")
But I am getting the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
binary type expression port_id cannot be used in join conditions;
Indeed it is a binary type:
root
|-- id: binary (nullable = false)
|-- port_id: binary (nullable = false)
.
.
.
+--------------------+--------------------+
| id| port_id|
+--------------------+--------------------+
|[FB 89 A0 FF AA 0...|[B2 B2 84 B9 52 2...|
as is ports("id").
I am using the following libraries:
scalaVersion := "2.11.11"
libraryDependencies ++= Seq(
// Spark dependencies
"org.apache.spark" %% "spark-hive" % "1.6.2",
"org.apache.spark" %% "spark-mllib" % "1.6.2",
// Third-party libraries
"postgresql" % "postgresql" % "9.1-901-1.jdbc4",
"net.sf.jopt-simple" % "jopt-simple" % "5.0.3"
)
Note that I am reading the database tables via JDBC.
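For reference, a minimal sketch of how such a read might look in Spark 1.6; the connection URL and table names below are placeholders, not the actual ones, and an existing sqlContext is assumed:

import java.util.Properties

// Placeholder connection details -- not the real database.
val jdbcUrl = "jdbc:postgresql://localhost:5432/placeholder_db"

val props = new Properties()
props.setProperty("driver", "org.postgresql.Driver")

// PostgreSQL bytea columns come back as BinaryType in the DataFrame schema.
val ports     = sqlContext.read.jdbc(jdbcUrl, "ports", props)
val portCalls = sqlContext.read.jdbc(jdbcUrl, "port_calls", props)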
What is the best way to work around this issue?
Prior to Spark 2.1.0, the best workaround I know of is to use the base64 function to convert the binary columns to strings and compare those:
import org.apache.spark.sql.functions._
val portCalls_Ports =
  portCalls.join(ports, base64(portCalls("port_id")) === base64(ports("id")), "inner")
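For completeness, here is a self-contained sketch that reproduces the workaround on small in-memory data; the class and value names are illustrative only, not taken from the question:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

object BinaryJoinDemo {
  // Array[Byte] fields map to BinaryType, mirroring the schema in the question.
  case class Port(id: Array[Byte])
  case class PortCall(port_id: Array[Byte])

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binary-join").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val ports     = sc.parallelize(Seq(Port(Array[Byte](1, 2, 3)))).toDF()
    val portCalls = sc.parallelize(Seq(PortCall(Array[Byte](1, 2, 3)))).toDF()

    // Joining on the raw binary columns throws the AnalysisException pre-2.1.0,
    // so compare the base64-encoded string representations instead.
    val joined = portCalls.join(ports,
      base64(portCalls("port_id")) === base64(ports("id")), "inner")

    joined.show()
    sc.stop()
  }
}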