MemSQL Spark connector inserting nulls from Spark to MemSQL

I have a program that reads a parquet file and writes it to a MemSQL table. I can confirm that Spark reads the file correctly:

    df.printSchema()
    df.show(5)

Both print the schema and the data correctly.

But when I query the table, I get NULL in every row: everything in the table is NULL. I am not sure what is going wrong here.

Here is the code that writes the parquet file to MemSQL:

    package com.rb.scala

    import com.memsql.spark.context.MemSQLContext
    import java.sql.{ DriverManager, ResultSet, Connection, Timestamp }

    import org.apache.spark._
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.catalyst.expressions.RowOrdering

    import com.memsql.spark.connector._
    import com.memsql.spark.connector.OnDupKeyBehavior._
    import com.memsql.spark.connector.dataframe._
    import com.memsql.spark.connector.rdd._

    import scala.util.control.NonFatal
    import org.apache.log4j.Logger

    object MemSQLWriter {

      def main(arg: Array[String]) {

        val logger = Logger.getLogger(this.getClass())

        if (arg.length < 1) {
          logger.error("=> wrong parameters number")
          System.err.println("Usage: MainExample <directory containing the source files to be loaded to database>")
          System.exit(1)
        }

        val jobName = "MemSQLWriter"
        val conf = new SparkConf().setAppName(jobName)
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val pathToFiles = arg(0)
        logger.info("=> jobName \"" + jobName + "\"")
        logger.info("=> pathToFiles \"" + pathToFiles + "\"")
        val dbHost = "xx.xx.xx.xx"
        val dbPort = 3306
        val dbName = "memsqlrdd_db"
        val user = "root"
        val password = ""
        val tableName = "target_table"
        val dbAddress = "jdbc:mysql://" + dbHost + ":" + dbPort
        val df = sqlContext.read.parquet("/projects/example/data/")
        val conn = DriverManager.getConnection(dbAddress, user, password)
        val stmt = conn.createStatement
        stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName)
        stmt.execute("USE " + dbName)
        stmt.execute("DROP TABLE IF EXISTS " + tableName)
        df.printSchema()
        df.show(5)
        val columnArr = df.columns
        var createQuery: String = "CREATE TABLE " + tableName + " ("
        logger.info("=> no of columns : " + columnArr.length)
        for (column <- columnArr) {
          createQuery += column
          createQuery += " VARCHAR(100),"
        }
        createQuery += " SHARD KEY (" + columnArr(0) + "))"
        logger.info("=> create table query " + createQuery)
        stmt.execute(createQuery)

        df.select().saveToMemSQL(dbName, tableName, dbHost, dbPort, user, password, upsertBatchSize = 1000, useKeylessShardedOptimization = true)
        stmt.close()
      }
    }

You are creating the table with a SHARD key and then setting useKeylessShardedOptimization = true; that combination gives undefined behavior. Set it to false and you should be good (see the corrected call below).

Also, I'm not sure what df.select().saveToMemSQL... is doing: df.select() with no arguments produces a DataFrame with zero columns, so there is nothing useful to write. Try df.saveToMemSQL ... instead.
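A minimal sketch of the corrected write call, reusing the variables already defined in your code:

    // Pass the whole DataFrame (no empty select()), and disable the keyless
    // sharding optimization because the table was created with a SHARD KEY.
    df.saveToMemSQL(dbName, tableName, dbHost, dbPort, user, password,
      upsertBatchSize = 1000,
      useKeylessShardedOptimization = false)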

To verify, run something like SELECT * FROM table WHERE col IS NOT NULL LIMIT 10 to check whether you really have nothing but NULLs.
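For instance, over the same JDBC connection your code already opens (run it before stmt.close(); tableName and columnArr are the variables from your code):

    // Sanity check: if this loop prints nothing, the first column really is
    // all NULL. Swap in any other column name to check the rest.
    val check = stmt.executeQuery(
      "SELECT * FROM " + tableName + " WHERE " + columnArr(0) + " IS NOT NULL LIMIT 10")
    while (check.next()) {
      println(check.getString(1))
    }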

PS: there is also df.createMemSQLTableAs, which does what you want here.
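Roughly like this; this is a sketch from memory and the exact parameter list may differ across connector versions, so check the docs for yours:

    // Hypothetical sketch: createMemSQLTableAs derives the MemSQL schema from
    // the DataFrame and writes the data in one step, replacing the hand-built
    // CREATE TABLE statement and the separate save call. Parameters assumed.
    df.createMemSQLTableAs(dbName, tableName, dbHost, dbPort, user, password)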