Spark-Cassandra,如何基于Query获取数据
Spark-Cassandra , How to get data based on Query
我有一个 Cassandra table,它非常大,现在我有 spark-Cassandra 与以下代码的连接。
import pandas as pd
import numpy as np
from pyspark import *
import os
from pyspark.sql import SQLContext
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.1 --conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell'
conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1").set("spark.cassandra.connection.port", "9042").setAppName("Sentinel").setMaster("spark://Local:7077")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
table_df = sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.options(table='movies', keyspace='movie_lens')\
.load()\
主键是Movie_id,是一个整数。
.load() 将整个 table 加载到内存中,这是我想避免的。我得到的一种方法是使用 filter
table_df = sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.options(table='movies', keyspace='movie_lens')\
.load()\
.filter("movie_id = 37032")
但是过滤器真的会阻止将整个 table 加载到内存中吗?还是先加载再过滤。
另外,我必须查询许多 ID。假设我需要 1000 个 ID,并且 ID 每天都在变化。那怎么办呢?
是的,如果您在分区键上进行查询,Spark Cassandra Connector 将执行所谓的“谓词下推”,并且只会从特定查询加载数据(.load
函数将只加载元数据,实际数据加载将在您真正需要数据来执行操作时第一次发生)。关于何时在 Spark Cassandra 连接器中发生谓词下推有 well documented 条规则。您还可以通过 运行 table_df.explain()
进行检查,并查找 PushedFilters
部分以查找标有星号 *
.
的过滤器
如果您需要查找多个 ID,那么您可以使用 .isin
过滤器,但不推荐使用 Cassandra。最好创建一个带有 ID 的数据框,并在 Cassandra
中执行所谓的 Direct Join with Cassandra dataframe (it's available since SCC 2.5 for dataframes, or earlier for RDDs). I have a lengthy blog post 与数据的连接
我有一个 Cassandra table,它非常大,现在我有 spark-Cassandra 与以下代码的连接。
import pandas as pd
import numpy as np
from pyspark import *
import os
from pyspark.sql import SQLContext
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.1 --conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell'
conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1").set("spark.cassandra.connection.port", "9042").setAppName("Sentinel").setMaster("spark://Local:7077")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
table_df = sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.options(table='movies', keyspace='movie_lens')\
.load()\
主键是Movie_id,是一个整数。 .load() 将整个 table 加载到内存中,这是我想避免的。我得到的一种方法是使用 filter
table_df = sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.options(table='movies', keyspace='movie_lens')\
.load()\
.filter("movie_id = 37032")
但是过滤器真的会阻止将整个 table 加载到内存中吗?还是先加载再过滤。 另外,我必须查询许多 ID。假设我需要 1000 个 ID,并且 ID 每天都在变化。那怎么办呢?
是的,如果您在分区键上进行查询,Spark Cassandra Connector 将执行所谓的“谓词下推”,并且只会从特定查询加载数据(.load
函数将只加载元数据,实际数据加载将在您真正需要数据来执行操作时第一次发生)。关于何时在 Spark Cassandra 连接器中发生谓词下推有 well documented 条规则。您还可以通过 运行 table_df.explain()
进行检查,并查找 PushedFilters
部分以查找标有星号 *
.
如果您需要查找多个 ID,那么您可以使用 .isin
过滤器,但不推荐使用 Cassandra。最好创建一个带有 ID 的数据框,并在 Cassandra