如何在 Python 中参数化 mixin 类?
How to parametrize mixin classes in Python?
我正在开发 BaseServer
摘要 class:
class BaseServer(abc.ABC):
@abc.abstractmethod
def serve(self):
raise NotImplementedError
@abc.abstractmethod
def shutdown(self):
raise NotImplementedError
@abc.abstractmethod
def send(self, *args, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def finalise(self, *args, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def is_serving(self):
raise NotImplementedError
def set_handler(self, handler):
self.__handler = handler
def _handle(self, *args, **kwargs):
try:
self.__handler.handle(self, *args, **kwargs)
except AttributeError:
pass
def _cleanup(self):
pass
以及ThreadPoolMixin
和ProcessPoolMixin
mixin classes(灵感来自Python的socketserver
模块):
class ThreadPoolMixin:
def _handle(self, *args, **kwargs):
try:
future = self.__pool.submit(super()._handle, *args, **kwargs)
self.__futures.append(future)
except AttributeError:
self.__pool = concurrent.futures.ThreadPoolExecutor(1)
self.__futures = []
future = self.__pool.submit(super()._handle, *args, **kwargs)
self.__futures.append(future)
def _cleanup(self):
try:
for future in self.__futures:
future.cancel()
self.__pool.shutdown()
except AttributeError:
pass
class ProcessPoolMixin:
def _handle(self, *args, **kwargs):
try:
future = self.__pool.submit(super()._handle, *args, **kwargs)
self.__futures.append(future)
except AttributeError:
self.__pool = concurrent.futures.ProcessPoolExecutor(1)
self.__futures = []
future = self.__pool.submit(super()._handle, *args, **kwargs)
self.__futures.append(future)
def _cleanup(self):
try:
for future in self.__futures:
future.cancel()
self.__pool.shutdown()
except AttributeError:
pass
def __getstate__(self):
class Temp:
pass
state = self.__dict__.copy()
temp = Temp()
temp.__dict__ = state
try:
del temp.__pool
del temp.__futures
except AttributeError:
pass
return temp.__dict__
这些 classes 允许我根据需要创建具体的 subclasses:
class HTTPServer(BaseServer):
pass
class AMQPServer(BaseServer):
pass
class ThreadingHTTPServer(ThreadPoolMixin, BaseServer):
pass
class ThreadingAMQPServer(ThreadPoolMixin, BaseServer):
pass
class ProcessingHTTPServer(ProcessPoolMixin, BaseServer):
pass
class ProcessingAMQPServer(ProcessPoolMixin, BaseServer):
pass
如何从这些子class中自定义线程池大小和进程池大小?
我找到了一个解决方案:使用 public class 属性 ,如 socketserver.ThreadingMixIn
和 socketserver.ForkingMixIn
。
class ThreadPoolMixin:
pool_size = 1
def _handle(self, *args, **kwargs):
try:
future = self.__pool.submit(super()._handle, *args, **kwargs)
self.__futures.append(future)
except AttributeError:
self.__pool = concurrent.futures.ThreadPoolExecutor(
self.pool_size)
self.__futures = []
future = self.__pool.submit(super()._handle, *args, **kwargs)
self.__futures.append(future)
def _cleanup(self):
try:
for future in self.__futures:
future.cancel()
self.__pool.shutdown()
except AttributeError:
pass
class ThreadingHTTPServer(ThreadPoolMixin, BaseServer):
pool_size = 4
class ThreadingAMQPServer(ThreadPoolMixin, BaseServer):
pool_size = 2
我正在试验的另一种方法是“mixin 工厂”,它根据需要创建新的 mixin classes:
def thread_pool_mixin(pool_size: int) -> Type:
class ThreadPoolMixin:
def _handle(self, *args, **kwargs):
. . .
# Use the closed-over "pool_size"
self.__pool = concurrent.futures.ThreadPoolExecutor(pool_size)
. . .
return ThreadPoolMixin
class ThreadingHTTPServer(thread_pool_mixin(pool_size=4), BaseServer):
pass
这似乎比混淆 class 属性更干净;虽然这似乎是常见的方法。
我正在开发 BaseServer
摘要 class:
class BaseServer(abc.ABC):
@abc.abstractmethod
def serve(self):
raise NotImplementedError
@abc.abstractmethod
def shutdown(self):
raise NotImplementedError
@abc.abstractmethod
def send(self, *args, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def finalise(self, *args, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def is_serving(self):
raise NotImplementedError
def set_handler(self, handler):
self.__handler = handler
def _handle(self, *args, **kwargs):
try:
self.__handler.handle(self, *args, **kwargs)
except AttributeError:
pass
def _cleanup(self):
pass
以及ThreadPoolMixin
和ProcessPoolMixin
mixin classes(灵感来自Python的socketserver
模块):
class ThreadPoolMixin:
def _handle(self, *args, **kwargs):
try:
future = self.__pool.submit(super()._handle, *args, **kwargs)
self.__futures.append(future)
except AttributeError:
self.__pool = concurrent.futures.ThreadPoolExecutor(1)
self.__futures = []
future = self.__pool.submit(super()._handle, *args, **kwargs)
self.__futures.append(future)
def _cleanup(self):
try:
for future in self.__futures:
future.cancel()
self.__pool.shutdown()
except AttributeError:
pass
class ProcessPoolMixin:
def _handle(self, *args, **kwargs):
try:
future = self.__pool.submit(super()._handle, *args, **kwargs)
self.__futures.append(future)
except AttributeError:
self.__pool = concurrent.futures.ProcessPoolExecutor(1)
self.__futures = []
future = self.__pool.submit(super()._handle, *args, **kwargs)
self.__futures.append(future)
def _cleanup(self):
try:
for future in self.__futures:
future.cancel()
self.__pool.shutdown()
except AttributeError:
pass
def __getstate__(self):
class Temp:
pass
state = self.__dict__.copy()
temp = Temp()
temp.__dict__ = state
try:
del temp.__pool
del temp.__futures
except AttributeError:
pass
return temp.__dict__
这些 classes 允许我根据需要创建具体的 subclasses:
class HTTPServer(BaseServer):
pass
class AMQPServer(BaseServer):
pass
class ThreadingHTTPServer(ThreadPoolMixin, BaseServer):
pass
class ThreadingAMQPServer(ThreadPoolMixin, BaseServer):
pass
class ProcessingHTTPServer(ProcessPoolMixin, BaseServer):
pass
class ProcessingAMQPServer(ProcessPoolMixin, BaseServer):
pass
如何从这些子class中自定义线程池大小和进程池大小?
我找到了一个解决方案:使用 public class 属性 ,如 socketserver.ThreadingMixIn
和 socketserver.ForkingMixIn
。
class ThreadPoolMixin:
pool_size = 1
def _handle(self, *args, **kwargs):
try:
future = self.__pool.submit(super()._handle, *args, **kwargs)
self.__futures.append(future)
except AttributeError:
self.__pool = concurrent.futures.ThreadPoolExecutor(
self.pool_size)
self.__futures = []
future = self.__pool.submit(super()._handle, *args, **kwargs)
self.__futures.append(future)
def _cleanup(self):
try:
for future in self.__futures:
future.cancel()
self.__pool.shutdown()
except AttributeError:
pass
class ThreadingHTTPServer(ThreadPoolMixin, BaseServer):
pool_size = 4
class ThreadingAMQPServer(ThreadPoolMixin, BaseServer):
pool_size = 2
我正在试验的另一种方法是“mixin 工厂”,它根据需要创建新的 mixin classes:
def thread_pool_mixin(pool_size: int) -> Type:
class ThreadPoolMixin:
def _handle(self, *args, **kwargs):
. . .
# Use the closed-over "pool_size"
self.__pool = concurrent.futures.ThreadPoolExecutor(pool_size)
. . .
return ThreadPoolMixin
class ThreadingHTTPServer(thread_pool_mixin(pool_size=4), BaseServer):
pass
这似乎比混淆 class 属性更干净;虽然这似乎是常见的方法。