使用 socketTextStream 将数据提取到 spark 中
Ingesting data into spark using socketTextStream
我正在从 twitter API 获取推文,然后通过 tcp 连接将它们转发到 spark 从中读取数据的套接字。这是我的代码
供参考 line
看起来像这样
{
data : {
text: "some tweet",
id: some number
}
matching_rules: [{tag: "some string", id: same number}, {tag:...}]
}
def ingest_into_spark(tcp_conn, stream):
for line in stream.iter_lines():
if not (line is None):
try :
# print(line)
tweet = json.loads(line)['matching_rules'][0]['tag']
# tweet = json.loads(line)['data']['text']
print(tweet, type(tweet), len(tweet))
tcp_conn.sendall(tweet.encode('utf-8'))
except Exception as e:
print("Exception in ingesting data: ", e)
火花端代码:
print(f"Connecting to {SPARK_IP}:{SPARK_PORT}...")
input_stream = streaming_context.socketTextStream(SPARK_IP, int(SPARK_PORT))
print(f"Connected to {SPARK_IP}:{SPARK_PORT}")
tags = input_stream.flatMap(lambda tags: tags.strip().split())
mapped_hashtags = tags.map(lambda hashtag: (hashtag, 1))
counts=mapped_hashtags.reduceByKey(lambda a, b: a+b)
counts.pprint()
无论我做什么,spark 都不会读取通过流发送的数据。但是当我用 tweet = json.loads(line)['data']['text']
行替换 tweet = json.loads(line)['matching_rules'][0]['tag']
行时,它突然按预期工作了。我尝试在两行中打印推文的内容及其类型,并在两行中打印其字符串。唯一的区别是第一个有实际的推文,而第二个只有一个词。
我已经尝试过许多不同类型的输入并对输入进行硬编码。但我无法想象为什么阅读 json 的不同领域会使我的代码停止工作。
用 netcat 替换客户端或服务器表明在这两种情况下都按预期通过套接字发送数据
如果没有解决方案,我愿意了解将数据摄取到 spark 的替代方法,也可以在这种情况下使用
如documentation中所述,文本流中的记录(行)由新行分隔(\n
)。与 print()
不同,sendall()
是一个 byte-oriented 函数,它不会自动添加新行。无论您发送了多少标签,Spark 都会继续将所有内容作为一条记录读取,等待分隔符出现。当您改为发送推文文本时,它会起作用,因为某些推文确实包含换行符。
尝试以下操作,看看是否有效:
tcp_conn.sendall((tweet + '\n').encode('utf-8'))
我正在从 twitter API 获取推文,然后通过 tcp 连接将它们转发到 spark 从中读取数据的套接字。这是我的代码
供参考 line
看起来像这样
{
data : {
text: "some tweet",
id: some number
}
matching_rules: [{tag: "some string", id: same number}, {tag:...}]
}
def ingest_into_spark(tcp_conn, stream):
for line in stream.iter_lines():
if not (line is None):
try :
# print(line)
tweet = json.loads(line)['matching_rules'][0]['tag']
# tweet = json.loads(line)['data']['text']
print(tweet, type(tweet), len(tweet))
tcp_conn.sendall(tweet.encode('utf-8'))
except Exception as e:
print("Exception in ingesting data: ", e)
火花端代码:
print(f"Connecting to {SPARK_IP}:{SPARK_PORT}...")
input_stream = streaming_context.socketTextStream(SPARK_IP, int(SPARK_PORT))
print(f"Connected to {SPARK_IP}:{SPARK_PORT}")
tags = input_stream.flatMap(lambda tags: tags.strip().split())
mapped_hashtags = tags.map(lambda hashtag: (hashtag, 1))
counts=mapped_hashtags.reduceByKey(lambda a, b: a+b)
counts.pprint()
无论我做什么,spark 都不会读取通过流发送的数据。但是当我用 tweet = json.loads(line)['data']['text']
行替换 tweet = json.loads(line)['matching_rules'][0]['tag']
行时,它突然按预期工作了。我尝试在两行中打印推文的内容及其类型,并在两行中打印其字符串。唯一的区别是第一个有实际的推文,而第二个只有一个词。
我已经尝试过许多不同类型的输入并对输入进行硬编码。但我无法想象为什么阅读 json 的不同领域会使我的代码停止工作。
用 netcat 替换客户端或服务器表明在这两种情况下都按预期通过套接字发送数据
如果没有解决方案,我愿意了解将数据摄取到 spark 的替代方法,也可以在这种情况下使用
如documentation中所述,文本流中的记录(行)由新行分隔(\n
)。与 print()
不同,sendall()
是一个 byte-oriented 函数,它不会自动添加新行。无论您发送了多少标签,Spark 都会继续将所有内容作为一条记录读取,等待分隔符出现。当您改为发送推文文本时,它会起作用,因为某些推文确实包含换行符。
尝试以下操作,看看是否有效:
tcp_conn.sendall((tweet + '\n').encode('utf-8'))