Apache Spark Dataset.foreach with Aerospike client
I want to retrieve rows from Apache Hive via Apache Spark and put each row into an Aerospike cache.
Here is a simple case:
var dataset = session.sql("select * from employee");
final var aerospikeClient = aerospike; // to remove binding between the lambda and the service class itself
dataset.foreach(row -> {
    var key = new Key("namespace", "set", randomUUID().toString());
    aerospikeClient.add(
            key,
            new Bin(
                    "json-repr",
                    row.json()
            )
    );
});
I get an error:
Caused by: java.io.NotSerializableException: com.aerospike.client.reactor.AerospikeReactorClient
Obviously, I cannot make AerospikeReactorClient serializable. I tried dataset.collectAsList() instead, and that did work. But as I understand it, this method loads all the content onto a single node, and there may be a huge amount of data, so it is not an option.
What is the best practice for dealing with this kind of problem?
You can write directly from a data frame. There is no need to loop through the dataset.
Launch the spark shell and import the com.aerospike.spark.sql._ package:
$ spark-shell
scala> import com.aerospike.spark.sql._
import com.aerospike.spark.sql._
Example of writing data into Aerospike
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val TEST_COUNT = 100
val simpleSchema: StructType = new StructType(
  Array(
    StructField("one", IntegerType, nullable = false),
    StructField("two", StringType, nullable = false),
    StructField("three", DoubleType, nullable = false)
  ))

val simpleDF = {
  val inputBuf = new ArrayBuffer[Row]()
  for (i <- 1 to TEST_COUNT) {
    val one = i
    val two = "two:" + i
    val three = i.toDouble
    val r = Row(one, two, three)
    inputBuf.append(r)
  }
  val inputRDD = spark.sparkContext.parallelize(inputBuf.toSeq)
  spark.createDataFrame(inputRDD, simpleSchema)
}

// Write the sample data to Aerospike
simpleDF.write
  .format("aerospike") // Aerospike-specific format
  .option("aerospike.writeset", "spark-test") // write to this set
  .option("aerospike.updateByKey", "one") // column used to construct the primary key
  .option("aerospike.write.mode", "update")
  .save()
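One detail the snippet leaves implicit: the connector also needs to know where the cluster is before save() can write anything. A minimal sketch of that configuration, assuming a local cluster, the connector's documented seed-host settings, and a namespace named test:

// Assumed cluster coordinates; adjust to your environment.
spark.conf.set("aerospike.seedhost", "127.0.0.1") // any reachable cluster node
spark.conf.set("aerospike.port", "3000")          // Aerospike service port
spark.conf.set("aerospike.namespace", "test")     // namespace that holds the spark-test set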
I managed to solve this problem by creating the AerospikeClient manually inside the foreach lambda.
var dataset = session.sql("select * from employee");
dataset.foreach(row -> {
    var key = new Key("namespace", "set", randomUUID().toString());
    // newAerospikeClient is a factory method that builds a client
    // on the executor from the captured properties
    newAerospikeClient(aerospikeProperties).add(
            key,
            new Bin(
                    "json-repr",
                    row.json()
            )
    );
});
Now I just have to declare AerospikeProperties as Serializable.
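One refinement worth considering: constructing a client for every row is expensive, because each AerospikeClient opens its own pool of connections to the cluster. Dataset.foreachPartition amortizes that cost to one client per partition. A minimal sketch, assuming the plain synchronous com.aerospike.client.AerospikeClient and placeholder host/port values:

import static java.util.UUID.randomUUID;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;

var dataset = session.sql("select * from employee");
dataset.foreachPartition(rows -> {
    // One client per partition instead of one per row;
    // "aerospike-host" and 3000 are placeholders for your cluster.
    var client = new AerospikeClient("aerospike-host", 3000);
    try {
        while (rows.hasNext()) {
            var row = rows.next();
            client.add(
                    null, // default write policy
                    new Key("namespace", "set", randomUUID().toString()),
                    new Bin("json-repr", row.json()));
        }
    } finally {
        client.close(); // release this partition's cluster connections
    }
});

As with the per-row version, only serializable values (here just the host string and the port) are captured by the lambda, so the client itself never has to be serialized.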