使用 Spark 读取文本文件并在 spark sql 上插入值

Reading a text file using Spark and inserting the value on spark sql

from pyspark import SparkContext
from pyspark import SparkConf

lines = sc.textFile("s3://test_bucket/txt/testing_consol.txt")

llist = lines.collect()

for lines in llist:
        final_query = spark.sql("""{0}
        """.format(lines))

这是 txt 文件中的内容:

select * from test_table 
where id=1

我收到错误消息:

"\nmismatched input 'where' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)\n\n== SQL ==\nwhere id=1\n^^^\n"

如果我将 txt 文件的内容更改为一行,则 spark sql 会起作用:

select * from test_table where id=1

spark sql 似乎只能识别第一行,不能识别后续行。

只是为了阅读查询,创建一个 rdd 不是一个选项。 您应该使用 --files 参数将包含查询的文本文件传递给 spark 驱动程序 然后使用 python 打开命令读取文件并将您的查询传递给 spark sql.

完全不推荐使用 spark 读取小文件只是为了传递查询。

如果您只是合并查询行,它应该可以工作:

llist = ' '.join(lines.collect())
final_query = spark.sql(llist)