MemSQL Spark connector inserting nulls from Spark to MemSQL
I have a program that reads Parquet files and writes them into a MemSQL table. I can confirm that Spark is reading the files correctly:
df.printSchema()
df.show(5)
print the schema and the data as expected.
When I query the table, though, I get NULL values for every row; everything in the table is NULL. I am not sure what is going wrong here.
Code that writes the Parquet files to MemSQL:
package com.rb.scala

import java.sql.DriverManager

import org.apache.log4j.Logger
import org.apache.spark._
import org.apache.spark.sql._

import com.memsql.spark.connector._

object MemSQLWriter {
  def main(arg: Array[String]) {
    val logger = Logger.getLogger(this.getClass())
    if (arg.length < 1) {
      logger.error("=> wrong number of parameters")
      System.err.println("Usage: MainExample <directory containing the source files to be loaded into the database>")
      System.exit(1)
    }
    val jobName = "MemSQLWriter"
    val conf = new SparkConf().setAppName(jobName)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val pathToFiles = arg(0)
    logger.info("=> jobName \"" + jobName + "\"")
    logger.info("=> pathToFiles \"" + pathToFiles + "\"")
    val dbHost = "xx.xx.xx.xx"
    val dbPort = 3306
    val dbName = "memsqlrdd_db"
    val user = "root"
    val password = ""
    val tableName = "target_table"
    val dbAddress = "jdbc:mysql://" + dbHost + ":" + dbPort
    // Read from the directory passed on the command line.
    val df = sqlContext.read.parquet(pathToFiles)
    val conn = DriverManager.getConnection(dbAddress, user, password)
    val stmt = conn.createStatement
    stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName)
    stmt.execute("USE " + dbName)
    stmt.execute("DROP TABLE IF EXISTS " + tableName)
    df.printSchema()
    df.show(5)
    val columnArr = df.columns
    var createQuery = " CREATE TABLE " + tableName + " ("
    logger.info("=> no of columns : " + columnArr.length)
    for (column <- columnArr) {
      createQuery += column + " VARCHAR(100),"
    }
    createQuery += " SHARD KEY (" + columnArr(0) + "))"
    logger.info("=> create table query " + createQuery)
    stmt.execute(createQuery)
    df.select().saveToMemSQL(dbName, tableName, dbHost, dbPort, user, password,
      upsertBatchSize = 1000, useKeylessShardedOptimization = true)
    stmt.close()
  }
}
You are creating the table with a SHARD key and then setting useKeylessShardedOptimization = true, which results in undefined behavior. Set it to false and you should be good to go.
Also, I am not sure what df.select().saveToMemSQL does: df.select() with no arguments returns a DataFrame with zero columns, which would explain a table full of NULLs. Try df.saveToMemSQL instead, as in the sketch below.
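A minimal sketch of the corrected call, reusing the connection values already defined in your code (host, credentials, and table name are the placeholders from the question):

    // Write the whole DataFrame rather than df.select(), and disable the
    // keyless sharded optimization because the table has an explicit SHARD key.
    df.saveToMemSQL(dbName, tableName, dbHost, dbPort, user, password,
      upsertBatchSize = 1000,
      useKeylessShardedOptimization = false)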
To verify, run something like SELECT * FROM table WHERE col IS NOT NULL LIMIT 10 to check whether you really have nothing but NULLs.
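If you would rather check from within the job, here is a sketch that reuses the JDBC Statement your code already opens; col1 is a hypothetical column name, substitute one of yours:

    // COUNT(col1) skips NULLs, so comparing it with COUNT(*) shows how many
    // rows have NULL in that column ("col1" is a placeholder column name).
    val rs = stmt.executeQuery("SELECT COUNT(*), COUNT(col1) FROM " + tableName)
    while (rs.next()) {
      println("total rows = " + rs.getLong(1) + ", non-NULL col1 = " + rs.getLong(2))
    }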
PS: there is also df.createMemSQLTableAs, which does what you want in one step.
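For completeness, a sketch of that one-step variant. I am assuming its connection parameters mirror saveToMemSQL, so verify the exact parameter list against your connector version:

    // Assumption: createMemSQLTableAs takes the same connection parameters as
    // saveToMemSQL in this connector version; it creates the table from the
    // DataFrame schema and loads the data in one step.
    df.createMemSQLTableAs(dbName, tableName, dbHost, dbPort, user, password)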