输入和结果对象的 Pythonic 解耦
Pythonic Decoupling of Input and Resulting Object
我有各种输入流,它们提供我想要打包到一个对象中的信息,然后我可以将其传递给单独的函数:
class FruitBasket:
def __init__(self, a, b, c, **kwargs):
self.a = a
self.b = b
self.c = c
但是,每个数据流都以不同的方式命名我要查找的数据。 StreamA 的输入如下所示:
{"apple" : 3, "banana" : 2, "citrus" : 0}
StreamB 的输入如下所示:
{"fruit1" : 1, "fruit2" : 7, "fruit3" : 4, "dog" : 21}
目前我的解决方案是有一个单独的 class 映射输入数据和对象输出之间的信息:
class StreamAMap(Enum):
a = "apple"
b = "banana"
c = "citrus"
class StreamBMap(Enum):
a = "fruit2"
b = "fruit1"
c = "fruit3"
然后我将此 "map" class、目标对象和输入数据传递到一个函数中:
def translate(map_enum, target, data):
translation = {}
for item in map_enum:
translation[item.name] = data[item.value]
return(target(**translation))
data_a = {"apple" : 3, "banana" : 2, "citrus" : 0}
data_b = {"fruit1" : 1, "fruit2" : 7, "fruit3" : 4, "dog" : 21}
basket_1 = translate(StreamAMap, FruitBasket, data_a)
basket_2 = translate(StreamBMap, FruitBasket, data_b)
它有效,但我想知道是否有更 Pythonic 的方法来实现它。
编辑:
这里的目标是保持 FruitBasket
与任何可能产生数据并最终进入其中的流完全分离,即每个流都知道什么信息需要进入 FruitBasket
,但不管添加或删除什么流将来,我不必实际更改 FruitBasket
class 的任何内容,而只需确保每个流都向主应用程序返回有效的 FruitBasket
。我当前的实现只需要为我添加的每个新流定义一个 "map" class,我想知道是否有 "Pythonic"(内置)方法来执行此操作而不是滚动我自己的翻译功能来做到这一点。
为了阐明我所说的 "Pythonic" 的意思,"Pythonic" 确定列表均值的方法是 sum(list) / len(list)
而不是遍历列表以递增计数器变量并添加每个项目到一个总和变量。
忘记 "Pythonic" 忘记 "OOP correct way" 并考虑保持理智。保持理智的明显方法是 "OOP correct" 和 "Pythonic".
因此,您有一些在系统外部是异构的信息,但您希望在系统内部传递这些信息的对象具有定义明确的单一行为。
明显的解决方案是在您的系统中有一个 class,它的行为相同,具有明确定义的属性和方法,并且有办法 create class 的对象可以处理异构数据输入。输出也是如此——如果您需要以多种形式输出信息,您可以编写方法将您的(统一的,在应用程序中)信息转换为每个输出通道所需的特定形式。
更具体地说 - 你的 class 应该有你手头需要的字段作为你的应用程序核心业务的属性,或者有一个多态初始化方法,各种辅助方法来从每个输入创建实例类型或工厂函数(或一组工厂函数)来创建您的实例。
但是一旦它们被创建,它们都应该只具有 a
、b
和 c
属性。
我上面给出的选项中的第二个模式可能是一个不错的开始:
class FruitBasket:
def __init__(self, a, b, c):
self.a = a
self.b = b
self.c = c
@classmethod
def from_named_stream(self, stream):
return FruitBasket(a=stream["apple"], b=stream["banana", c=stream["citrus"])
# go on and create one such classmethod for each form of input data
所以我找到了一种非常简单明了的方法:
class FruitBasket:
def __init__(self, a, b, c, **kwargs):
self.a = a
self.b = b
self.c = c
class FruitBasketIntermediateA:
def __init__(self, apple, banana, citrus, **kwargs):
self.a = apple
self.b = banana
self.c = citrus
class FruitBasketIntermediateB:
def __init__(self, fruit1, fruit2, fruit3, **kwargs):
self.a = fruit2
self.b = fruit1
self.c = fruit3
data_a = {"apple" : 3, "banana" : 2, "citrus" : 0}
data_b = {"fruit1" : 1, "fruit2" : 7, "fruit3" : 4, "dog" : 21}
my_fb_a = FruitBasket(**vars(FruitBasketIntermediateA(**data)))
my_fb_b = FruitBasket(**vars(FruitBasketIntermediateB(**data)))
如果我每秒消耗数十万条消息,我可能不得不使用性能更高的方法,例如原始问题中的方法,但使用字典而不是枚举,或者使用 jsbueno 的方法,但是硬性限制是少了几个数量级,所以这很适合我的需要,并使事情分离和简洁。
我有各种输入流,它们提供我想要打包到一个对象中的信息,然后我可以将其传递给单独的函数:
class FruitBasket:
def __init__(self, a, b, c, **kwargs):
self.a = a
self.b = b
self.c = c
但是,每个数据流都以不同的方式命名我要查找的数据。 StreamA 的输入如下所示:
{"apple" : 3, "banana" : 2, "citrus" : 0}
StreamB 的输入如下所示:
{"fruit1" : 1, "fruit2" : 7, "fruit3" : 4, "dog" : 21}
目前我的解决方案是有一个单独的 class 映射输入数据和对象输出之间的信息:
class StreamAMap(Enum):
a = "apple"
b = "banana"
c = "citrus"
class StreamBMap(Enum):
a = "fruit2"
b = "fruit1"
c = "fruit3"
然后我将此 "map" class、目标对象和输入数据传递到一个函数中:
def translate(map_enum, target, data):
translation = {}
for item in map_enum:
translation[item.name] = data[item.value]
return(target(**translation))
data_a = {"apple" : 3, "banana" : 2, "citrus" : 0}
data_b = {"fruit1" : 1, "fruit2" : 7, "fruit3" : 4, "dog" : 21}
basket_1 = translate(StreamAMap, FruitBasket, data_a)
basket_2 = translate(StreamBMap, FruitBasket, data_b)
它有效,但我想知道是否有更 Pythonic 的方法来实现它。
编辑:
这里的目标是保持 FruitBasket
与任何可能产生数据并最终进入其中的流完全分离,即每个流都知道什么信息需要进入 FruitBasket
,但不管添加或删除什么流将来,我不必实际更改 FruitBasket
class 的任何内容,而只需确保每个流都向主应用程序返回有效的 FruitBasket
。我当前的实现只需要为我添加的每个新流定义一个 "map" class,我想知道是否有 "Pythonic"(内置)方法来执行此操作而不是滚动我自己的翻译功能来做到这一点。
为了阐明我所说的 "Pythonic" 的意思,"Pythonic" 确定列表均值的方法是 sum(list) / len(list)
而不是遍历列表以递增计数器变量并添加每个项目到一个总和变量。
忘记 "Pythonic" 忘记 "OOP correct way" 并考虑保持理智。保持理智的明显方法是 "OOP correct" 和 "Pythonic".
因此,您有一些在系统外部是异构的信息,但您希望在系统内部传递这些信息的对象具有定义明确的单一行为。
明显的解决方案是在您的系统中有一个 class,它的行为相同,具有明确定义的属性和方法,并且有办法 create class 的对象可以处理异构数据输入。输出也是如此——如果您需要以多种形式输出信息,您可以编写方法将您的(统一的,在应用程序中)信息转换为每个输出通道所需的特定形式。
更具体地说 - 你的 class 应该有你手头需要的字段作为你的应用程序核心业务的属性,或者有一个多态初始化方法,各种辅助方法来从每个输入创建实例类型或工厂函数(或一组工厂函数)来创建您的实例。
但是一旦它们被创建,它们都应该只具有 a
、b
和 c
属性。
我上面给出的选项中的第二个模式可能是一个不错的开始:
class FruitBasket:
def __init__(self, a, b, c):
self.a = a
self.b = b
self.c = c
@classmethod
def from_named_stream(self, stream):
return FruitBasket(a=stream["apple"], b=stream["banana", c=stream["citrus"])
# go on and create one such classmethod for each form of input data
所以我找到了一种非常简单明了的方法:
class FruitBasket:
def __init__(self, a, b, c, **kwargs):
self.a = a
self.b = b
self.c = c
class FruitBasketIntermediateA:
def __init__(self, apple, banana, citrus, **kwargs):
self.a = apple
self.b = banana
self.c = citrus
class FruitBasketIntermediateB:
def __init__(self, fruit1, fruit2, fruit3, **kwargs):
self.a = fruit2
self.b = fruit1
self.c = fruit3
data_a = {"apple" : 3, "banana" : 2, "citrus" : 0}
data_b = {"fruit1" : 1, "fruit2" : 7, "fruit3" : 4, "dog" : 21}
my_fb_a = FruitBasket(**vars(FruitBasketIntermediateA(**data)))
my_fb_b = FruitBasket(**vars(FruitBasketIntermediateB(**data)))
如果我每秒消耗数十万条消息,我可能不得不使用性能更高的方法,例如原始问题中的方法,但使用字典而不是枚举,或者使用 jsbueno 的方法,但是硬性限制是少了几个数量级,所以这很适合我的需要,并使事情分离和简洁。