Spark Mongo 连接器:在 Mongo 数据库连接中仅更新一个属性
Spark Mongo connector: Upsert only one attribute in MongoDB connection
假设我有以下 Mongo 文档:
{
"_id":1,
"age": 10,
"foo": 20
}
和以下 Spark DataFrame df
:
_id | val
1 | 'a'
2 | 'b'
现在我想将数据框中的 val
附加到 Mongo 文档中...
使用MongoDB Spark connector,我可以通过“_id”附加使用默认更新逻辑,这意味着如果Spark数据帧中的“_id”和Mongo 文档匹配,Mongo 连接器不会创建新文档,而是更新旧文档。
但是!更新基本上表现得像 replace - 如果我执行以下操作:
df
.write.format("com.mongodb.spark.sql.DefaultSource")
.mode("append")
.option('spark.mongodb.output.uri','mongodb://mongo_server:27017/testdb.test_collection')
.save()
集合将如下所示:
[
{
"_id":1,
"val": 'a'
},
{
"_id":2,
"val':'b'
}
]
我想得到这个:
[
{
"_id":1,
"age": 10,
"foo": 20
"val": 'a'
},
{
"_id":2,
"val':'b'
}
]
我的问题是:
是否有办法(某些选项)使 Spark 连接器按此方式运行
我想要它的行为?
可以,我可以先把Mongo的文档读到Spark,充实一下
它们具有 "val" 属性,并且 write/append 返回 Mongo。这个操作的I/O是什么?是否满载(读取所有文档,然后
替换所有属性)还是有点聪明(比如阅读所有
文档,但仅附加 "val" 属性,而不是
替换整个文档)?
is there way (some option) to make the Spark connector behave the way I want it to behave?
是的,您可以将 replaceDocument
设置为 false
。例如,在 Python 中使用 MongoDB connector for Spark v2.2.2 和 Apache Spark v2.3:
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", "mongodb://host101:27017/dbName.collName").load()
df.first()
> Row(_id=1.0, age=10.0, foo=20.0)
# Create a dataframe
d = {'_id': [1, 2], 'val':['a', 'b']}
inputdf = pd.DataFrame(data=d)
sparkdf = sqlContext.createDataFrame(inputdf)
# Write to Spark
sparkdf.write.format("com.mongodb.spark.sql.DefaultSource")
.mode("append").option("spark.mongodb.output.uri", "mongodb://host101:27017/dbName.collName")
.option("replaceDocument", "false")
.save()
# Result
+---+----+----+---+
|_id| age| foo|val|
+---+----+----+---+
|1.0|10.0|20.0| a|
|2.0|null|null| b|
+---+----+----+---+
假设我有以下 Mongo 文档:
{
"_id":1,
"age": 10,
"foo": 20
}
和以下 Spark DataFrame df
:
_id | val
1 | 'a'
2 | 'b'
现在我想将数据框中的 val
附加到 Mongo 文档中...
使用MongoDB Spark connector,我可以通过“_id”附加使用默认更新逻辑,这意味着如果Spark数据帧中的“_id”和Mongo 文档匹配,Mongo 连接器不会创建新文档,而是更新旧文档。
但是!更新基本上表现得像 replace - 如果我执行以下操作:
df
.write.format("com.mongodb.spark.sql.DefaultSource")
.mode("append")
.option('spark.mongodb.output.uri','mongodb://mongo_server:27017/testdb.test_collection')
.save()
集合将如下所示:
[
{
"_id":1,
"val": 'a'
},
{
"_id":2,
"val':'b'
}
]
我想得到这个:
[
{
"_id":1,
"age": 10,
"foo": 20
"val": 'a'
},
{
"_id":2,
"val':'b'
}
]
我的问题是:
是否有办法(某些选项)使 Spark 连接器按此方式运行 我想要它的行为?
可以,我可以先把Mongo的文档读到Spark,充实一下 它们具有 "val" 属性,并且 write/append 返回 Mongo。这个操作的I/O是什么?是否满载(读取所有文档,然后 替换所有属性)还是有点聪明(比如阅读所有 文档,但仅附加 "val" 属性,而不是 替换整个文档)?
is there way (some option) to make the Spark connector behave the way I want it to behave?
是的,您可以将 replaceDocument
设置为 false
。例如,在 Python 中使用 MongoDB connector for Spark v2.2.2 和 Apache Spark v2.3:
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", "mongodb://host101:27017/dbName.collName").load()
df.first()
> Row(_id=1.0, age=10.0, foo=20.0)
# Create a dataframe
d = {'_id': [1, 2], 'val':['a', 'b']}
inputdf = pd.DataFrame(data=d)
sparkdf = sqlContext.createDataFrame(inputdf)
# Write to Spark
sparkdf.write.format("com.mongodb.spark.sql.DefaultSource")
.mode("append").option("spark.mongodb.output.uri", "mongodb://host101:27017/dbName.collName")
.option("replaceDocument", "false")
.save()
# Result
+---+----+----+---+
|_id| age| foo|val|
+---+----+----+---+
|1.0|10.0|20.0| a|
|2.0|null|null| b|
+---+----+----+---+