Spark's socket text stream is empty
I am following Spark's streaming guide. Instead of using nc -lk 9999, I created my own simple Python server, shown below. As you can see from the code, it randomly generates letters from a to z.
import socketserver
import time
from random import choice


class AlphaTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print('AlphaTCPHandler')
        alphabets = list('abcdefghijklmnopqrstuvwxyz')
        try:
            while True:
                s = f'{choice(alphabets)}'
                b = bytes(s, 'utf-8')
                self.request.sendall(b)
                time.sleep(1)
        except BrokenPipeError:
            print('broken pipe detected')


if __name__ == '__main__':
    host = '0.0.0.0'
    port = 301
    server = socketserver.TCPServer((host, port), AlphaTCPHandler)
    print(f'server starting {host}:{port}')
    server.serve_forever()
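(An aside, unrelated to the problem itself: on Unix-like systems, ports below 1024 are privileged, so binding to 301 may require elevated permissions; a high port such as 9999 avoids that. It can also help to allow address reuse so the server can rebind immediately after a restart. A minimal sketch:

```python
import socketserver

# allow_reuse_address lets the server rebind right after a restart instead
# of failing with "Address already in use" while the old socket sits in
# TIME_WAIT. Set it on the class before instantiating the server.
socketserver.TCPServer.allow_reuse_address = True
```
)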
I tested this server with the client code below.
import socket
import sys
import time

HOST, PORT = 'localhost', 301

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    sock.connect((HOST, PORT))
    print('socket opened')
    while True:
        received = str(sock.recv(1024), 'utf-8')
        if len(received.strip()) > 0:
            print(f'{received}')
        time.sleep(1)
finally:
    sock.close()
    print('socket closed')
However, my Spark streaming code does not seem to receive any data, or at least prints nothing. The code is as follows.
from pyspark.streaming import StreamingContext
from time import sleep
ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')
lines = ssc.socketTextStream('0.0.0.0', 301)
words = lines.flatMap(lambda s: s.split(' '))
pairs = words.map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.pprint()
ssc.start()
sleep(5)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
All I see in the output is the repeating pattern below.
-------------------------------------------
Time: 2019-10-31 08:38:22
-------------------------------------------
-------------------------------------------
Time: 2019-10-31 08:38:23
-------------------------------------------
-------------------------------------------
Time: 2019-10-31 08:38:24
-------------------------------------------
Any ideas on what I am doing wrong?
Your streaming code is working fine. It is your server that is feeding it badly: there is no line separator after each letter, so Spark sees a single ever-growing line and just keeps waiting for that line to end, which never happens. Modify your server to send a newline with each letter:
            while True:
                s = f'{choice(alphabets)}\n'  # <-- newline inserted here
                b = bytes(s, 'utf-8')
                self.request.sendall(b)
                time.sleep(1)
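Why the newline matters can be sketched in plain Python: the socket receiver treats the TCP byte stream as newline-delimited records, so until a separator arrives, no record is ever completed. The frame_lines helper below is an illustrative simulation of that framing, not Spark's actual implementation:

```python
def frame_lines(chunks):
    """Split a stream of raw text chunks into newline-delimited records,
    mimicking how a line-oriented socket receiver frames its input."""
    buf = ''
    for chunk in chunks:
        buf += chunk
        # Emit a record each time a complete line is buffered.
        while '\n' in buf:
            line, buf = buf.split('\n', 1)
            yield line

# Without separators, nothing is ever emitted -- the buffer only grows:
print(list(frame_lines(['a', 'b', 'c'])))        # []
# With separators, each letter becomes its own record:
print(list(frame_lines(['a\n', 'b\n', 'c\n'])))  # ['a', 'b', 'c']
```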
Result:
-------------------------------------------
Time: 2019-10-31 12:09:26
-------------------------------------------
('t', 1)
-------------------------------------------
Time: 2019-10-31 12:09:27
-------------------------------------------
('t', 1)
-------------------------------------------
Time: 2019-10-31 12:09:28
-------------------------------------------
('x', 1)