!gcloud dataproc jobs submit pyspark - ERROR AttributeError: 'str' object has no attribute 'batch'
How do I pass a dataset type as input to a Dataproc job?
My code is as follows:
%%writefile spark_job.py
import sys
import time      # needed for time.time() below
import pyspark
import argparse
import pickle

#def time_configs_rdd(test_set, batch_sizes, batch_numbers, repetitions):
def time_configs_rdd(argv):
    print(argv)
    parser = argparse.ArgumentParser()  # get a parser object
    parser.add_argument('--out_bucket', metavar='out_bucket', required=True,
                        help='The bucket URL for the result.')  # add a required argument
    parser.add_argument('--out_file', metavar='out_file', required=True,
                        help='The filename for the result.')
    parser.add_argument('--batch_size', metavar='batch_size', required=True,
                        help='The batch sizes to test.')
    parser.add_argument('--batch_number', metavar='batch_number', required=True,
                        help='The numbers of batches to test.')
    parser.add_argument('--repetitions', metavar='repetitions', required=True,
                        help='The repetitions per configuration.')
    parser.add_argument('--test_set', metavar='test_set', required=True,
                        help='The dataset to read batches from.')
    args = parser.parse_args(argv)  # read the values
    # the value provided with --out_bucket is now in args.out_bucket
    time_configs_results = []
    for s in args.batch_size:
        for n in args.batch_number:
            dataset = args.test_set.batch(s).take(n)  # <-- the line that fails
            for r in args.repetitions:
                tt0 = time.time()
                for i in enumerate(dataset):
                    totaltime = str(time.time() - tt0)
                    batchtime = totaltime
                    #imgpersec = s*n/totaltime
                time_configs_results.append((s, n, r, float(batchtime)))
                #time_configs_results.append((s, n, r, batchtime, imgpersec))
    time_configs_results_rdd = sc.parallelize(time_configs_results)  # create an RDD with all results for each parameter
    time_configs_results_rdd_avg = time_configs_results_rdd.map(lambda x: (x, x[0]*x[1]/x[3]))  # RDD with the average reading speeds (RDD.map)
    #mapping = time_configs_results_rdd_avg.collect()
    #print(mapping)
    return time_configs_results_rdd_avg

if 'google.colab' not in sys.modules:  # don't use system arguments when run in Colab
    time_configs_rdd(sys.argv[1:])
elif __name__ == "__main__":  # but define them manually
    time_configs_rdd(["--out_bucket", BUCKET, "--out_file", "time_configs_rdd_out.pkl",
                      "--batch_size", batch_size, "--batch_number", batch_number,
                      "--repetitions", repetitions, "--test_set", test_set])
And the code that submits it:
FILENAME = 'file_RDD_OUT.pkl'
batch_size = [1]
batch_number = [1]
repetitions = [1]
#test_set = 1 will give the string error
test_set = dataset2  # a <ParallelMapDataset shapes: ((192, 192, None), ()), types: (tf.float32, tf.string)> cannot be passed in

!gcloud dataproc jobs submit pyspark --cluster $CLUSTER --region $REGION \
    ./spark_job.py \
    -- --out_bucket $BUCKET --out_file $FILENAME --batch_size $batch_size --batch_number $batch_number --repetitions $repetitions --test_set $test_set
Unfortunately it keeps failing with the error:
AttributeError: 'str' object has no attribute 'batch'
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [c2048c422f334b08a628af5a1aa492eb] failed with error:
Job failed with message [AttributeError: 'str' object has no attribute 'batch'].
The problem lies in test_set: how should I convert dataset2 (a ParallelMapDataset) so the job can read it?
So you are trying to parse the string from a command-line argument into a ParallelMapDataset. You want to use the type parameter in the add_argument call.
From https://docs.python.org/3/library/argparse.html#type I quote:
By default, ArgumentParser objects read command-line arguments in as simple strings. However, quite often the command-line string should instead be interpreted as another type, like a float or int. The type keyword argument of add_argument() allows any necessary type-checking and type conversions to be performed.
and
type= can take any callable that takes a single string argument and returns the converted value
So you probably want something like this:
def parse_parallel_map_dataset(string):
    # your logic to parse the string into your desired data structure
    ...

parser.add_argument('--test_set', metavar='test_set', required=True,
                    type=parse_parallel_map_dataset)
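The same caveat applies to your numeric flags: every parsed value is a string, so "for s in args.batch_size" iterates over the characters of "[1]" rather than over a list of integers. A minimal sketch (my suggestion, not in your original code) using type=int with nargs='+' so each flag yields a list of ints:

parser.add_argument('--batch_size', metavar='batch_size', required=True,
                    type=int, nargs='+',
                    help='One or more batch sizes to test.')
# submitted as: --batch_size 1 2 4   ->   args.batch_size == [1, 2, 4]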
Or, better yet, read the test_set from a file and pass the filename as an argument.
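For that file-based route, here is a minimal sketch, assuming TensorFlow 2.3+ on both sides (where tf.data.experimental.save and tf.data.experimental.load are available, and tensorflow is installed on the cluster) and a hypothetical GCS path. The element_spec mirrors the shapes and types printed for dataset2 above:

import tensorflow as tf

# driver side (e.g. in Colab): persist the dataset and pass only its path
DATASET_PATH = 'gs://my-bucket/test_set'           # hypothetical path
tf.data.experimental.save(dataset2, DATASET_PATH)  # write the elements to files

# job side (spark_job.py): rebuild the dataset from the path argument,
# so args.test_set is just a string path, never a Dataset object
element_spec = (tf.TensorSpec(shape=(192, 192, None), dtype=tf.float32),
                tf.TensorSpec(shape=(), dtype=tf.string))
test_set = tf.data.experimental.load(DATASET_PATH, element_spec)
dataset = test_set.batch(1).take(1)                # .batch() now works on a real Dataset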