Spark Mongo 连接器:在 Mongo 数据库连接中仅更新一个属性

Spark Mongo connector: Upsert only one attribute in MongoDB connection

假设我有以下 Mongo 文档:

{
 "_id":1, 
 "age": 10,
 "foo": 20
}

和以下 Spark DataFrame df

_id | val
 1  | 'a'
 2  | 'b'

现在我想将数据框中的 val 附加到 Mongo 文档中...

使用MongoDB Spark connector,我可以通过“_id”附加使用默认更新逻辑,这意味着如果Spark数据帧中的“_id”和Mongo 文档匹配,Mongo 连接器不会创建新文档,而是更新旧文档。

但是!更新基本上表现得像 replace - 如果我执行以下操作:

df
.write.format("com.mongodb.spark.sql.DefaultSource")
.mode("append")
.option('spark.mongodb.output.uri','mongodb://mongo_server:27017/testdb.test_collection')
.save()

集合将如下所示:

[   
    {
     "_id":1, 
     "val": 'a'
    },
   {
     "_id":2, 
     "val':'b' 
    }
]

我想得到这个:

[   
    {
     "_id":1, 
     "age": 10,
     "foo": 20
     "val": 'a'
    },
   {
     "_id":2, 
     "val':'b' 
    }
]

我的问题是

is there way (some option) to make the Spark connector behave the way I want it to behave?

是的,您可以将 replaceDocument 设置为 false。例如,在 Python 中使用 MongoDB connector for Spark v2.2.2 和 Apache Spark v2.3:

df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
               .option("spark.mongodb.input.uri", "mongodb://host101:27017/dbName.collName").load()
df.first() 
> Row(_id=1.0, age=10.0, foo=20.0)

# Create a dataframe 
d = {'_id': [1, 2], 'val':['a', 'b']}
inputdf = pd.DataFrame(data=d) 
sparkdf = sqlContext.createDataFrame(inputdf)

# Write to Spark 
sparkdf.write.format("com.mongodb.spark.sql.DefaultSource")
             .mode("append").option("spark.mongodb.output.uri", "mongodb://host101:27017/dbName.collName")
             .option("replaceDocument", "false")
             .save()

# Result 
+---+----+----+---+
|_id| age| foo|val|
+---+----+----+---+
|1.0|10.0|20.0|  a|
|2.0|null|null|  b|
+---+----+----+---+