在 PySpark 中使用 Map 解析和分配列名
Using Map in PySpark to parse and assign column names
这是我想要做的。
输入数据如下所示(制表符分隔):
12/01/2018 user1 123.123.222.111 23.3s
12/01/2018 user2 123.123.222.116 21.1s
数据通过 Kafka 传入,并使用以下代码进行解析。
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kafkaStream.map(lambda x: x[1])
parsed_log = lines.flatMap(lambda line: line.split(" "))
.map(lambda item: ('key', {
'date': item['date'],
'user': item['user'],
'ip': item['ip'],
'duration': item['duration'],}))
解析后的日志应采用以下格式:
('key', {'date': 12/01/2018, 'user': user1, 'ip': 123.123.222.111, 'duration': 23.3s})
('key', {'date': 12/01/2018, 'user': user2, 'ip': 123.123.222.116, 'duration': 21.1s})
在我的代码中,"lines" 和 "parsed_log" 的代码行并没有完成这项工作。你能告诉我怎么做吗?
这是解决方案:
lines = kafkaStream.map(lambda x: x[1])
variables_per_stream = lines.map(lambda line: line.split(" "))
variable_to_key=variables_per_stream.map(lambda item: ('key', {'id': item[0],'name': item[1]}))
这是我想要做的。
输入数据如下所示(制表符分隔):
12/01/2018 user1 123.123.222.111 23.3s
12/01/2018 user2 123.123.222.116 21.1s
数据通过 Kafka 传入,并使用以下代码进行解析。
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kafkaStream.map(lambda x: x[1])
parsed_log = lines.flatMap(lambda line: line.split(" "))
.map(lambda item: ('key', {
'date': item['date'],
'user': item['user'],
'ip': item['ip'],
'duration': item['duration'],}))
解析后的日志应采用以下格式:
('key', {'date': 12/01/2018, 'user': user1, 'ip': 123.123.222.111, 'duration': 23.3s})
('key', {'date': 12/01/2018, 'user': user2, 'ip': 123.123.222.116, 'duration': 21.1s})
在我的代码中,"lines" 和 "parsed_log" 的代码行并没有完成这项工作。你能告诉我怎么做吗?
这是解决方案:
lines = kafkaStream.map(lambda x: x[1])
variables_per_stream = lines.map(lambda line: line.split(" "))
variable_to_key=variables_per_stream.map(lambda item: ('key', {'id': item[0],'name': item[1]}))