如何从 Zeppelin 中的控制台流式接收器获取输出?
How to get the output from console streaming sink in Zeppelin?
当来自 Zeppelin 的 运行 时,我正在努力让 console
接收器与 PySpark Structured Streaming 一起工作。基本上,我没有在屏幕上或我找到的任何日志文件中看到任何结果。
我的问题: 有没有人有一个使用 PySpark 结构化流和接收器的工作示例,该接收器产生在 Apache Zeppelin 中可见的输出?理想情况下,它也会使用套接字源,因为这很容易测试。
我正在使用:
- Ubuntu16.04
- spark-2.2.0-bin-hadoop2.7
- zeppelin-0.7.3-bin-all
- Python3
我的代码基于 structured_network_wordcount.py example。当来自 PySpark shell (./bin/pyspark --master local[2]
) 的 运行 时有效;我看到每批次的表格。
%pyspark
# structured streaming
from pyspark.sql.functions import *
lines = spark\
.readStream\
.format('socket')\
.option('host', 'localhost')\
.option('port', 9999)\
.option('includeTimestamp', 'true')\
.load()
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
)
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, '10 seconds', '1 seconds'),
words.word
).count().orderBy('window')
# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
.writeStream\
.outputMode('complete')\
.format('console')\
.option('truncate', 'false')\
.start()
print("Starting...")
query.awaitTermination(20)
我希望看到每个批次的结果打印输出,但我只看到 Starting...
,然后是 False
,query.awaitTermination(20)
的 return 值.
在一个单独的终端中,我将一些数据输入到 nc -lk 9999
netcat 会话中,而上面的是 运行ning。
zeppelin-0.7.3-bin-all
使用 Spark 2.1.0(因此很遗憾没有 rate
格式来测试结构化流)。
确保当您 start
时,使用 socket
源 nc -lk 9999
的流式查询已经开始(否则查询会停止)。
还要确保查询确实启动并且 运行。
val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load
val q = lines.writeStream.format("console").start
您确实无法在 Zeppelin 笔记本中看到输出可能,因为:
流式查询从它们自己的线程开始(这似乎超出了 Zeppelin 的范围)
console
sink writes to standard output(在那个单独的线程上使用 Dataset.show
运算符)。
所有这些使得 "intercepting" 输出在 Zeppelin 中不可用。
所以我们来回答真正的问题:
Where is the standard output written to in Zeppelin?
嗯,由于对 Zeppelin 内部结构的了解非常有限,我认为它可能是 logs/zeppelin-interpreter-spark-[hostname].log
,但不幸的是找不到 console
接收器的输出。在那里您可以找到使用 log4j 但 console
接收器不使用的 Spark(尤其是结构化流)的日志。
看来您唯一的长期解决方案是编写您自己的 console
类自定义接收器并使用 log4j 记录器。老实说,这并不像听起来那么难。关注 the sources of console sink.
控制台接收器不是交互式基于笔记本的工作流的好选择。即使在可以捕获输出的 Scala 中,它也需要 awaitTermination
在同一段落中调用(或等效),从而有效地阻止注释。
%spark
spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", "9999")
.option("includeTimestamp", "true")
.load()
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.start()
.awaitTermination() // Block execution, to force Zeppelin to capture the output
链式 awaitTermination
可以替换为独立调用 在同一段落中 也可以:
%spark
val query = df
.writeStream
...
.start()
query.awaitTermination()
没有它,Zeppelin 没有理由等待任何输出。 PySpark 只是在此之上添加了另一个问题——间接执行。因此,即使阻止查询也无济于事。
此外,在浏览笔记时,来自流的连续输出会导致渲染问题和内存问题(可以通过 InterpreterContext
或 REST API 使用 Zeppelin 显示系统来实现一点更明智的行为,输出被覆盖或定期清除)。
使用 Zeppelin 进行测试的更好选择是 memory sink。这样你就可以在不阻塞的情况下开始查询:
%pyspark
query = (windowedCounts
.writeStream
.outputMode("complete")
.format("memory")
.queryName("some_name")
.start())
并在另一段按需查询结果:
%pyspark
spark.table("some_name").show()
它可以与 或类似的解决方案结合使用,以提供基于间隔的更新。
也可以使用 StreamingQueryListener
和 Py4j 回调来耦合 rx
和 onQueryProgress
事件,尽管 PySpark 不支持查询侦听器,并且需要一些代码,把东西粘在一起。 Scala 接口:
package com.example.spark.observer
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
trait PythonObserver {
def on_next(o: Object): Unit
}
class PythonStreamingQueryListener(observer: PythonObserver)
extends StreamingQueryListener {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
observer.on_next(event)
}
override def onQueryStarted(event: QueryStartedEvent): Unit = {}
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
构建一个 jar,调整构建定义以反映所需的 Scala 和 Spark 版本:
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion
)
将其放在 Spark 类路径中,补丁 StreamingQueryManager
:
%pyspark
from pyspark.sql.streaming import StreamingQueryManager
from pyspark import SparkContext
def addListener(self, listener):
jvm = SparkContext._active_spark_context._jvm
jlistener = jvm.com.example.spark.observer.PythonStreamingQueryListener(
listener
)
self._jsqm.addListener(jlistener)
return jlistener
StreamingQueryManager.addListener = addListener
启动回调服务器:
%pyspark
sc._gateway.start_callback_server()
并添加监听器:
%pyspark
from rx.subjects import Subject
class StreamingObserver(Subject):
class Java:
implements = ["com.example.spark.observer.PythonObserver"]
observer = StreamingObserver()
spark.streams.addListener(observer)
最后你可以使用 subscribe
并阻止执行:
%pyspark
(observer
.map(lambda p: p.progress().name())
# .filter() can be used to print only for a specific query
.subscribe(lambda n: spark.table(n).show() if n else None))
input() # Block execution to capture the output
最后一步应该在您开始流式查询后执行。
也可以跳过 rx
并使用像这样的最小观察者:
class StreamingObserver(object):
class Java:
implements = ["com.example.spark.observer.PythonObserver"]
def on_next(self, value):
try:
name = value.progress().name()
if name:
spark.table(name).show()
except: pass
它提供的控制比 Subject
少一点(需要注意的是,这可能会干扰其他代码打印到标准输出,并且只能通过 removing listener 停止。使用 Subject
你可以很容易地 dispose
subscribed
观察者,一旦你完成了),但否则应该或多或少地工作。
请注意,任何阻塞操作都足以捕获侦听器的输出,并且不必在同一个单元格中执行。例如
%pyspark
observer = StreamingObserver()
spark.streams.addListener(observer)
和
%pyspark
import time
time.sleep(42)
将以类似的方式工作,在定义的时间间隔内打印 table。
为了完整起见,您可以实施 StreamingQueryManager.removeListener
。
当来自 Zeppelin 的 运行 时,我正在努力让 console
接收器与 PySpark Structured Streaming 一起工作。基本上,我没有在屏幕上或我找到的任何日志文件中看到任何结果。
我的问题: 有没有人有一个使用 PySpark 结构化流和接收器的工作示例,该接收器产生在 Apache Zeppelin 中可见的输出?理想情况下,它也会使用套接字源,因为这很容易测试。
我正在使用:
- Ubuntu16.04
- spark-2.2.0-bin-hadoop2.7
- zeppelin-0.7.3-bin-all
- Python3
我的代码基于 structured_network_wordcount.py example。当来自 PySpark shell (./bin/pyspark --master local[2]
) 的 运行 时有效;我看到每批次的表格。
%pyspark
# structured streaming
from pyspark.sql.functions import *
lines = spark\
.readStream\
.format('socket')\
.option('host', 'localhost')\
.option('port', 9999)\
.option('includeTimestamp', 'true')\
.load()
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
)
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, '10 seconds', '1 seconds'),
words.word
).count().orderBy('window')
# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
.writeStream\
.outputMode('complete')\
.format('console')\
.option('truncate', 'false')\
.start()
print("Starting...")
query.awaitTermination(20)
我希望看到每个批次的结果打印输出,但我只看到 Starting...
,然后是 False
,query.awaitTermination(20)
的 return 值.
在一个单独的终端中,我将一些数据输入到 nc -lk 9999
netcat 会话中,而上面的是 运行ning。
zeppelin-0.7.3-bin-all
使用 Spark 2.1.0(因此很遗憾没有 rate
格式来测试结构化流)。
确保当您 start
时,使用 socket
源 nc -lk 9999
的流式查询已经开始(否则查询会停止)。
还要确保查询确实启动并且 运行。
val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load
val q = lines.writeStream.format("console").start
您确实无法在 Zeppelin 笔记本中看到输出可能,因为:
流式查询从它们自己的线程开始(这似乎超出了 Zeppelin 的范围)
console
sink writes to standard output(在那个单独的线程上使用Dataset.show
运算符)。
所有这些使得 "intercepting" 输出在 Zeppelin 中不可用。
所以我们来回答真正的问题:
Where is the standard output written to in Zeppelin?
嗯,由于对 Zeppelin 内部结构的了解非常有限,我认为它可能是 logs/zeppelin-interpreter-spark-[hostname].log
,但不幸的是找不到 console
接收器的输出。在那里您可以找到使用 log4j 但 console
接收器不使用的 Spark(尤其是结构化流)的日志。
看来您唯一的长期解决方案是编写您自己的 console
类自定义接收器并使用 log4j 记录器。老实说,这并不像听起来那么难。关注 the sources of console sink.
控制台接收器不是交互式基于笔记本的工作流的好选择。即使在可以捕获输出的 Scala 中,它也需要 awaitTermination
在同一段落中调用(或等效),从而有效地阻止注释。
%spark
spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", "9999")
.option("includeTimestamp", "true")
.load()
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.start()
.awaitTermination() // Block execution, to force Zeppelin to capture the output
链式 awaitTermination
可以替换为独立调用 在同一段落中 也可以:
%spark
val query = df
.writeStream
...
.start()
query.awaitTermination()
没有它,Zeppelin 没有理由等待任何输出。 PySpark 只是在此之上添加了另一个问题——间接执行。因此,即使阻止查询也无济于事。
此外,在浏览笔记时,来自流的连续输出会导致渲染问题和内存问题(可以通过 InterpreterContext
或 REST API 使用 Zeppelin 显示系统来实现一点更明智的行为,输出被覆盖或定期清除)。
使用 Zeppelin 进行测试的更好选择是 memory sink。这样你就可以在不阻塞的情况下开始查询:
%pyspark
query = (windowedCounts
.writeStream
.outputMode("complete")
.format("memory")
.queryName("some_name")
.start())
并在另一段按需查询结果:
%pyspark
spark.table("some_name").show()
它可以与
也可以使用 StreamingQueryListener
和 Py4j 回调来耦合 rx
和 onQueryProgress
事件,尽管 PySpark 不支持查询侦听器,并且需要一些代码,把东西粘在一起。 Scala 接口:
package com.example.spark.observer
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
trait PythonObserver {
def on_next(o: Object): Unit
}
class PythonStreamingQueryListener(observer: PythonObserver)
extends StreamingQueryListener {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
observer.on_next(event)
}
override def onQueryStarted(event: QueryStartedEvent): Unit = {}
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
构建一个 jar,调整构建定义以反映所需的 Scala 和 Spark 版本:
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion
)
将其放在 Spark 类路径中,补丁 StreamingQueryManager
:
%pyspark
from pyspark.sql.streaming import StreamingQueryManager
from pyspark import SparkContext
def addListener(self, listener):
jvm = SparkContext._active_spark_context._jvm
jlistener = jvm.com.example.spark.observer.PythonStreamingQueryListener(
listener
)
self._jsqm.addListener(jlistener)
return jlistener
StreamingQueryManager.addListener = addListener
启动回调服务器:
%pyspark
sc._gateway.start_callback_server()
并添加监听器:
%pyspark
from rx.subjects import Subject
class StreamingObserver(Subject):
class Java:
implements = ["com.example.spark.observer.PythonObserver"]
observer = StreamingObserver()
spark.streams.addListener(observer)
最后你可以使用 subscribe
并阻止执行:
%pyspark
(observer
.map(lambda p: p.progress().name())
# .filter() can be used to print only for a specific query
.subscribe(lambda n: spark.table(n).show() if n else None))
input() # Block execution to capture the output
最后一步应该在您开始流式查询后执行。
也可以跳过 rx
并使用像这样的最小观察者:
class StreamingObserver(object):
class Java:
implements = ["com.example.spark.observer.PythonObserver"]
def on_next(self, value):
try:
name = value.progress().name()
if name:
spark.table(name).show()
except: pass
它提供的控制比 Subject
少一点(需要注意的是,这可能会干扰其他代码打印到标准输出,并且只能通过 removing listener 停止。使用 Subject
你可以很容易地 dispose
subscribed
观察者,一旦你完成了),但否则应该或多或少地工作。
请注意,任何阻塞操作都足以捕获侦听器的输出,并且不必在同一个单元格中执行。例如
%pyspark
observer = StreamingObserver()
spark.streams.addListener(observer)
和
%pyspark
import time
time.sleep(42)
将以类似的方式工作,在定义的时间间隔内打印 table。
为了完整起见,您可以实施 StreamingQueryManager.removeListener
。