moveToHDFS file not found in PySpark script
I have a strange problem.
When I run this command in a terminal, it works and copies the file to the desired location:
hdfs dfs -copyFromLocal concsessions.csv /user/username/spark_exports/
However, when I run it as part of my script (below), it doesn't, and I get the error shown. I'm not sure what I'm doing wrong. Can anyone help?
OSError: [Errno 2] No such file or directory
Code:
from pyspark.sql import SparkSession
from datetime import datetime
#Set the date for the filename
now = datetime.now()
yday = long(now.strftime('%s')) - 24*60*60
spark = SparkSession\
    .builder\
    .appName('wap')\
    .master('yarn')\
    .enableHiveSupport()\
    .getOrCreate()
import datetime
import pyspark.sql.functions as F
from pyspark.sql.functions import col
#The below prints your results to your chosen destination (Hive, Stdout, CSV)
print('data load starting...')
cmd = '''select * from db.conc_sessions'''
df1 = spark.sql(cmd)
df1.printSchema()
print('data ingested successfully')
print('setting variables...')
timestart= '2019-10-14 00:00:00'
timeend= '2019-10-14 23:59:59'
time_to_check = datetime.datetime.strptime(timestart, '%Y-%m-%d %H:%M:%S')
iters = 0
session = 0
add = []
print('begin iteration...')
while iters < 96:
    time_to_add = iters * 900
    time_to_checkx = time_to_check + datetime.timedelta(seconds=time_to_add)
    stringtime = time_to_checkx.strftime("%m/%d/%Y, %H:%M:%S")
    iters = iters + 1
    spark_date_format = "YYYY-MM-dd hh:mm:ss"
    df1 = df1.withColumn('start_timestamp', F.to_timestamp(df1.start_time, spark_date_format))
    df1 = df1.withColumn('end_timestamp', F.to_timestamp(df1.end_time, spark_date_format))
    filterx = df1.filter( (df1.start_time < time_to_checkx) & (df1.end_time > time_to_checkx) )
    session = filterx.count()
    newrow = [stringtime, session]
    add.append(newrow)
import pandas as pd
output = pd.DataFrame.from_records(add)
output.columns = ['time','count']
output = output.groupby(['time'])[['count']].agg('sum').reset_index()
output.to_csv('concsessions.csv', sep=',')
#copy the CSV from the local server to HDFS
import subprocess
subprocess.call("hdfs dfs -copyFromLocal concsessions.csv /user/username/spark_exports/")
You should use shell=True in your subprocess.call.
From https://docs.python.org/2/library/subprocess.html#frequently-used-arguments :
The shell argument (which defaults to False) specifies whether to use the shell as the program to execute. If shell is True, it is recommended to pass args as a string rather than as a sequence.
On Unix with shell=True, the shell defaults to /bin/sh.
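In other words, without shell=True the whole string is treated as the name of a single executable, so Python looks for a file literally called "hdfs dfs -copyFromLocal ..." and fails with [Errno 2]. The difference can be seen with a harmless command like echo, substituted here so the sketch runs without an HDFS cluster:

```python
import subprocess

# Without shell=True, the entire string is taken as one executable name.
# There is no program called "echo hello", so this raises
# OSError: [Errno 2] No such file or directory (FileNotFoundError in Python 3).
try:
    subprocess.call("echo hello")
    failed = False
except OSError:
    failed = True

# With shell=True, the string is handed to /bin/sh, which parses it normally.
rc_shell = subprocess.call("echo hello", shell=True)

# Equivalently, split the command into a list yourself -- no shell involved.
rc_list = subprocess.call(["echo", "hello"])

print(failed, rc_shell, rc_list)
```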
按如下方式编辑您的通话并查看。
subprocess.call("hdfs dfs -copyFromLocal concsessions.csv /user/username/spark_exports/", shell=True)
Alternatively, pass the command as a list of arguments, which avoids the shell entirely:
subprocess.call(['hdfs', 'dfs', '-copyFromLocal', 'concsessions.csv', '/user/username/spark_exports/'])
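Building on the list form, a small wrapper can also report whether the copy actually succeeded instead of silently ignoring the exit code. This is a sketch, not from either answer: the helper name and error handling are mine, and it assumes the hdfs binary is on PATH.

```python
import subprocess

def copy_to_hdfs(local_path, hdfs_dir):
    """Run `hdfs dfs -copyFromLocal` and report success as a bool.

    With the list form each argument is its own element, so there is no
    shell parsing and no quoting pitfalls. check_call raises
    CalledProcessError on a non-zero exit code, so a failed copy
    cannot pass unnoticed.
    """
    cmd = ['hdfs', 'dfs', '-copyFromLocal', local_path, hdfs_dir]
    try:
        subprocess.check_call(cmd)
        return True
    except subprocess.CalledProcessError:
        # hdfs ran but exited non-zero (e.g. destination missing).
        return False
    except OSError:
        # The hdfs binary itself was not found on PATH.
        return False

ok = copy_to_hdfs('concsessions.csv', '/user/username/spark_exports/')
```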