从代码中启动使用多进程 worker 的 huey consumer 并获取信号回调
start process consumer from code and get signal callbacks
对于 CPU 密集型任务,如何在代码中启动使用多进程 worker 的 consumer?
在不使用 immediate 模式的情况下,如何获得信号回调?
如果我以 immediate=True 运行 MemoryHuey,一切正常;但如果将其设为 False,我只会得到空列表。
问题:
我有几个端点,需要按不同的优先级处理。这些任务都是 CPU 密集型的,必须放到后台并用多进程处理。之后会使用 Redis 集群作为作业存储。
谢谢:)
PS:请忽略代码中的 async。
编辑:或者,在 AWS Elastic Beanstalk 上是否有用 supervisor 运行 consumer 的简单方案?
# MemoryHuey keeps its queue, results and schedule inside the current process.
# With worker_type=WORKER_PROCESS every worker process gets its own copy of
# that memory, so tasks enqueued (and signal-handler state updated) in one
# process are invisible to the others. Thread workers share the same memory.
# NOTE(review): threads are bounded by the GIL for pure-Python CPU work; for
# real multi-core processing use a shared backend (Redis) with process workers.
huey = MemoryHuey('worker', results=True, store_none=False, verbose=True, immediate=False)
x = huey.create_consumer(workers=mp.cpu_count(), periodic=False, initial_delay=0.1, backoff=1.15,
                         max_delay=10.0, scheduler_interval=1, worker_type='thread',
                         check_worker_health=True, health_check_interval=10, flush_locks=False)
print(x)
print(type(x))
# create_consumer() only builds the Consumer object (see the printed type);
# nothing is dequeued and no signal fires until it is started.
# NOTE(review): start() is expected to launch the workers without blocking,
# whereas run() blocks the caller; confirm against the huey Consumer API.
x.start()
<huey.consumer.Consumer object at 0x7f101cd3c490>
<class 'huey.consumer.Consumer'>
# Location of the model assets (not used in the code visible here).
MODEL_PATH = "model"

router = APIRouter()
app = FastAPI()
# ``router`` is mounted here and its route list is also passed to the POST
# endpoints as ``callbacks=``; no routes are registered on it in this code.
app.include_router(router)

# Redis connection-pool set-ups kept for reference (inactive):
#   from the REDIS_URL env var:
#     pool = ConnectionPool.from_url(url=os.getenv('REDIS_URL'), max_connections=100)
#   only for testing:
#     pool = ConnectionPool(host='redis', port=6379, max_connections=100)
try:
    # Priority-aware Redis-backed huey instance shared by the web app (which
    # enqueues tasks) and the consumer (which executes them).
    huey = PriorityRedisHuey(
        'worker',
        results=True,  # set to True if testing
        store_none=False,
        # host=os.getenv('REDIS_HOST'),
        # port=6379,
        # only for docker-compose testing
        host='redis',
        port=6379,
    )
    # create_consumer() only constructs a Consumer; it does not start it, so
    # this call alone processes no tasks and fires no signals. The reference is
    # kept so a dedicated entry point can start it (e.g. consumer.run()), or run
    # the `huey_consumer` command under supervisor in its own process instead.
    consumer = huey.create_consumer(workers=mp.cpu_count(), periodic=False, initial_delay=0.1, backoff=1.15,
                                    max_delay=10.0, scheduler_interval=1, worker_type=WORKER_PROCESS,
                                    check_worker_health=True, health_check_interval=10, flush_locks=False)
except Exception as e:
    # A plain Exception has no ``status_code``/``detail`` (those belong to
    # HTTPException), so reading them here raised AttributeError and masked
    # the real error. Report it and re-raise: the @huey decorators further down
    # the module depend on ``huey`` being configured correctly.
    print("exception during connection catched", repr(e))
    raise
# Fully permissive CORS: any origin, method and header, with credentials.
# NOTE(review): wildcard origins together with allow_credentials=True may let
# any site make credentialed requests; confirm this is intended beyond testing.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)
@app.get("/")
async def root():
    """Landing endpoint that identifies the service."""
    payload = {"message": "OCR Engine and NER service"}
    return payload
# https://huey.readthedocs.io/en/latest/signals.html
# TODO: build monitoring after Redis setup
# Rows appended by the huey signal handler.
# NOTE: huey signal handlers run inside the consumer. When the consumer uses
# process workers, the appends land in that process's own copy of this list,
# so the web process serving /job_monitoring never sees them.
jobs = list()
# Upper bound on how many rows /job_monitoring keeps.
_MAX_MONITORED_JOBS = 200


@app.get("/job_monitoring")
async def monitor_jobs():
    """Return the recorded signal rows, keeping at most the newest 200.

    Only the oldest rows are dropped when the cap is exceeded, so a caller
    never receives an empty list merely because the list had grown large.
    """
    if len(jobs) > _MAX_MONITORED_JOBS:
        # Trim in place: the signal handler holds this same list object as its
        # default argument, so rebinding ``jobs`` would disconnect the two.
        del jobs[:-_MAX_MONITORED_JOBS]
    return jobs
@huey.signal()
def all_signal_handler(signal, task, exc=None, jobs=jobs):
    """Append one monitoring row to ``jobs`` for each huey signal received.

    Registered with ``@huey.signal()`` without arguments. Each row is
    ``(timestamp, task id, task name, first positional arg, signal, error)``.

    ``jobs=jobs`` binds the module-level list when the function is defined;
    the shared mutable default is deliberate here.
    NOTE(review): this runs wherever the consumer executes tasks; with process
    workers the appends do not reach the web process's list.
    """
    stamp = datetime.now().strftime(r"%d/%m/%Y %H:%M:%S")
    # Tasks enqueued without positional arguments have empty ``args``; guard so
    # the handler itself cannot raise IndexError.
    first_arg = task.args[0] if task.args else None
    # Store a readable representation instead of the exception object, which is
    # not a plain JSON value for the /job_monitoring response.
    error = repr(exc) if exc is not None else None
    jobs.append((stamp, task.id, task.name, first_arg, signal, error))
@app.post("/ner", callbacks=router.routes)
async def ner(invoice: Invoice, token=Depends(get_token_from_header), callback_url: Optional[AnyHttpUrl] = None,
              max_sentence_number: int = 0, return_merged: bool = True, crop_image: int = 30, ocr: bool = True):
    """Queue a Named Entity Recognition job and acknowledge immediately.

    The work itself is done by the ``__process_ner_request`` huey task; this
    handler only enqueues it and reports the queued task's id.
    """
    queued = __process_ner_request(invoice, token, callback_url, max_sentence_number,
                                   return_merged, crop_image, ocr)
    message = f"NER Process : {queued.id} : {invoice.path} sent in the background"
    return {"message": message}
@app.post("/ner_complete", callbacks=router.routes)
async def ner_complete(invoice: Invoice, token=Depends(get_token_from_header),
                       callback_url: Optional[AnyHttpUrl] = None, max_sentence_number: int = 0,
                       return_merged: bool = True, crop_image: int = 0, ocr: bool = False):
    """Queue an NER job with "complete" defaults (no cropping, no OCR).

    Same task as ``/ner``; only the default query parameters differ.
    """
    queued = __process_ner_request(invoice, token, callback_url, max_sentence_number,
                                   return_merged, crop_image, ocr)
    message = f"NER Complete Process : {queued.id} : {invoice.path} sent in the background"
    return {"message": message}
@app.post("/txt", callbacks=router.routes)
async def ocr_txt(invoice: Invoice, token=Depends(get_token_from_header), callback_url: Optional[AnyHttpUrl] = None,
                  res_width: int = 3500, check_before_ocr: bool = True):
    """Queue an OCR job in ``'txt'`` mode and acknowledge immediately."""
    pending = __process_ocr_request(invoice, 'txt', token, callback_url, res_width, check_before_ocr)
    return dict(message=f"OCR Process : {pending.id} : {invoice.path} sent in the background")
@app.post("/pdfa", callbacks=router.routes)
async def ocr_pdfa(invoice: Invoice, token=Depends(get_token_from_header), callback_url: Optional[AnyHttpUrl] = None,
                   res_width: int = 3500, check_before_ocr: bool = True):
    """Queue an OCR job in ``'pdfa'`` mode and acknowledge immediately."""
    pending = __process_ocr_request(invoice, 'pdfa', token, callback_url, res_width, check_before_ocr)
    return dict(message=f"OCR Process : {pending.id} : {invoice.path} sent in the background")
@huey.task(priority=1)
def __process_ocr_request(invoice, mode, token, callback_url, res_width, check_before_ocr):
    """Run one OCR job and POST the outcome to ``callback_url``.

    On success the callback receives ``{'content': ..., 'execTime': ms}``; on
    failure it receives ``{"errors": [{"statusCode": 500, "message": ...}]}``.

    Parameters
    ----------
    invoice : class
        API input
    mode : str
        return as txt or pdfa
    token : str
        validation token, forwarded as a Bearer token to the callback
    callback_url : str
        url for result callback; required to deliver the result
    res_width : int
        width for image resizing
    check_before_ocr : bool
        checks if image is usable for OCR

    Raises
    ------
    ValueError
        If ``callback_url`` is None. The endpoints make it optional, but the
        result can only be delivered through it; failing early avoids doing
        the CPU-heavy OCR for nothing.
    """
    if callback_url is None:
        raise ValueError("callback_url is required to deliver the OCR result")
    # requests applies no timeout by default; bound every callback POST so a
    # stalled receiver cannot block a worker indefinitely.
    callback_timeout = 30  # seconds
    headers = {'Authorization': 'Bearer ' + token}
    start = timer()
    try:
        result = handle_ocr(invoice, mode, res_width, check_before_ocr)
    except HTTPException as e:
        print("exception catched", e.status_code, e.detail)
        requests.post(callback_url, headers=headers,
                      json={"errors": [{"statusCode": 500, "message": e.detail}]},
                      timeout=callback_timeout)
        return
    except Exception:
        # Without this branch any other failure would produce no callback at
        # all. Notify the caller generically (no internal details leaked), then
        # re-raise so huey still sees the task as failed.
        requests.post(callback_url, headers=headers,
                      json={"errors": [{"statusCode": 500, "message": "Internal error during OCR processing"}]},
                      timeout=callback_timeout)
        raise
    # Elapsed time in milliseconds as measured by ``timer``.
    exec_time = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
    requests.post(callback_url, headers=headers,
                  json={'content': result.content, 'execTime': exec_time},
                  timeout=callback_timeout)
@huey.task(priority=10)
def __process_ner_request(invoice, token, callback_url, max_sentence_number, return_merged, crop_image, ocr):
    """Run one NER job and POST the outcome to ``callback_url``.

    On success the callback receives ``{'content': ..., 'execTime': ms}``; on
    failure it receives ``{"errors": [{"statusCode": 500, "message": ...}]}``.

    Parameters
    ----------
    invoice : class
        API input
    token : str
        validation token, forwarded as a Bearer token to the callback
    callback_url : str
        url for result callback; required to deliver the result
    max_sentence_number : int
        max number of sentences to process
    return_merged : bool
        if true merge result tokens
    crop_image : int
        percentage to crop from image
    ocr : bool
        if true do ocr

    Raises
    ------
    ValueError
        If ``callback_url`` is None. The endpoints make it optional, but the
        result can only be delivered through it; failing early avoids doing
        the CPU-heavy NER for nothing.
    """
    if callback_url is None:
        raise ValueError("callback_url is required to deliver the NER result")
    # requests applies no timeout by default; bound every callback POST so a
    # stalled receiver cannot block a worker indefinitely.
    callback_timeout = 30  # seconds
    headers = {'Authorization': 'Bearer ' + token}
    start = timer()
    try:
        result = get_entities(invoice, max_sentence_number, return_merged, crop_image, ocr)
    except HTTPException as e:
        print("exception catched", e.status_code, e.detail)
        requests.post(callback_url, headers=headers,
                      json={"errors": [{"statusCode": 500, "message": e.detail}]},
                      timeout=callback_timeout)
        return
    except Exception:
        # Without this branch any other failure would produce no callback at
        # all. Notify the caller generically (no internal details leaked), then
        # re-raise so huey still sees the task as failed.
        requests.post(callback_url, headers=headers,
                      json={"errors": [{"statusCode": 500, "message": "Internal error during NER processing"}]},
                      timeout=callback_timeout)
        raise
    # Elapsed time in milliseconds as measured by ``timer``.
    exec_time = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
    requests.post(callback_url, headers=headers,
                  json={'content': result.content, 'execTime': exec_time},
                  timeout=callback_timeout)
编辑:我使用 Elastic Beanstalk 进行部署——能否在其中用 supervisor 运行 consumer?
您正在尝试跨进程共享内存,这是行不通的。每个进程都有自己的内存副本:信号处理器是在 consumer 的 worker 进程中执行的,它追加到的是该进程里的 jobs 副本,所以 Web 进程中的列表始终为空。这就是为什么在使用多进程 worker 的 huey 时,需要使用 Redis、SQLite 或文件系统这样的存储后端。
对于 CPU 密集型任务,如何在代码中启动使用多进程 worker 的 consumer?
在不使用 immediate 模式的情况下,如何获得信号回调?如果我以 immediate=True 运行 MemoryHuey,一切正常;但如果将其设为 False,我只会得到空列表。
问题:
我有几个端点,需要按不同的优先级处理。这些任务都是 CPU 密集型的,必须放到后台并用多进程处理。之后会使用 Redis 集群作为作业存储。
谢谢 :) PS:请忽略代码中的 async。
编辑:或者,在 AWS Elastic Beanstalk 上是否有用 supervisor 运行 consumer 的简单方案?
# MemoryHuey keeps its queue, results and schedule inside the current process.
# With worker_type=WORKER_PROCESS every worker process gets its own copy of
# that memory, so tasks enqueued (and signal-handler state updated) in one
# process are invisible to the others. Thread workers share the same memory.
# NOTE(review): threads are bounded by the GIL for pure-Python CPU work; for
# real multi-core processing use a shared backend (Redis) with process workers.
huey = MemoryHuey('worker', results=True, store_none=False, verbose=True, immediate=False)
x = huey.create_consumer(workers=mp.cpu_count(), periodic=False, initial_delay=0.1, backoff=1.15,
                         max_delay=10.0, scheduler_interval=1, worker_type='thread',
                         check_worker_health=True, health_check_interval=10, flush_locks=False)
print(x)
print(type(x))
# create_consumer() only builds the Consumer object (see the printed type);
# nothing is dequeued and no signal fires until it is started.
# NOTE(review): start() is expected to launch the workers without blocking,
# whereas run() blocks the caller; confirm against the huey Consumer API.
x.start()
<huey.consumer.Consumer object at 0x7f101cd3c490>
<class 'huey.consumer.Consumer'>
# Location of the model assets (not used in the code visible here).
MODEL_PATH = "model"

router = APIRouter()
app = FastAPI()
# ``router`` is mounted here and its route list is also passed to the POST
# endpoints as ``callbacks=``; no routes are registered on it in this code.
app.include_router(router)

# Redis connection-pool set-ups kept for reference (inactive):
#   from the REDIS_URL env var:
#     pool = ConnectionPool.from_url(url=os.getenv('REDIS_URL'), max_connections=100)
#   only for testing:
#     pool = ConnectionPool(host='redis', port=6379, max_connections=100)
try:
    # Priority-aware Redis-backed huey instance shared by the web app (which
    # enqueues tasks) and the consumer (which executes them).
    huey = PriorityRedisHuey(
        'worker',
        results=True,  # set to True if testing
        store_none=False,
        # host=os.getenv('REDIS_HOST'),
        # port=6379,
        # only for docker-compose testing
        host='redis',
        port=6379,
    )
    # create_consumer() only constructs a Consumer; it does not start it, so
    # this call alone processes no tasks and fires no signals. The reference is
    # kept so a dedicated entry point can start it (e.g. consumer.run()), or run
    # the `huey_consumer` command under supervisor in its own process instead.
    consumer = huey.create_consumer(workers=mp.cpu_count(), periodic=False, initial_delay=0.1, backoff=1.15,
                                    max_delay=10.0, scheduler_interval=1, worker_type=WORKER_PROCESS,
                                    check_worker_health=True, health_check_interval=10, flush_locks=False)
except Exception as e:
    # A plain Exception has no ``status_code``/``detail`` (those belong to
    # HTTPException), so reading them here raised AttributeError and masked
    # the real error. Report it and re-raise: the @huey decorators further down
    # the module depend on ``huey`` being configured correctly.
    print("exception during connection catched", repr(e))
    raise
# Fully permissive CORS: any origin, method and header, with credentials.
# NOTE(review): wildcard origins together with allow_credentials=True may let
# any site make credentialed requests; confirm this is intended beyond testing.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)
@app.get("/")
async def root():
    """Landing endpoint that identifies the service."""
    payload = {"message": "OCR Engine and NER service"}
    return payload
# https://huey.readthedocs.io/en/latest/signals.html
# TODO: build monitoring after Redis setup
# Rows appended by the huey signal handler.
# NOTE: huey signal handlers run inside the consumer. When the consumer uses
# process workers, the appends land in that process's own copy of this list,
# so the web process serving /job_monitoring never sees them.
jobs = list()
# Upper bound on how many rows /job_monitoring keeps.
_MAX_MONITORED_JOBS = 200


@app.get("/job_monitoring")
async def monitor_jobs():
    """Return the recorded signal rows, keeping at most the newest 200.

    Only the oldest rows are dropped when the cap is exceeded, so a caller
    never receives an empty list merely because the list had grown large.
    """
    if len(jobs) > _MAX_MONITORED_JOBS:
        # Trim in place: the signal handler holds this same list object as its
        # default argument, so rebinding ``jobs`` would disconnect the two.
        del jobs[:-_MAX_MONITORED_JOBS]
    return jobs
@huey.signal()
def all_signal_handler(signal, task, exc=None, jobs=jobs):
    """Append one monitoring row to ``jobs`` for each huey signal received.

    Registered with ``@huey.signal()`` without arguments. Each row is
    ``(timestamp, task id, task name, first positional arg, signal, error)``.

    ``jobs=jobs`` binds the module-level list when the function is defined;
    the shared mutable default is deliberate here.
    NOTE(review): this runs wherever the consumer executes tasks; with process
    workers the appends do not reach the web process's list.
    """
    stamp = datetime.now().strftime(r"%d/%m/%Y %H:%M:%S")
    # Tasks enqueued without positional arguments have empty ``args``; guard so
    # the handler itself cannot raise IndexError.
    first_arg = task.args[0] if task.args else None
    # Store a readable representation instead of the exception object, which is
    # not a plain JSON value for the /job_monitoring response.
    error = repr(exc) if exc is not None else None
    jobs.append((stamp, task.id, task.name, first_arg, signal, error))
@app.post("/ner", callbacks=router.routes)
async def ner(invoice: Invoice, token=Depends(get_token_from_header), callback_url: Optional[AnyHttpUrl] = None,
              max_sentence_number: int = 0, return_merged: bool = True, crop_image: int = 30, ocr: bool = True):
    """Queue a Named Entity Recognition job and acknowledge immediately.

    The work itself is done by the ``__process_ner_request`` huey task; this
    handler only enqueues it and reports the queued task's id.
    """
    queued = __process_ner_request(invoice, token, callback_url, max_sentence_number,
                                   return_merged, crop_image, ocr)
    message = f"NER Process : {queued.id} : {invoice.path} sent in the background"
    return {"message": message}
@app.post("/ner_complete", callbacks=router.routes)
async def ner_complete(invoice: Invoice, token=Depends(get_token_from_header),
                       callback_url: Optional[AnyHttpUrl] = None, max_sentence_number: int = 0,
                       return_merged: bool = True, crop_image: int = 0, ocr: bool = False):
    """Queue an NER job with "complete" defaults (no cropping, no OCR).

    Same task as ``/ner``; only the default query parameters differ.
    """
    queued = __process_ner_request(invoice, token, callback_url, max_sentence_number,
                                   return_merged, crop_image, ocr)
    message = f"NER Complete Process : {queued.id} : {invoice.path} sent in the background"
    return {"message": message}
@app.post("/txt", callbacks=router.routes)
async def ocr_txt(invoice: Invoice, token=Depends(get_token_from_header), callback_url: Optional[AnyHttpUrl] = None,
                  res_width: int = 3500, check_before_ocr: bool = True):
    """Queue an OCR job in ``'txt'`` mode and acknowledge immediately."""
    pending = __process_ocr_request(invoice, 'txt', token, callback_url, res_width, check_before_ocr)
    return dict(message=f"OCR Process : {pending.id} : {invoice.path} sent in the background")
@app.post("/pdfa", callbacks=router.routes)
async def ocr_pdfa(invoice: Invoice, token=Depends(get_token_from_header), callback_url: Optional[AnyHttpUrl] = None,
                   res_width: int = 3500, check_before_ocr: bool = True):
    """Queue an OCR job in ``'pdfa'`` mode and acknowledge immediately."""
    pending = __process_ocr_request(invoice, 'pdfa', token, callback_url, res_width, check_before_ocr)
    return dict(message=f"OCR Process : {pending.id} : {invoice.path} sent in the background")
@huey.task(priority=1)
def __process_ocr_request(invoice, mode, token, callback_url, res_width, check_before_ocr):
    """Run one OCR job and POST the outcome to ``callback_url``.

    On success the callback receives ``{'content': ..., 'execTime': ms}``; on
    failure it receives ``{"errors": [{"statusCode": 500, "message": ...}]}``.

    Parameters
    ----------
    invoice : class
        API input
    mode : str
        return as txt or pdfa
    token : str
        validation token, forwarded as a Bearer token to the callback
    callback_url : str
        url for result callback; required to deliver the result
    res_width : int
        width for image resizing
    check_before_ocr : bool
        checks if image is usable for OCR

    Raises
    ------
    ValueError
        If ``callback_url`` is None. The endpoints make it optional, but the
        result can only be delivered through it; failing early avoids doing
        the CPU-heavy OCR for nothing.
    """
    if callback_url is None:
        raise ValueError("callback_url is required to deliver the OCR result")
    # requests applies no timeout by default; bound every callback POST so a
    # stalled receiver cannot block a worker indefinitely.
    callback_timeout = 30  # seconds
    headers = {'Authorization': 'Bearer ' + token}
    start = timer()
    try:
        result = handle_ocr(invoice, mode, res_width, check_before_ocr)
    except HTTPException as e:
        print("exception catched", e.status_code, e.detail)
        requests.post(callback_url, headers=headers,
                      json={"errors": [{"statusCode": 500, "message": e.detail}]},
                      timeout=callback_timeout)
        return
    except Exception:
        # Without this branch any other failure would produce no callback at
        # all. Notify the caller generically (no internal details leaked), then
        # re-raise so huey still sees the task as failed.
        requests.post(callback_url, headers=headers,
                      json={"errors": [{"statusCode": 500, "message": "Internal error during OCR processing"}]},
                      timeout=callback_timeout)
        raise
    # Elapsed time in milliseconds as measured by ``timer``.
    exec_time = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
    requests.post(callback_url, headers=headers,
                  json={'content': result.content, 'execTime': exec_time},
                  timeout=callback_timeout)
@huey.task(priority=10)
def __process_ner_request(invoice, token, callback_url, max_sentence_number, return_merged, crop_image, ocr):
    """Run one NER job and POST the outcome to ``callback_url``.

    On success the callback receives ``{'content': ..., 'execTime': ms}``; on
    failure it receives ``{"errors": [{"statusCode": 500, "message": ...}]}``.

    Parameters
    ----------
    invoice : class
        API input
    token : str
        validation token, forwarded as a Bearer token to the callback
    callback_url : str
        url for result callback; required to deliver the result
    max_sentence_number : int
        max number of sentences to process
    return_merged : bool
        if true merge result tokens
    crop_image : int
        percentage to crop from image
    ocr : bool
        if true do ocr

    Raises
    ------
    ValueError
        If ``callback_url`` is None. The endpoints make it optional, but the
        result can only be delivered through it; failing early avoids doing
        the CPU-heavy NER for nothing.
    """
    if callback_url is None:
        raise ValueError("callback_url is required to deliver the NER result")
    # requests applies no timeout by default; bound every callback POST so a
    # stalled receiver cannot block a worker indefinitely.
    callback_timeout = 30  # seconds
    headers = {'Authorization': 'Bearer ' + token}
    start = timer()
    try:
        result = get_entities(invoice, max_sentence_number, return_merged, crop_image, ocr)
    except HTTPException as e:
        print("exception catched", e.status_code, e.detail)
        requests.post(callback_url, headers=headers,
                      json={"errors": [{"statusCode": 500, "message": e.detail}]},
                      timeout=callback_timeout)
        return
    except Exception:
        # Without this branch any other failure would produce no callback at
        # all. Notify the caller generically (no internal details leaked), then
        # re-raise so huey still sees the task as failed.
        requests.post(callback_url, headers=headers,
                      json={"errors": [{"statusCode": 500, "message": "Internal error during NER processing"}]},
                      timeout=callback_timeout)
        raise
    # Elapsed time in milliseconds as measured by ``timer``.
    exec_time = timedelta(seconds=timer() - start) / timedelta(milliseconds=1)
    requests.post(callback_url, headers=headers,
                  json={'content': result.content, 'execTime': exec_time},
                  timeout=callback_timeout)
编辑:我使用 Elastic Beanstalk 进行部署——能否在其中用 supervisor 运行 consumer?
您正在尝试跨进程共享内存,这是行不通的。每个进程都有自己的内存副本:信号处理器是在 consumer 的 worker 进程中执行的,它追加到的是该进程里的 jobs 副本,所以 Web 进程中的列表始终为空。这就是为什么在使用多进程 worker 的 huey 时,需要使用 Redis、SQLite 或文件系统这样的存储后端。