Parse JSON in U-SQL, then convert to CSV

I am trying to convert some JSON-formatted telemetry data to CSV and then write it to a file using U-SQL.

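The problem is that some of the JSON key names contain periods, so U-SQL does not recognize them in my SELECT. When I check the output file, the only value I see is the one for "p1". How should I write the JSON key names in the script so that they are recognized? Thanks in advance for your help!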

Code:

REFERENCE ASSEMBLY MATSDevDB.[Newtonsoft.Json];
REFERENCE ASSEMBLY MATSDevDB.[Microsoft.Analytics.Samples.Formats]; 

USING Microsoft.Analytics.Samples.Formats.Json;


@jsonDocuments = 
    EXTRACT jsonString string 
    FROM @"adl://xxxx.azuredatalakestore.net/xxxx/{*}/{*}/{*}/telemetry_{*}.json" 
    USING Extractors.Tsv(quoting:false);

@jsonify = 
    SELECT Microsoft.Analytics.Samples.Formats.Json.JsonFunctions.JsonTuple(jsonString) AS json 
    FROM @jsonDocuments;

@columnized = SELECT 
            json["EventInfo.Source"] AS EventInfoSource,
            json["EventInfo.InitId"] AS EventInfoInitId,
            json["EventInfo.Sequence"] AS EventInfoSequence,
            json["EventInfo.Name"] AS EventInfoName,
            json["EventInfo.Time"] AS EventInfoTime,
            json["EventInfo.SdkVersion"] AS EventInfoSdkVersion,
            json["AppInfo.Language"] AS AppInfoLanguage,
            json["UserInfo.Language"] AS UserInfoLanguage,
            json["DeviceInfo.BrowserName"] AS DeviceInfoBrowserName,
            json["DeviceInfo.BrowserVersion"] AS BrowserVersion,
            json["DeviceInfo.OsName"] AS DeviceInfoOsName,
            json["DeviceInfo.OsVersion"] AS DeviceInfoOsVersion,
            json["DeviceInfo.Id"] AS DeviceInfoId,
            json["p1"] AS p1,
            json["PipelineInfo.AccountId"] AS PipelineInfoAccountId, 
            json["PipelineInfo.IngestionTime"] AS PipelineInfoIngestionTime, 
            json["PipelineInfo.ClientIp"] AS PipelineInfoClientIp,
            json["PipelineInfo.ClientCountry"] AS PipelineInfoClientCountry,
            json["PipelineInfo.IngestionPath"] AS PipelineInfoIngestionPath,
            json["AppInfo.Id"] AS AppInfoId,
            json["EventInfo.Id"] AS EventInfoId,
            json["EventInfo.BaseType"] AS EventInfoBaseType,
            json["EventInfo.IngestionTime"] AS EventInfoIngestionTime
    FROM @jsonify;

OUTPUT @columnized
TO "adl://xxxx.azuredatalakestore.net/poc/TestResult.csv"
USING Outputters.Csv(quoting : false);

JSON:

{"EventInfo.Source":"JS_default_source","EventInfo.Sequence":"1","EventInfo.Name":"daysofweek","EventInfo.Time":"2018-01-25T21:09:36.779Z","EventInfo.SdkVersion":"ACT-Web-JS-2.6.0","AppInfo.Language":"en","UserInfo.Language":"en-US","UserInfo.TimeZone":"-08:00","DeviceInfo.BrowserName":"Chrome","DeviceInfo.BrowserVersion":"63.0.3239.132","DeviceInfo.OsName":"Mac OS X","DeviceInfo.OsVersion":"10","p1":"V1","PipelineInfo.IngestionTime":"2018-01-25T21:09:33.9930000Z","PipelineInfo.ClientCountry":"CA","PipelineInfo.IngestionPath":"FastPath","EventInfo.BaseType":"custom","EventInfo.IngestionTime":"2018-01-25T21:09:33.9930000Z"}

I got this to work by wrapping each key name in single quotes and a single pair of square brackets, for example:

@columnized = SELECT 
            json["['EventInfo.Source']"] AS EventInfoSource,
...
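
If it is unclear which key names the map actually holds, one way to check is to explode the map into rows and write the keys out. This is only a diagnostic sketch: it assumes the @jsonify rowset from the question, the column names jsonKey and jsonValue are made up for illustration, and the output path is a placeholder. My understanding, which I have not verified against the sample library's source here, is that JsonTuple keys each value by its JSON path, so a property name containing a period would show up in bracket-quoted form such as ['EventInfo.Source'], while a plain name such as p1 stays as-is.

```usql
// Diagnostic sketch: list every key/value pair that JsonTuple produced.
// Assumes @jsonify from the script above, where "json" is a SqlMap<string, string>.
@keys =
    SELECT kv.jsonKey,
           kv.jsonValue
    FROM @jsonify
         CROSS APPLY EXPLODE(json) AS kv(jsonKey, jsonValue);

// Hypothetical output path; replace with a location in your own store.
OUTPUT @keys
TO "/poc/JsonKeys.csv"
USING Outputters.Csv();
```

Whatever strings appear in the jsonKey column are the ones to use inside json["..."] in the SELECT.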

The full SELECT with the bracketed key names:

@columnized = SELECT 
            json["['EventInfo.Source']"] AS EventInfoSource,
            json["['EventInfo.InitId']"] AS EventInfoInitId,
            json["['EventInfo.Sequence']"] AS EventInfoSequence,
            json["['EventInfo.Name']"] AS EventInfoName,
            json["['EventInfo.Time']"] AS EventInfoTime,
            json["['EventInfo.SdkVersion']"] AS EventInfoSdkVersion,
            json["['AppInfo.Language']"] AS AppInfoLanguage,
            json["['UserInfo.Language']"] AS UserInfoLanguage,
            json["['DeviceInfo.BrowserName']"] AS DeviceInfoBrowserName,
            json["['DeviceInfo.BrowserVersion']"] AS BrowserVersion,
            json["['DeviceInfo.OsName']"] AS DeviceInfoOsName,
            json["['DeviceInfo.OsVersion']"] AS DeviceInfoOsVersion,
            json["['DeviceInfo.Id']"] AS DeviceInfoId,
            json["p1"] AS p1,
            json["['PipelineInfo.AccountId']"] AS PipelineInfoAccountId, 
            json["['PipelineInfo.IngestionTime']"] AS PipelineInfoIngestionTime, 
            json["['PipelineInfo.ClientIp']"] AS PipelineInfoClientIp,
            json["['PipelineInfo.ClientCountry']"] AS PipelineInfoClientCountry,
            json["['PipelineInfo.IngestionPath']"] AS PipelineInfoIngestionPath,
            json["['AppInfo.Id']"] AS AppInfoId,
            json["['EventInfo.Id']"] AS EventInfoId,
            json["['EventInfo.BaseType']"] AS EventInfoBaseType,
            json["['EventInfo.IngestionTime']"] AS EventInfoIngestionTime
    FROM @jsonify;

My results: