用于直接批处理任务的 luigi 批处理模块
Using the Luigi batch module for straightforward batch tasks
我有 500 个链接要下载，想把它们分批处理，例如每批 10 个。
对应的伪代码应该是什么样子？
class BatchJobTask(luigi.Task):
    """Download one slice of the URLs built from the lines of ``urls_chunk``.

    Each non-empty line ``x`` of the file becomes ``http://ggg<x>.org``; this
    task fetches the ``items`` URLs starting at index ``start`` and records
    each fetched URL in its output file.
    """

    # Batch size. An int is required because it is used as a slice bound.
    items = luigi.IntParameter(default=10)
    # Index of the first URL in this batch.
    start = luigi.IntParameter(default=0)

    def run(self):
        with open('urls_chunk', 'r') as urls:
            # strip() removes the trailing newline that would otherwise end up
            # inside the URL; blank lines are skipped.
            list_urls = ['http://ggg' + line.strip() + '.org'
                         for line in urls if line.strip()]
        batch_urls = list_urls[self.start:self.start + self.items]
        with self.output().open('w') as done:
            for url in batch_urls:
                req = requests.get(url)
                # Fail the task instead of recording a broken download.
                req.raise_for_status()
                # do something useful with req.content here
                done.write(url + '\n')

    def output(self):
        # One target per batch: a single shared file would make luigi treat
        # every batch as complete as soon as the first one finished.
        return luigi.LocalTarget("downloaded_filelist_{}.txt".format(self.start))


class BatchWorker(luigi.WrapperTask):
    """Run BatchJobTask over ``total`` URLs in slices of ``batch_size``.

    With the defaults this requires slices [0, 10), [10, 20), ..., [490, 500).
    Each slice is scheduled by luigi as an independent task.
    """

    total = luigi.IntParameter(default=500)
    batch_size = luigi.IntParameter(default=10)

    def requires(self):
        return [BatchJobTask(start=start, items=self.batch_size)
                for start in range(0, self.total, self.batch_size)]
应该怎么实现？
下面是一种实现您需求的方法，其中字符串列表以每行一个的形式存储在文件中。
import luigi
import requests
# Maximum number of lines from the URL file grouped into one BatchProcessor task.
BATCH_SIZE = 10
class BatchProcessor(luigi.Task):
    """Fetch ``http://www.<item>.org`` for every item in one batch.

    ``max`` distinguishes batches from each other: it is embedded in the
    output file name, so each batch gets its own completion marker.
    """
    items = luigi.ListParameter()
    max = luigi.IntParameter()

    def requires(self):
        return None

    def output(self):
        # Use self.max, not the builtin max(): with the builtin, every batch
        # mapped to the same file and luigi skipped all batches after the first.
        return luigi.LocalTarget('processed' + str(self.max) + '.txt')

    def run(self):
        for item in self.items:
            req = requests.get('http://www.' + item + '.org')
            # Fail the task on an HTTP error instead of writing the marker.
            req.raise_for_status()
            # do something useful here with req.content
        # Empty marker file; the luigi target writes it atomically.
        with self.output().open('w'):
            pass
class BatchCreator(luigi.Task):
    """Split the lines of ``file_with_urls`` into BatchProcessor tasks.

    Every non-empty line (whitespace-stripped) is one item. Items are grouped
    BATCH_SIZE at a time; every group, including a final shorter one, becomes
    a BatchProcessor. Its ``max`` is the number of items up to and including
    that group, which keeps the batches' output names distinct.
    """
    file_with_urls = luigi.Parameter()

    def requires(self):
        with open(self.file_with_urls) as f:
            stripped = (line.strip() for line in f)
            items = [item for item in stripped if item]
        return [
            BatchProcessor(items=items[start:start + BATCH_SIZE],
                           max=min(start + BATCH_SIZE, len(items)))
            for start in range(0, len(items), BATCH_SIZE)
        ]

    def output(self):
        return luigi.LocalTarget(str(self.file_with_urls) + 'processed')

    def run(self):
        # Empty marker file signalling that all batches have been processed.
        with self.output().open('w'):
            pass
我是这样实现的：
class GetListtask(luigi.Task):
    """Task whose output is the file named by ``outputfile``."""

    # Declared so that output() can read self.outputfile; previously undeclared.
    outputfile = luigi.Parameter()

    def run(self):
        ...  # body left as a placeholder in the original post

    def output(self):
        return luigi.LocalTarget(self.outputfile)
class GetJustOneFile(luigi.Task):
    """Download ``test<fid>.txt`` from my-server.com into a local file of the same name."""
    fid = luigi.IntParameter()

    def requires(self):
        pass

    def run(self):
        url = 'http://my-server.com/test' + str(self.fid) + '.txt'
        response = requests.get(url)
        # Do not create the target (which marks the task done) for a failed download.
        response.raise_for_status()
        with self.output().open('w') as downloaded_file:
            # .text is the decoded body. str(response.content) would write the
            # bytes repr, e.g. "b'...'", into the file.
            downloaded_file.write(response.text)

    def output(self):
        return luigi.LocalTarget("test{}.txt".format(self.fid))
class GetAllFiles(luigi.WrapperTask):
    """Require one GetJustOneFile task per file id in ``range(num_files)``."""

    # Number of files to fetch; ids run from 0 to num_files - 1. The default
    # keeps the original range(899), i.e. ids 0..898. The old "0..999" comment
    # did not match the code.
    num_files = luigi.IntParameter(default=899)

    def requires(self):
        return [GetJustOneFile(fid=fileid) for fileid in range(self.num_files)]
这段代码写得很糟糕吗？
我有 500 个链接要下载，想把它们分批处理，例如每批 10 个。
对应的伪代码应该是什么样子？
class BatchJobTask(luigi.Task):
    """Download one slice of the URLs built from the lines of ``urls_chunk``.

    Each non-empty line ``x`` of the file becomes ``http://ggg<x>.org``; this
    task fetches the ``items`` URLs starting at index ``start`` and records
    each fetched URL in its output file.
    """

    # Batch size. An int is required because it is used as a slice bound.
    items = luigi.IntParameter(default=10)
    # Index of the first URL in this batch.
    start = luigi.IntParameter(default=0)

    def run(self):
        with open('urls_chunk', 'r') as urls:
            # strip() removes the trailing newline that would otherwise end up
            # inside the URL; blank lines are skipped.
            list_urls = ['http://ggg' + line.strip() + '.org'
                         for line in urls if line.strip()]
        batch_urls = list_urls[self.start:self.start + self.items]
        with self.output().open('w') as done:
            for url in batch_urls:
                req = requests.get(url)
                # Fail the task instead of recording a broken download.
                req.raise_for_status()
                # do something useful with req.content here
                done.write(url + '\n')

    def output(self):
        # One target per batch: a single shared file would make luigi treat
        # every batch as complete as soon as the first one finished.
        return luigi.LocalTarget("downloaded_filelist_{}.txt".format(self.start))


class BatchWorker(luigi.WrapperTask):
    """Run BatchJobTask over ``total`` URLs in slices of ``batch_size``.

    With the defaults this requires slices [0, 10), [10, 20), ..., [490, 500).
    Each slice is scheduled by luigi as an independent task.
    """

    total = luigi.IntParameter(default=500)
    batch_size = luigi.IntParameter(default=10)

    def requires(self):
        return [BatchJobTask(start=start, items=self.batch_size)
                for start in range(0, self.total, self.batch_size)]
应该怎么实现？
下面是一种实现您需求的方法，其中字符串列表以每行一个的形式存储在文件中。
import luigi
import requests
# Maximum number of lines from the URL file grouped into one BatchProcessor task.
BATCH_SIZE = 10
class BatchProcessor(luigi.Task):
    """Fetch ``http://www.<item>.org`` for every item in one batch.

    ``max`` distinguishes batches from each other: it is embedded in the
    output file name, so each batch gets its own completion marker.
    """
    items = luigi.ListParameter()
    max = luigi.IntParameter()

    def requires(self):
        return None

    def output(self):
        # Use self.max, not the builtin max(): with the builtin, every batch
        # mapped to the same file and luigi skipped all batches after the first.
        return luigi.LocalTarget('processed' + str(self.max) + '.txt')

    def run(self):
        for item in self.items:
            req = requests.get('http://www.' + item + '.org')
            # Fail the task on an HTTP error instead of writing the marker.
            req.raise_for_status()
            # do something useful here with req.content
        # Empty marker file; the luigi target writes it atomically.
        with self.output().open('w'):
            pass
class BatchCreator(luigi.Task):
    """Split the lines of ``file_with_urls`` into BatchProcessor tasks.

    Every non-empty line (whitespace-stripped) is one item. Items are grouped
    BATCH_SIZE at a time; every group, including a final shorter one, becomes
    a BatchProcessor. Its ``max`` is the number of items up to and including
    that group, which keeps the batches' output names distinct.
    """
    file_with_urls = luigi.Parameter()

    def requires(self):
        with open(self.file_with_urls) as f:
            stripped = (line.strip() for line in f)
            items = [item for item in stripped if item]
        return [
            BatchProcessor(items=items[start:start + BATCH_SIZE],
                           max=min(start + BATCH_SIZE, len(items)))
            for start in range(0, len(items), BATCH_SIZE)
        ]

    def output(self):
        return luigi.LocalTarget(str(self.file_with_urls) + 'processed')

    def run(self):
        # Empty marker file signalling that all batches have been processed.
        with self.output().open('w'):
            pass
我是这样实现的：
class GetListtask(luigi.Task):
    """Task whose output is the file named by ``outputfile``."""

    # Declared so that output() can read self.outputfile; previously undeclared.
    outputfile = luigi.Parameter()

    def run(self):
        ...  # body left as a placeholder in the original post

    def output(self):
        return luigi.LocalTarget(self.outputfile)
class GetJustOneFile(luigi.Task):
    """Download ``test<fid>.txt`` from my-server.com into a local file of the same name."""
    fid = luigi.IntParameter()

    def requires(self):
        pass

    def run(self):
        url = 'http://my-server.com/test' + str(self.fid) + '.txt'
        response = requests.get(url)
        # Do not create the target (which marks the task done) for a failed download.
        response.raise_for_status()
        with self.output().open('w') as downloaded_file:
            # .text is the decoded body. str(response.content) would write the
            # bytes repr, e.g. "b'...'", into the file.
            downloaded_file.write(response.text)

    def output(self):
        return luigi.LocalTarget("test{}.txt".format(self.fid))
class GetAllFiles(luigi.WrapperTask):
    """Require one GetJustOneFile task per file id in ``range(num_files)``."""

    # Number of files to fetch; ids run from 0 to num_files - 1. The default
    # keeps the original range(899), i.e. ids 0..898. The old "0..999" comment
    # did not match the code.
    num_files = luigi.IntParameter(default=899)

    def requires(self):
        return [GetJustOneFile(fid=fileid) for fileid in range(self.num_files)]
这段代码写得很糟糕吗？