Jooq雪花复制命令供应商特定功能
Jooq snowflake copy command vendor specific function
有没有人使用sql 模板或其他方法在 JOOQ 中实现雪花的 COPY 命令?我在那里找不到任何东西。我在 JOOQ 的 github 页面上看到一个关于它的问题,但目前似乎没有支持 COPY 命令的计划,因为它是供应商特定的功能。
如果那里什么都没有,也许有一些关于支持它的最佳方式的提示。所有不同的选项都有很多围绕它的语法。
我认为一些中间立场对您在这里实现动态 COPY
语句支持是最佳的。当然,您可以实现一个成熟的、jOOQ 风格的迷你 DSL 来支持 COPY
。 Here's the main idea behind jOOQ's fluent API design。但我认为这对于最常见的用例来说可能有点矫枉过正。
Leveraging plain SQL templating with some overloads might be enough. Looking at the Snowflake manual section about COPY
,这可能是一个最小的解决方案:
public static Query copy(
DSLContext ctx,
Table<?> into,
QueryPart from,
List<String> files
) {
return ctx.query("copy into {0} from {1} files = ({2})",
table,
from,
DSL.list(files.stream().map(DSL::inline).toList())
);
当然,您可以根据需要使它具有通用性和动态性,提供更多重载,或者再次提供一些构建器 API 甚至 DSL。另请查看 CustomQueryPart
and other custom syntax elements,了解何时字符串模板不够用。
我更进一步并实现了 DSL,具体语法为:COPY INTO <table/stage> FROM (SELECT FROM <table/stage>)...
这是我们的用例。
下面是 DSL 的主要代码,略有简化。您会看到对未显示其 impl 的各种自定义元素的引用,它们大多扩展 CustomQueryPart
.
请注意,它在 Scala 中 - 如果您不熟悉,请提前致歉。当我找到时间时,我将编辑并转换为 Java:
trait SnowflakeDSLContext extends DSLContext {
def copyInto[R <: Record, T1, T2](table: Table[R],
f1: Field[T1],
f2: Field[T2]) : CopyIntoFromStep2[R, T1, T2]
}
class SnowflakeDSLContextImpl(config: Configuration)
extends DefaultDSLContext(config) with SnowflakeDSLContext {
def copyInto[R <: Record, T1, T2](table: Table[R],
f1: Field[T1],
f2: Field[T2]): CopyIntoFromStep2[R, T1, T2] =
new CopyIntoImpl[R, T1, T2](this, table, Seq(f1, f2))
trait CopyIntoFromStep2[R <: Record, T1, T2] {
def from(select: Select[_ <: Record2[T1, T2]]): CopyIntoParametersStep
}
trait CopyIntoParametersStep extends CopyIntoFinalStep {
def fileFormat(formatType: FileFormatType,
maybeComprType: Option[CompressionType] = None): CopyIntoParametersStep
def purge(enabled: Boolean): CopyIntoParametersStep
}
trait CopyIntoFinalStep {
def fetchResults(): Seq[CopyIntoResult]
}
// This is the main class which "does the trick"
// by accumulating the query parts and building the final query
case class CopyIntoImpl[R <: Record, T1, T2](
dslContext: DSLContext,
table: Table[R],
fields: Seq[Field[_]],
maybeSelect: Option[Select[_ <: Record2[T1, T2]]] = None,
maybeFileFormat: Option[FileFormat] = None,
copyOptions: Seq[SingleValueParam[_]] = Nil
) extends CopyIntoFromStep2[R, T1, T2] with CopyIntoParametersStep {
override def from(select: Select[_ <: Record2[T1, T2]]): CopyIntoParametersStep = {
copy(maybeSelect = Some(select))
}
override def fileFormat(formatType: FileFormatType,
maybeCompr: Option[CompressionType] = None): CopyIntoParametersStep = {
copy(maybeFileFormat = Some(FileFormat(formatType, maybeCompr)))
}
override def purge(enabled: Boolean): CopyIntoParametersStep =
copy(copyOptions = copyOptions :+ Purge(enabled))
override def fetchResults(): Seq[CopyIntoResult] = {
dslContext.resultQuery("COPY INTO {0} {1} FROM ({2}) {3} {4}",
table,
UnqualifiedFieldList(fields),
maybeSelect.getOrElse(""),
maybeFileFormat.getOrElse(""),
LineDelimitedList(copyOptions))
.fetch(CopyIntoResultMapper)
.asScala
.toSeq
}
}
用法示例:
val results = dslContext.copyInto(targetTable, field1, field2)
.from(DSL.select(field1, field2)
.from(sourceTable))
.fileFormat(JSON, Some(GZIP))
.purge(true)
.fetchResults()
有没有人使用sql 模板或其他方法在 JOOQ 中实现雪花的 COPY 命令?我在那里找不到任何东西。我在 JOOQ 的 github 页面上看到一个关于它的问题,但目前似乎没有支持 COPY 命令的计划,因为它是供应商特定的功能。 如果那里什么都没有,也许有一些关于支持它的最佳方式的提示。所有不同的选项都有很多围绕它的语法。
我认为一些中间立场对您在这里实现动态 COPY
语句支持是最佳的。当然,您可以实现一个成熟的、jOOQ 风格的迷你 DSL 来支持 COPY
。 Here's the main idea behind jOOQ's fluent API design。但我认为这对于最常见的用例来说可能有点矫枉过正。
Leveraging plain SQL templating with some overloads might be enough. Looking at the Snowflake manual section about COPY
,这可能是一个最小的解决方案:
public static Query copy(
DSLContext ctx,
Table<?> into,
QueryPart from,
List<String> files
) {
return ctx.query("copy into {0} from {1} files = ({2})",
table,
from,
DSL.list(files.stream().map(DSL::inline).toList())
);
当然,您可以根据需要使它具有通用性和动态性,提供更多重载,或者再次提供一些构建器 API 甚至 DSL。另请查看 CustomQueryPart
and other custom syntax elements,了解何时字符串模板不够用。
我更进一步并实现了 DSL,具体语法为:COPY INTO <table/stage> FROM (SELECT FROM <table/stage>)...
这是我们的用例。
下面是 DSL 的主要代码,略有简化。您会看到对未显示其 impl 的各种自定义元素的引用,它们大多扩展 CustomQueryPart
.
请注意,它在 Scala 中 - 如果您不熟悉,请提前致歉。当我找到时间时,我将编辑并转换为 Java:
trait SnowflakeDSLContext extends DSLContext {
def copyInto[R <: Record, T1, T2](table: Table[R],
f1: Field[T1],
f2: Field[T2]) : CopyIntoFromStep2[R, T1, T2]
}
class SnowflakeDSLContextImpl(config: Configuration)
extends DefaultDSLContext(config) with SnowflakeDSLContext {
def copyInto[R <: Record, T1, T2](table: Table[R],
f1: Field[T1],
f2: Field[T2]): CopyIntoFromStep2[R, T1, T2] =
new CopyIntoImpl[R, T1, T2](this, table, Seq(f1, f2))
trait CopyIntoFromStep2[R <: Record, T1, T2] {
def from(select: Select[_ <: Record2[T1, T2]]): CopyIntoParametersStep
}
trait CopyIntoParametersStep extends CopyIntoFinalStep {
def fileFormat(formatType: FileFormatType,
maybeComprType: Option[CompressionType] = None): CopyIntoParametersStep
def purge(enabled: Boolean): CopyIntoParametersStep
}
trait CopyIntoFinalStep {
def fetchResults(): Seq[CopyIntoResult]
}
// This is the main class which "does the trick"
// by accumulating the query parts and building the final query
case class CopyIntoImpl[R <: Record, T1, T2](
dslContext: DSLContext,
table: Table[R],
fields: Seq[Field[_]],
maybeSelect: Option[Select[_ <: Record2[T1, T2]]] = None,
maybeFileFormat: Option[FileFormat] = None,
copyOptions: Seq[SingleValueParam[_]] = Nil
) extends CopyIntoFromStep2[R, T1, T2] with CopyIntoParametersStep {
override def from(select: Select[_ <: Record2[T1, T2]]): CopyIntoParametersStep = {
copy(maybeSelect = Some(select))
}
override def fileFormat(formatType: FileFormatType,
maybeCompr: Option[CompressionType] = None): CopyIntoParametersStep = {
copy(maybeFileFormat = Some(FileFormat(formatType, maybeCompr)))
}
override def purge(enabled: Boolean): CopyIntoParametersStep =
copy(copyOptions = copyOptions :+ Purge(enabled))
override def fetchResults(): Seq[CopyIntoResult] = {
dslContext.resultQuery("COPY INTO {0} {1} FROM ({2}) {3} {4}",
table,
UnqualifiedFieldList(fields),
maybeSelect.getOrElse(""),
maybeFileFormat.getOrElse(""),
LineDelimitedList(copyOptions))
.fetch(CopyIntoResultMapper)
.asScala
.toSeq
}
}
用法示例:
val results = dslContext.copyInto(targetTable, field1, field2)
.from(DSL.select(field1, field2)
.from(sourceTable))
.fileFormat(JSON, Some(GZIP))
.purge(true)
.fetchResults()