如何在多个代理或浮士德定时器之间共享浮士德table?
How to share faust table between multiple agents or faust timers?
我正在尝试在一段时间后将 faust table 的数据(计数)发布到 kafka 主题。当我发布一些简单的字符串时,计时器正在工作,但它无法以某种方式访问 table 的数据。
以下是定时器的代码:
@app.timer(interval=10.0)
async def publish_to_anomaly_topic():
await anomaly_topic.send(
value=str(page_views['total'].value())
)
@app.agent(page_view_topic)
async def count_page_views(views):
async for view in views.group_by(PageView.id):
total=0
page_views[view.id]+=1
for everykey in list(page_views.keys()):
if everykey != 'total':
total+=page_views[everykey].value()
page_views['total'] = total
代理工作正常。我能够正确地看到这些值。
经过大量实验后,您无法访问 table 的值以及应用计时器(即使您在创建 table 时指定了 relative_field 选项).此问题的解决方法是创建另一个 table 来维护消息的时间戳并在业务逻辑中使用它们。
if view.timestamp-page_views_timer[view.id+'_first_timestamp'] > 60:
await anomaly_topic.send(value={//the data to be sent})
其中 page_views_timer 是新创建的 table。
我在尝试做同样的事情时发现了这个问题,下面是我如何弄明白的。
https://faust.readthedocs.io/en/latest/userguide/tables.html
You cannot modify a table outside of a stream operation; this means
that you can only mutate the table from within an async for event in
stream: block. We require this to align the table’s partitions with
the stream’s, and to ensure the source topic partitions are correctly
rebalanced to a different worker upon failure, along with any
necessary table partitions.
Modifying a table outside of a stream will raise an error:
文档说您无法在流操作之外 access/modify table。
要解决这个问题,您可以将定时器功能分成两部分:
@app.timer(10)
async def my_timer_function():
# value does not matter as much as the send operation
await my_calling_function.send(value="send data now!")
@app.agent()
async def my_calling_function(stream_from_timer_func):
async for message in stream_from_timer_func:
print(message) # this will print "send data now!"
table_data = my_table['key']
# Here is where you can access your table data and finish sending the
# message to the topic you want
await my_topic.send(value=table_data)
如您所见,如果您使用定时器功能向代理发送消息,您可以访问您想要的 table,它只需要在
async for event in stream:
代码块。
我正在尝试在一段时间后将 faust table 的数据(计数)发布到 kafka 主题。当我发布一些简单的字符串时,计时器正在工作,但它无法以某种方式访问 table 的数据。 以下是定时器的代码:
@app.timer(interval=10.0)
async def publish_to_anomaly_topic():
await anomaly_topic.send(
value=str(page_views['total'].value())
)
@app.agent(page_view_topic)
async def count_page_views(views):
async for view in views.group_by(PageView.id):
total=0
page_views[view.id]+=1
for everykey in list(page_views.keys()):
if everykey != 'total':
total+=page_views[everykey].value()
page_views['total'] = total
代理工作正常。我能够正确地看到这些值。
经过大量实验后,您无法访问 table 的值以及应用计时器(即使您在创建 table 时指定了 relative_field 选项).此问题的解决方法是创建另一个 table 来维护消息的时间戳并在业务逻辑中使用它们。
if view.timestamp-page_views_timer[view.id+'_first_timestamp'] > 60:
await anomaly_topic.send(value={//the data to be sent})
其中 page_views_timer 是新创建的 table。
我在尝试做同样的事情时发现了这个问题,下面是我如何弄明白的。
https://faust.readthedocs.io/en/latest/userguide/tables.html
You cannot modify a table outside of a stream operation; this means that you can only mutate the table from within an async for event in stream: block. We require this to align the table’s partitions with the stream’s, and to ensure the source topic partitions are correctly rebalanced to a different worker upon failure, along with any necessary table partitions.
Modifying a table outside of a stream will raise an error:
文档说您无法在流操作之外 access/modify table。
要解决这个问题,您可以将定时器功能分成两部分:
@app.timer(10)
async def my_timer_function():
# value does not matter as much as the send operation
await my_calling_function.send(value="send data now!")
@app.agent()
async def my_calling_function(stream_from_timer_func):
async for message in stream_from_timer_func:
print(message) # this will print "send data now!"
table_data = my_table['key']
# Here is where you can access your table data and finish sending the
# message to the topic you want
await my_topic.send(value=table_data)
如您所见,如果您使用定时器功能向代理发送消息,您可以访问您想要的 table,它只需要在
async for event in stream:
代码块。