"encoding without a string argument" 将数据发送到接收器的 Faust 代理出错?

"encoding without a string argument" error in Faust agent that sends data to a sink?

我正在尝试使用 Faust 代理中的接收器将一些数据从一个 Kafka 主题发送到另一个主题,但我一直收到此错误:

File "/.../venv/lib/python3.8/site-packages/schema_registry/serializers/message_serializer.py", line 49, in <lambda>
    return lambda record, fp: schemaless_writer(fp, avro_schema.schema, record)
Crashed reason=TypeError('encoding without a string argument')

我可以在调试中看到它发生在 fastavro 的 schemaless_writer 函数的某个地方,但是它的代码在源代码中并不清楚,所以我不确定那里到底发生了什么:https://github.com/fastavro/fastavro/blob/357543aebb94a4fd593b035f8501b20ac66d3b17/fastavro/_write.pyi

我的想法是,它试图“编码”发送数据中的每个字段,但它不适用于每个字段,这不是字符串(我有一些数字、布尔值、日期时间和字典那里)。我试图在发送之前将每个字段转换为字符串,但它也不起作用,因为它会与我的架构冲突,例如:

Crashed reason=ValueError("'True' (type <class 'str'>) do not match ['null', 'boolean']") 

也与

相关联
await converted_payment_topic.send(value=result)

但得到了相同的结果。

这是我当前的代码:

app = faust.App(f'testConvertedPayments',
          version=1,
          key_serializer='raw',
          value_serializer='raw',
          broker=KAFKA_URL,
          web_port=9090)

payment_topic = app.topic(payment_topic)
converted_payment_topic = app.topic(converted_payment_topic)

@app.agent(payment_topic, sink=[converted_payment_topic], concurrency=3)
async def payment_stream(payment):
  async for part in payment.take(1000, within=30):
    for x in part:
      result = make_payment_row(x) #just some data manipulation and making a dict
      result = {k: v for k, v in result.items() if v is not None}
      yield result

与我在文档中看到的所有示例类似,不同之处在于我的数据中不仅有字符串,有没有办法解决这个问题,而无需将我的模式和所有数据类型更改为字符串?

谢谢!

明白了,我需要发送一个 faust.Record 模型,而不是字典。 from_dict faust.Record 的方法 class 可用于从字典生成记录。