Create new SparkSession for different query?
I want to fetch two sets of data from Elasticsearch:
one filtered by a query, the other without any filter.
# with query
session = get_spark_session(query=query)
df = session.read.option(
    "es.resource", "analytics-prod-2019.08.02"
).format("org.elasticsearch.spark.sql").load()
df.show()  # empty result

# without query
session = get_spark_session()
df = session.read.option(
    "es.resource", "analytics-prod-2019.08.02"
).format("org.elasticsearch.spark.sql").load()
df.show()  # empty result
import pyspark
from pyspark.sql import SparkSession

def get_spark_session(query=None, excludes=[]):
    conf = pyspark.SparkConf()
    conf.set("spark.driver.allowMultipleContexts", "true")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes.discovery", "true")
    conf.set("es.scroll.size", 10000)
    # the connector expects a comma-separated string, not a Python list
    conf.set("es.read.field.exclude", ",".join(excludes))
    conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
    if query:
        conf.set("es.query", query)
    sc = SparkSession.builder.config(conf=conf).getOrCreate()
    return sc
The question is whether the session is being reused. When I run the filtered query first and the non-filtered one second, both return empty results. But when I run the non-filtered query first, it shows some results, while the subsequent filtered query returns an empty result.
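Whether the same session is in fact coming back can be checked directly; a minimal sketch (the match_all query string here is just a placeholder):

session1 = get_spark_session(query='{"query": {"match_all": {}}}')
session2 = get_spark_session()
# getOrCreate() returns the already-running session on the second call,
# so both names point at the same object:
print(session1 is session2)  # True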
# below, I reverse the order

# without query
session = get_spark_session()
df = session.read.option(
    "es.resource", "analytics-prod-2019.08.02"
).format("org.elasticsearch.spark.sql").load()
df.show()  # some result

# with query
session = get_spark_session(query=query)
df = session.read.option(
    "es.resource", "analytics-prod-2019.08.02"
).format("org.elasticsearch.spark.sql").load()
df.show()  # empty result
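One way to narrow this down is to inspect what "es.query" is actually set to on the session each call hands back (a small diagnostic sketch; "<not set>" is just a fallback default):

session = get_spark_session(query=query)
# the runtime config reflects what the reused session will actually send:
print(session.conf.get("es.query", "<not set>"))

session = get_spark_session()
print(session.conf.get("es.query", "<not set>"))  # a stale query may still be here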
** EDIT

So I can get the desired result with the following:
def get_spark_session(query=None, excludes=[]):
    conf = pyspark.SparkConf()
    conf.set("spark.driver.allowMultipleContexts", "true")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes.discovery", "true")
    conf.set("es.scroll.size", 10000)
    conf.set("es.read.field.exclude", ",".join(excludes))
    conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
    if query:
        conf.set("es.query", query)
    else:
        conf.set("es.query", "")  # "unset" the query by overwriting it with an empty value
    sc = SparkSession.builder.config(conf=conf).getOrCreate()
    return sc
SparkSession.builder gets an existing SparkSession or, if there is none, creates a new one based on the options set in the builder. In your case the Spark config is being reused. Removing "es.query" from the config should fix it:
def get_spark_session(query=None, excludes=[]):
    conf = pyspark.SparkConf()
    conf.unset("es.query")
    conf.set("spark.driver.allowMultipleContexts", "true")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes.discovery", "true")
    conf.set("es.scroll.size", 10000)
    conf.set("es.read.field.exclude", excludes)
    conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
    if query:
        conf.set("es.query", query)
    sc = SparkSession.builder.config(conf=conf).getOrCreate()
    return sc
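As an aside, elasticsearch-hadoop also accepts its es.* settings as per-read options on the DataFrameReader, so the query never has to live in the session config at all; a sketch under that assumption:

# one shared session; the query travels with the individual read:
session = get_spark_session()

df_all = (session.read.format("org.elasticsearch.spark.sql")
          .option("es.resource", "analytics-prod-2019.08.02")
          .load())

df_filtered = (session.read.format("org.elasticsearch.spark.sql")
               .option("es.resource", "analytics-prod-2019.08.02")
               .option("es.query", query)
               .load())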