jOOQ Snowflake COPY command vendor-specific function

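Has anyone implemented Snowflake's COPY command in jOOQ, using SQL templating or some other approach? I couldn't find anything out there. I saw an issue about it on jOOQ's GitHub page, but there currently seem to be no plans to support the COPY command because it is a vendor-specific feature. If nothing exists, perhaps there are some hints on the best way to support it. All the different options come with a lot of syntax.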

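I think some middle ground would be optimal for implementing dynamic COPY statement support here. Of course, you could implement a full-fledged, jOOQ-style mini DSL to support COPY, following the main idea behind jOOQ's fluent API design. But I think that might be overkill for the most common use cases.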

Leveraging plain SQL templating with some overloads might be enough. Looking at the Snowflake manual section about COPY, this could be a minimal solution:

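import java.util.List;

import org.jooq.DSLContext;
import org.jooq.Query;
import org.jooq.QueryPart;
import org.jooq.Table;
import org.jooq.impl.DSL;

// Builds "copy into <table> from <source> files = (...)" via plain SQL templating
public static Query copy(
    DSLContext ctx,
    Table<?> into,
    QueryPart from,
    List<String> files
) {
    return ctx.query("copy into {0} from {1} files = ({2})",
        into,
        from,
        // Inline each file name as a string literal (Stream.toList() needs Java 16+)
        DSL.list(files.stream().map(DSL::inline).toList())
    );
}
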
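Of course, you can make this as generic and dynamic as you need, providing more overloads, or again some builder API or even a DSL. Also have a look at CustomQueryPart and other custom syntax elements for cases where string templating isn't enough.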
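
To make the "builder API" option a bit more concrete, here is a minimal sketch in plain Java. It only assembles the template string; the names `CopyTemplateBuilder` and `CopyFileFormat` are hypothetical and not part of jOOQ, and only two of the many COPY options (`file_format` type and `purge`) are covered. Using an enum and a boolean rather than free-form strings is a deliberate assumption here, so that nothing user-supplied is concatenated into the SQL text.

```java
// Hypothetical helper, not part of jOOQ: a subset of Snowflake file format types.
enum CopyFileFormat { CSV, JSON, PARQUET }

// Hypothetical helper, not part of jOOQ: assembles a plain SQL template
// in which {0} = target table, {1} = source, {2} = list of file names.
final class CopyTemplateBuilder {
    private CopyFileFormat fileFormat; // null means "do not render file_format"
    private Boolean purge;             // null means "do not render purge"

    CopyTemplateBuilder fileFormat(CopyFileFormat type) {
        this.fileFormat = type;
        return this;
    }

    CopyTemplateBuilder purge(boolean enabled) {
        this.purge = enabled;
        return this;
    }

    // Returns the template string; the placeholders are filled in by jOOQ later.
    String toTemplate() {
        StringBuilder sql = new StringBuilder("copy into {0} from {1} files = ({2})");
        if (fileFormat != null) {
            sql.append(" file_format = (type = ").append(fileFormat).append(")");
        }
        if (purge != null) {
            sql.append(" purge = ").append(purge);
        }
        return sql.toString();
    }
}
```

The resulting string would then be passed as the first argument to `ctx.query(...)`, with `into`, `from` and the `DSL.list(...)` of inlined file names as the three query parts, just as in the minimal solution above.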

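I went one step further and implemented a DSL for the specific syntax COPY INTO <table/stage> FROM (SELECT FROM <table/stage>)..., which is our use case.
Below is the main code of the DSL, slightly simplified. You'll see references to various custom elements whose implementations aren't shown; they mostly extend CustomQueryPart.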

Note that it's in Scala; apologies in advance if you're not familiar with it. When I find the time, I'll edit this and convert it to Java:

trait SnowflakeDSLContext extends DSLContext {
    def copyInto[R <: Record, T1, T2](table: Table[R],
                                      f1: Field[T1],
                                      f2: Field[T2]) : CopyIntoFromStep2[R, T1, T2]
}


class SnowflakeDSLContextImpl(config: Configuration)
    extends DefaultDSLContext(config) with SnowflakeDSLContext {

    def copyInto[R <: Record, T1, T2](table: Table[R],
                                      f1: Field[T1],
                                      f2: Field[T2]): CopyIntoFromStep2[R, T1, T2] =
        new CopyIntoImpl[R, T1, T2](this, table, Seq(f1, f2))
}

trait CopyIntoFromStep2[R <: Record, T1, T2] {
    def from(select: Select[_ <: Record2[T1, T2]]): CopyIntoParametersStep
}


trait CopyIntoParametersStep extends CopyIntoFinalStep {
    def fileFormat(formatType: FileFormatType,
                   maybeComprType: Option[CompressionType] = None): CopyIntoParametersStep

    def purge(enabled: Boolean): CopyIntoParametersStep
}


trait CopyIntoFinalStep {
    def fetchResults(): Seq[CopyIntoResult]
}

// This is the main class which "does the trick" 
// by accumulating the query parts and building the final query
case class CopyIntoImpl[R <: Record, T1, T2](
    dslContext: DSLContext,
    table: Table[R],
    fields: Seq[Field[_]],
    maybeSelect: Option[Select[_ <: Record2[T1, T2]]] = None,
    maybeFileFormat: Option[FileFormat] = None,
    copyOptions: Seq[SingleValueParam[_]] = Nil
) extends CopyIntoFromStep2[R, T1, T2] with CopyIntoParametersStep {

    override def from(select: Select[_ <: Record2[T1, T2]]): CopyIntoParametersStep = {
        copy(maybeSelect = Some(select))
    }

    override def fileFormat(formatType: FileFormatType,
                            maybeCompr: Option[CompressionType] = None): CopyIntoParametersStep = {
        copy(maybeFileFormat = Some(FileFormat(formatType, maybeCompr)))
    }

    override def purge(enabled: Boolean): CopyIntoParametersStep =
        copy(copyOptions = copyOptions :+ Purge(enabled))

    override def fetchResults(): Seq[CopyIntoResult] = {
        dslContext.resultQuery("COPY INTO {0} {1} FROM ({2}) {3} {4}",
                               table,
                               UnqualifiedFieldList(fields),
                               maybeSelect.getOrElse(""),
                               maybeFileFormat.getOrElse(""),
                               LineDelimitedList(copyOptions))
                  .fetch(CopyIntoResultMapper)
                  .asScala
                  .toSeq
    }
}

Usage example:

val results = dslContext.copyInto(targetTable, field1, field2)
                        .from(DSL.select(field1, field2)
                                 .from(sourceTable))
                        .fileFormat(JSON, Some(GZIP))
                        .purge(true)
                        .fetchResults()
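
For readers more comfortable with Java, the pattern used above (step interfaces that only expose the calls legal at each point, plus an immutable accumulator that returns a modified copy on every call) can be sketched roughly as follows. This is not a conversion of the Scala code: it is an illustration under simplifying assumptions, with hypothetical names (`CopyIntoSketch`, `CopyFromStep`, and so on), plain strings in place of jOOQ's `Table`, `Field` and `Select` types, and a `toSql()` method in place of `fetchResults()`. A real version would render its parts through jOOQ rather than concatenating strings.

```java
import java.util.ArrayList;
import java.util.List;

// Step types: each one exposes only the calls that are legal at that point.
interface CopyFromStep {
    CopyParametersStep from(String selectSql);
}

interface CopyFinalStep {
    String toSql();
}

interface CopyParametersStep extends CopyFinalStep {
    CopyParametersStep fileFormat(String type);
    CopyParametersStep purge(boolean enabled);
}

// Immutable accumulator: every call returns a modified copy,
// similar in spirit to copy(...) on a Scala case class.
record CopyIntoSketch(String table, List<String> columns, String select,
                      String fileFormat, List<String> options)
        implements CopyFromStep, CopyParametersStep {

    // Entry point; returns the first step so that from(...) must come next.
    static CopyFromStep copyInto(String table, String... columns) {
        return new CopyIntoSketch(table, List.of(columns), null, null, List.of());
    }

    public CopyParametersStep from(String selectSql) {
        return new CopyIntoSketch(table, columns, selectSql, fileFormat, options);
    }

    public CopyParametersStep fileFormat(String type) {
        return new CopyIntoSketch(table, columns, select, type, options);
    }

    public CopyParametersStep purge(boolean enabled) {
        List<String> next = new ArrayList<>(options);
        next.add("PURGE = " + (enabled ? "TRUE" : "FALSE"));
        return new CopyIntoSketch(table, columns, select, fileFormat, List.copyOf(next));
    }

    // Illustration only: joins the accumulated parts into one statement string.
    public String toSql() {
        StringBuilder sql = new StringBuilder("COPY INTO ").append(table)
            .append(" (").append(String.join(", ", columns)).append(")")
            .append(" FROM (").append(select).append(")");
        if (fileFormat != null) {
            sql.append(" FILE_FORMAT = (TYPE = ").append(fileFormat).append(")");
        }
        for (String option : options) {
            sql.append(' ').append(option);
        }
        return sql.toString();
    }
}
```

Because `copyInto` returns a `CopyFromStep`, calling `purge(...)` or `toSql()` before `from(...)` does not compile, which is the same guarantee the Scala traits give.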