使用 pyspark 将数据从 pyspark 数据帧插入到另一个 cassandra table
Insert data from pyspark dataframe to another cassandra table using pyspark
我有一个 cassandra table - test:
+----+---------+---------+
| id | country | counter |
+====+=========+=========+
| A | RU | 1 |
+----+---------+---------+
| B | EN | 2 |
+----+---------+---------+
| C | IQ | 1 |
+----+---------+---------+
| D | RU | 3 |
+----+---------+---------+
我在同一个 space 中还有一个 table main,其中列 "country_main" 和 "main_id"。
在 main_id 列中,我的 ID 与测试 table 中的 ID 相同,而且我还有一些唯一的 ID。 country_main 具有空值,与测试中的相同。例如:
+---------+--------------+---------+
| main_id | country_main | ...|
+=========+==============+=========+
| A | | ...|
+---------+--------------+---------+
| B | EN | ...|
+---------+--------------+---------+
| Y | IQ | ...|
+---------+--------------+---------+
| Z | RU | ...|
+---------+--------------+---------+
如何使用pyspark将测试table中的数据插入到main中,根据ids填充country_main中的空值?
具有以下架构和数据:
create table test.ct1 (
id text primary key,
country text,
cnt int);
insert into test.ct1(id, country, cnt) values('A', 'RU', 1);
insert into test.ct1(id, country, cnt) values('B', 'EN', 2);
insert into test.ct1(id, country, cnt) values('C', 'IQ', 1);
insert into test.ct1(id, country, cnt) values('D', 'RU', 3);
create table test.ct2 (
main_id text primary key,
country_main text,
cnt int);
insert into test.ct2(main_id, cnt) values('A', 1);
insert into test.ct2(main_id, country_main, cnt) values('B', 'EN', 2);
insert into test.ct2(main_id, country_main, cnt) values('C', 'IQ', 1);
insert into test.ct2(main_id, country_main, cnt) values('D', 'RU', 3);
应该是这样的:
from pyspark.sql.functions import *
ct1 = spark.read.format("org.apache.spark.sql.cassandra")\
.option("table", "ct1").option("keyspace", "test").load()
ct2 = spark.read.format("org.apache.spark.sql.cassandra")\
.option("table", "ct2").option("keyspace", "test").load()\
.where(col("country_main").isNull())
res = ct1.join(ct2, ct1.id == ct2.main_id).select(col("main_id"),
col("country").alias("country_main"))
res.write.format("org.apache.spark.sql.cassandra")\
.option("table", "ct2").option("keyspace", "test")\
.mode("append").save()
代码的作用:
- selects 来自
ct2
的所有行(对应于您的 main
table),其中 country_main
是 null
;
- 执行与
ct1
的连接(对应于您的 test
table)以从中获取国家/地区的价值(优化可能是 select 两者中唯一必要的列tables)。另外,请注意,联接是由 Spark 完成的,而不是在 Cassandra 级别上完成的 - Cassandra 级别的联接将仅在即将推出的 Spark Cassandra Connector 版本(3.0,但已发布 alpha 版本)中受支持;
- 重命名列以匹配
ct2
table; 的结构
- 写回数据。
结果:
cqlsh> select * from test.ct2;
main_id | cnt | country_main
---------+-----+--------------
C | 1 | IQ
B | 2 | EN
A | 1 | RU
D | 3 | RU
对于源数据:
cqlsh> select * from test.ct2;
main_id | cnt | country_main
---------+-----+--------------
C | 1 | IQ
B | 2 | EN
A | 1 | null
D | 3 | RU
我有一个 cassandra table - test:
+----+---------+---------+
| id | country | counter |
+====+=========+=========+
| A | RU | 1 |
+----+---------+---------+
| B | EN | 2 |
+----+---------+---------+
| C | IQ | 1 |
+----+---------+---------+
| D | RU | 3 |
+----+---------+---------+
我在同一个 space 中还有一个 table main,其中列 "country_main" 和 "main_id"。 在 main_id 列中,我的 ID 与测试 table 中的 ID 相同,而且我还有一些唯一的 ID。 country_main 具有空值,与测试中的相同。例如:
+---------+--------------+---------+
| main_id | country_main | ...|
+=========+==============+=========+
| A | | ...|
+---------+--------------+---------+
| B | EN | ...|
+---------+--------------+---------+
| Y | IQ | ...|
+---------+--------------+---------+
| Z | RU | ...|
+---------+--------------+---------+
如何使用pyspark将测试table中的数据插入到main中,根据ids填充country_main中的空值?
具有以下架构和数据:
create table test.ct1 (
id text primary key,
country text,
cnt int);
insert into test.ct1(id, country, cnt) values('A', 'RU', 1);
insert into test.ct1(id, country, cnt) values('B', 'EN', 2);
insert into test.ct1(id, country, cnt) values('C', 'IQ', 1);
insert into test.ct1(id, country, cnt) values('D', 'RU', 3);
create table test.ct2 (
main_id text primary key,
country_main text,
cnt int);
insert into test.ct2(main_id, cnt) values('A', 1);
insert into test.ct2(main_id, country_main, cnt) values('B', 'EN', 2);
insert into test.ct2(main_id, country_main, cnt) values('C', 'IQ', 1);
insert into test.ct2(main_id, country_main, cnt) values('D', 'RU', 3);
应该是这样的:
from pyspark.sql.functions import *
ct1 = spark.read.format("org.apache.spark.sql.cassandra")\
.option("table", "ct1").option("keyspace", "test").load()
ct2 = spark.read.format("org.apache.spark.sql.cassandra")\
.option("table", "ct2").option("keyspace", "test").load()\
.where(col("country_main").isNull())
res = ct1.join(ct2, ct1.id == ct2.main_id).select(col("main_id"),
col("country").alias("country_main"))
res.write.format("org.apache.spark.sql.cassandra")\
.option("table", "ct2").option("keyspace", "test")\
.mode("append").save()
代码的作用:
- selects 来自
ct2
的所有行(对应于您的main
table),其中country_main
是null
; - 执行与
ct1
的连接(对应于您的test
table)以从中获取国家/地区的价值(优化可能是 select 两者中唯一必要的列tables)。另外,请注意,联接是由 Spark 完成的,而不是在 Cassandra 级别上完成的 - Cassandra 级别的联接将仅在即将推出的 Spark Cassandra Connector 版本(3.0,但已发布 alpha 版本)中受支持; - 重命名列以匹配
ct2
table; 的结构
- 写回数据。
结果:
cqlsh> select * from test.ct2;
main_id | cnt | country_main
---------+-----+--------------
C | 1 | IQ
B | 2 | EN
A | 1 | RU
D | 3 | RU
对于源数据:
cqlsh> select * from test.ct2;
main_id | cnt | country_main
---------+-----+--------------
C | 1 | IQ
B | 2 | EN
A | 1 | null
D | 3 | RU