Spark SQL 的 SparklyR 包装器:sqlContext.sql
SparklyR wrapper for spark SQL: sqlContext.sql
我正在尝试为 SparklyR 编写 SQL 函数的包装器。我创建了以下函数:
sqlfunction <- function(sc, block) {
spark_context(sc) %>%
invoke("sqlContext.sql", block) }
然后我使用以下方式调用它:
newsqlData <- sqlfunction(sc, "select
substr(V1,1,2),
substr(V1,3,3),
substr(V1,6,6),
substr(V1,12,4),
substr(V1,16,4)
FROM TABLE1 WHERE V1 IS NOT NULL")
但我收到以下错误:
Error: java.lang.IllegalArgumentException: invalid method sqlContext.sql for object 12
at sparklyr.Invoke$.invoke(invoke.scala:113)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
at sparklyr.StreamHandler$.read(stream.scala:55)
at sparklyr.BackendHandler.channelRead0(handler.scala:49)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
如有任何建议或修正,我们将不胜感激。
应该是:
sqlfunction <- function(sc, block) {
spark_session(sc) %>% invoke("sql", block)
}
其中 sc
是 spark_connection
(来自 spark_connect(master = master_url)
的输出)。
这个:
spark_session(sc)
- 从连接对象中检索 SparkSession
。
invoke("sql", block)
- 以 block
作为参数调用 SparkSession
实例的 sql
方法。
使用示例:
library(sparklyr)
sc <- spark_connect(master = "local[*]")
sqlfunction(sc, "SELECT SPLIT('foo,bar', ',')")
<jobj[11]>
class org.apache.spark.sql.Dataset
[split(foo,bar, ,): array<string>]
这将为您提供对 Java 对象的引用。如果你愿意,你可以注册为临时 table:
... %>% invoke("createOrReplaceTempView", "some_name_for_the_view")
并使用 tbl
访问:
library(dplyr)
tbl(sc, "some_name_for_the_view")
或
... %>% sdf_register()
直接获取tbl_spark
对象。
您使用的代码:
spark_context
- 提取 SparkContext
个实例。
invoke("sqlContext.sql", block)
- 尝试调用不存在的方法 (sqlContext.sql
)。
在最新版本中,您可以将 invoke("createOrReplaceTempView", ...)
替换为简单的 sdf_register
。
我正在尝试为 SparklyR 编写 SQL 函数的包装器。我创建了以下函数:
sqlfunction <- function(sc, block) {
spark_context(sc) %>%
invoke("sqlContext.sql", block) }
然后我使用以下方式调用它:
newsqlData <- sqlfunction(sc, "select
substr(V1,1,2),
substr(V1,3,3),
substr(V1,6,6),
substr(V1,12,4),
substr(V1,16,4)
FROM TABLE1 WHERE V1 IS NOT NULL")
但我收到以下错误:
Error: java.lang.IllegalArgumentException: invalid method sqlContext.sql for object 12
at sparklyr.Invoke$.invoke(invoke.scala:113)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
at sparklyr.StreamHandler$.read(stream.scala:55)
at sparklyr.BackendHandler.channelRead0(handler.scala:49)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
如有任何建议或修正,我们将不胜感激。
应该是:
sqlfunction <- function(sc, block) {
spark_session(sc) %>% invoke("sql", block)
}
其中 sc
是 spark_connection
(来自 spark_connect(master = master_url)
的输出)。
这个:
spark_session(sc)
- 从连接对象中检索SparkSession
。invoke("sql", block)
- 以block
作为参数调用SparkSession
实例的sql
方法。
使用示例:
library(sparklyr)
sc <- spark_connect(master = "local[*]")
sqlfunction(sc, "SELECT SPLIT('foo,bar', ',')")
<jobj[11]>
class org.apache.spark.sql.Dataset
[split(foo,bar, ,): array<string>]
这将为您提供对 Java 对象的引用。如果你愿意,你可以注册为临时 table:
... %>% invoke("createOrReplaceTempView", "some_name_for_the_view")
并使用 tbl
访问:
library(dplyr)
tbl(sc, "some_name_for_the_view")
或
... %>% sdf_register()
直接获取tbl_spark
对象。
您使用的代码:
spark_context
- 提取SparkContext
个实例。invoke("sqlContext.sql", block)
- 尝试调用不存在的方法 (sqlContext.sql
)。
在最新版本中,您可以将 invoke("createOrReplaceTempView", ...)
替换为简单的 sdf_register
。