How to convert an Avro file to a JSON file when the data was originally sent as raw JSON via Postman through Azure Event Hub?

The issue is that I originally sent the following raw JSON via Postman:

{
   "id":1,
   "receiver":"2222222222222",
   "message":{
      "Name":"testing",
      "PersonId":2,
      "CarId":2,
      "GUID":"1s3q1d-s546dq1-8e22e",
      "LineId":2,
      "SvcId":2,
      "Lat":-64.546547,
      "Lon":-64.546547,
      "TimeStamp":"2021-03-18T08:29:36.758Z",
      "Recorder":"dq65ds4qdezzer",
      "Env":"DEV"
   },
   "operator":20404,
   "sender":"MSISDN",
   "binary":1,
   "sent":"2021-03-18T08:29:36.758Z"
}

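Once Event Hubs Capture picks the message up, it is written to an Avro file. I am trying to read the data with fastavro and convert it to JSON. The problem is that I do not get back the raw data that Postman originally sent, and I cannot find a way to convert it back to its original state. Why does the Avro file also contain additional information from Postman? I probably need a way to convert only the "Body", but for some reason the output also adds "bytes" inside the body. I just want to get back the original raw data that was sent through Postman.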

__init__.py (Azure Function)

    import logging
    import os
    import string
    import json
    import uuid
    import avro.schema
    import tempfile
    import azure.functions as func
    from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    from fastavro import reader, json_writer
    
    
    # The Apache avro package is pure Python and relatively slow, so fastavro is used instead.
    def avroToJson(avroFile):
        with open("json_file.json", "w") as json_file:
            with open(avroFile, "rb") as avro_file:
                avro_reader = reader(avro_file)
                json_writer(json_file, avro_reader.writer_schema, avro_reader)
    
    
    def main(req: func.HttpRequest) -> func.HttpResponse:
      logging.info('Python HTTP trigger function processed a request.')
      print('Processor started using path ' + os.getcwd())
      connect_str = "###########"
      container = ContainerClient.from_connection_string(connect_str, container_name="####")
      blob_list = container.list_blobs() # List the blobs in the container.
      for blob in blob_list:
          # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).
          if blob.size > 508:
              print('Downloaded a non empty blob: ' + blob.name)
              # Create a blob client for the blob.
              blob_client = container.get_blob_client(blob.name)
              # Construct a local file name based on the blob name.
              cleanName = blob.name.replace('/', '_')
              cleanName = os.path.join(os.getcwd(), cleanName)
              # Download file
              with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                  my_file.write(blob_client.download_blob().readall())# Write blob contents into the file.
                  
              avroToJson(cleanName)
              with open('json_file.json', 'r') as file:
                  jsonStr = file.read()
            
      return func.HttpResponse(jsonStr, status_code=200)

Expected result:

{
   "id":1,
   "receiver":"2222222222222",
   "message":{
      "Name":"testing",
      "PersonId":2,
      "CarId":2,
      "GUID":"1s3q1d-s546dq1-8e22e",
      "LineId":2,
      "SvcId":2,
      "Lat":-64.546547,
      "Lon":-64.546547,
      "TimeStamp":"2021-03-18T08:29:36.758Z",
      "Recorder":"dq65ds4qdezzer",
      "Env":"DEV"
   },
   "operator":20404,
   "sender":"MSISDN",
   "binary":1,
   "sent":"2021-03-18T08:29:36.758Z"
}

Actual result:

{
   "SequenceNumber":19,
   "Offset":"10928",
   "EnqueuedTimeUtc":"4/1/2021 8:43:19 AM",
   "SystemProperties":{
      "x-opt-enqueued-time":{
         "long":1617266599145
      }
   },
   "Properties":{
      "Postman-Token":{
         "string":"37ff4cc6-9124-45e5-ba9d-######e"
      }
   },
   "Body":{
      "bytes":"{\r\n  \"id\": 1,\r\n  \"receiver\": \"2222222222222\",\r\n  \"message\": {\r\n    \"Name\": \"testing\",\r\n    \"PersonId\": 2,\r\n    \"CarId\": 2,\r\n    \"GUID\": \"1s3q1d-s546dq1-8e22e\",\r\n    \"LineId\": 2,\r\n    \"SvcId\": 2,\r\n    \"Lat\": -64.546547,\r\n    \"Lon\": -64.546547,\r\n    \"TimeStamp\": \"2021-03-18T08:29:36.758Z\",\r\n    \"Recorder\": \"dq65ds4qdezzer\",\r\n    \"Env\": \"DEV\"\r\n  },\r\n  \"operator\": 20404,\r\n  \"sender\": \"MSISDN\",\r\n  \"binary\": 1,\r\n  \"sent\": \"2021-03-29T08:29:36.758Z\"\r\n}"
   }
}

This question was originally posted under the thread "Alternative to Azure Event Hub Capture for sending Event Hub messages to Blob Storage?", because a second problem came up while working on the original one.

If this is not the right way to proceed on Stack Overflow, feel free to comment on how I should handle it next time. Kind regards.

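The JSON document in the HTTP request body becomes the body of the Event Hubs message, which is shortly afterwards written to the capture destination together with some extra properties such as sequence number, offset, enqueued time, and system properties.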

When deserializing, the reader needs to use the Body field alone; it should be the same body that came from the HTTP request.
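As a minimal sketch of that idea, the records that fastavro yields can be reduced to their Body field before anything is written out. The helper names `iter_bodies` and `read_capture_file` are hypothetical, and the sketch assumes the payload was sent as UTF-8 encoded JSON, as in the Postman example:

```python
import json
from typing import Any, Dict, Iterable, Iterator


def iter_bodies(records: Iterable[Dict[str, Any]]) -> Iterator[Any]:
    """Yield the parsed JSON payload of each capture record, ignoring the envelope."""
    for record in records:
        body = record.get("Body")
        if body is None:
            # Body looks like a union in the capture schema, so handle null defensively.
            continue
        if isinstance(body, (bytes, bytearray)):
            # Assumption: the sender posted UTF-8 encoded JSON.
            body = bytes(body).decode("utf-8")
        yield json.loads(body)


def read_capture_file(path: str) -> list:
    """Read one Avro capture file and return only the original payloads."""
    from fastavro import reader  # imported here so iter_bodies has no dependencies

    with open(path, "rb") as avro_file:
        return list(iter_bodies(reader(avro_file)))
```

With something like this, `avroToJson` could call `json.dump(read_capture_file(avroFile), json_file)` instead of `json_writer`, so the envelope fields (SequenceNumber, Offset, Properties and so on) never reach the output file.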

Feel free to check the Event Hubs Avro schema on this page: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-capture-overview#use-avro-tools

Try returning only the body:

return func.HttpResponse(json.loads(jsonStr)['Body']['bytes'], status_code=200)
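That one-liner works when the file holds a single record. As far as I know, fastavro's `json_writer` writes one JSON record per line, so a capture file with several events would need line-by-line handling. The sketch below shows one way to do that; `bodies_from_json_lines` is a hypothetical helper, and the ISO-8859-1 round trip reflects how Avro's JSON encoding represents bytes, which only matters for non-ASCII payloads:

```python
import json


def bodies_from_json_lines(json_text: str) -> list:
    """Extract the original payloads from json_writer output (one record per line)."""
    payloads = []
    for line in json_text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        record = json.loads(line)
        body = record.get("Body")
        if body is None:
            continue  # null branch of the Body union
        raw = body["bytes"]
        # Avro's JSON encoding maps each byte to one code point; undo that,
        # then decode as UTF-8 (assumption: the sender posted UTF-8 JSON).
        text = raw.encode("iso-8859-1").decode("utf-8")
        payloads.append(json.loads(text))
    return payloads
```

In the function above, the return statement could then become `func.HttpResponse(json.dumps(bodies_from_json_lines(jsonStr)), status_code=200, mimetype="application/json")`, which returns a JSON array of the original messages.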