Querying a Spark Streaming application from spark-shell (pyspark)
I followed this example in the pyspark console and everything works fine.
Afterwards I wrote it as a PySpark application as follows:
# -*- coding: utf-8 -*-
import sys
import click
import logging

from pyspark.sql import SparkSession
from pyspark.sql.types import *


@click.command()
@click.option('--master')
def most_idiotic_bi_query(master):
    spark = SparkSession \
        .builder \
        .master(master)\
        .appName("stream-test")\
        .getOrCreate()

    spark.sparkContext.setLogLevel('ERROR')

    some_schema = ....  # Schema removed

    some_stream = spark\
        .readStream\
        .option("sep", ",")\
        .schema(some_schema)\
        .option("maxFilesPerTrigger", 1)\
        .csv("/data/some_stream", header=True)

    streaming_counts = (
        some_stream.groupBy(some_stream.field_1).count()
    )

    query = streaming_counts.writeStream\
        .format("memory")\
        .queryName("counts")\
        .outputMode("complete")\
        .start()

    query.awaitTermination()


if __name__ == "__main__":
    logging.getLogger("py4j").setLevel(logging.ERROR)
    most_idiotic_bi_query()
The application is run with:
spark-submit test_stream.py --master spark://master:7077
Now, if I open a new Spark driver in another terminal:
pyspark --master spark://master:7077
and try to run:
spark.sql("select * from counts")
it fails:
During handling of the above exception, another exception occurred:
AnalysisExceptionTraceback (most recent call last)
<ipython-input-3-732b22f02ef6> in <module>()
----> 1 spark.sql("select * from id_counts").show()
/usr/spark-2.0.2/python/pyspark/sql/session.py in sql(self, sqlQuery)
541 [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
542 """
--> 543 return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
544
545 @since(2.0)
/usr/local/lib/python3.4/dist-packages/py4j-0.10.4-py3.4.egg/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/spark-2.0.2/python/pyspark/sql/utils.py in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: 'Table or view not found: counts; line 1 pos 14'
I don't understand what is going on.
This is the expected behavior. If you check the documentation for the memory sink:
The output is stored in memory as an in-memory table. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver’s memory. Hence, use it with caution.
As you can see, the memory sink doesn't create a persistent table or a global temporary view, but a local structure restricted to the driver. Hence it cannot be queried from another Spark application.
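For example, in the session that started the query (say an interactive pyspark shell, before blocking on awaitTermination()) the table is visible, while a second application is not; a minimal sketch, reusing the names from the question:

# Same driver that owns the memory sink: the table is registered locally.
spark.sql("SELECT * FROM counts").show()

# A second application (pyspark --master spark://master:7077) running the
# same statement gets:
# AnalysisException: 'Table or view not found: counts'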
So the in-memory output has to be queried from the driver in which it is written. For example, you can mimic the console mode as shown below.
A dummy writer:
import pandas as pd
import numpy as np
import tempfile
import shutil


def producer(path):
    # Write each file to a private temp dir first, then move it into `path`,
    # so the streaming source only ever sees complete CSV files.
    temp_path = tempfile.mkdtemp()

    def producer(i):
        df = pd.DataFrame({
            "group": np.random.randint(10, size=1000)
        })
        df["val"] = (
            np.random.randn(1000) +
            np.random.random(1000) * df["group"] +
            np.random.random(1000) * i % 7
        )
        f = tempfile.mktemp(dir=temp_path)
        df.to_csv(f, index=False)
        shutil.move(f, path)

    return producer
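Each call of the returned function drops one complete 1000-row CSV into the watched directory, for example (a hypothetical one-off invocation, just to illustrate the contract):

import tempfile

watched_dir = tempfile.mkdtemp()    # stand-in for the directory the stream reads
write_batch = producer(watched_dir)
write_batch(0)                      # moves one 1000-row CSV into watched_dir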
The Spark application:
from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField

schema = StructType([
    StructField("group", IntegerType()),
    StructField("val", DoubleType())
])

path = tempfile.mkdtemp()
query_name = "foo"

stream = (spark.readStream
    .schema(schema)
    .format("csv")
    .option("header", "true")
    .load(path))

query = (stream
    .groupBy("group")
    .avg("val")
    .writeStream
    .format("memory")
    .queryName(query_name)
    .outputMode("complete")
    .start())
And some events:
from rx import Observable

# Fire an event every 5 seconds.
timer = Observable.timer(5000, 5000)

# On every tick write a fresh CSV batch into the watched directory...
timer.subscribe(producer(path))

# ...and, starting from the second tick, query the in-memory table from this
# driver (the only place where the memory sink is visible).
timer.skip(1).subscribe(lambda *_: spark.table(query_name).show())

query.awaitTermination()
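If you'd rather not pull in RxPY just for this, roughly the same loop can be written with the standard library; a sketch using threading.Timer (write_batch is simply a local name for the writer returned by producer(path)):

import threading

write_batch = producer(path)   # the dummy writer defined above

def tick(i=0):
    write_batch(i)                      # emit a new CSV micro-batch
    if i > 0:                           # like timer.skip(1): wait for the first batch
        spark.table(query_name).show()  # query the memory sink from this driver
    threading.Timer(5.0, tick, args=(i + 1,)).start()

tick()
query.awaitTermination()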