IngestFromStreamAsync 方法不起作用

IngestFromStreamAsync method does not work

我设法使用以下代码成功摄取数据

var kcsbDM = new KustoConnectionStringBuilder("https://test123.southeastasia.kusto.windows.net", "testdb").WithAadApplicationTokenAuthentication(acquireTokenTask.AccessToken);

使用 (var ingestClient = KustoIngestFactory.CreateDirectIngestClient(kcsbDM))
{

var ingestProps = new KustoQueuedIngestionProperties("testdb", "TraceLog");
ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
ingestProps.ReportMethod = IngestionReportMethod.Queue;
ingestProps.Format = DataSourceFormat.json;

//generate datastream and columnmapping

ingestProps.IngestionMapping = new IngestionMapping() { IngestionMappings = columnMappings };
var ingestionResult = ingestClient.IngestFromStream(memStream, ingestProps);

}

当我尝试使用 QueuedClient 和 IngestFromStreamAsync 时,代码已成功执行,但即使在 30 分钟后也没有任何数据被摄取到数据库中

var kcsbDM = new KustoConnectionStringBuilder("https://ingest-test123.southeastasia.kusto.windows.net", "testdb").WithAadApplicationTokenAuthentication(acquireTokenTask.AccessToken);

使用 (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(kcsbDM))
{

var ingestProps = new KustoQueuedIngestionProperties("testdb", "TraceLog");
ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
ingestProps.ReportMethod = IngestionReportMethod.Queue;
ingestProps.Format = DataSourceFormat.json;

//generate datastream and columnmapping

ingestProps.IngestionMapping = new IngestionMapping() { IngestionMappings = columnMappings };
var ingestionResult = ingestClient.IngestFromStreamAsync(memStream, ingestProps);

}

尝试运行。显示“https://test123.southeastasia.kusto.windows.net”端点上的摄取失败,查看是否存在摄取错误。 另外,你设置了Queue reporting method,你可以通过从queue中读取得到详细的结果。

ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
ingestProps.ReportMethod = IngestionReportMethod.Queue;

(在您使用 KustoQueuedIngestionProperties 的第一个示例中,您应该使用 KustoIngestionPropertiesKustoQueuedIngestionProperties 具有将被摄取客户端忽略的其他属性,例如 ReportLevel 和 ReportMethod)

能否将这行更改为: var ingestionResult = await ingestClient.IngestFromStreamAsync(memStream, ingestProps);

另请注意,排队摄取在实际摄取数据之前有一个最多 5 分钟的批处理阶段: https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/batchingpolicy https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/batching-policy

我终于找到原因了,需要在 table:

中启用流摄取
.alter table TraceLog policy streamingingestion enable

详情见the Azure documentation

实际上只有在

的情况下才需要启用流式传输策略
  • 集群中的流摄取已打开(Azure 门户)
  • 代码正在使用 CreateManagedStreamingIngestClient

ManagedStreamingIngestClient 会先尝试流式摄取数据,如果失败几次,就会使用 QueuedClient

如果采集数据较小,4MB以下,推荐使用此客户端。

如果使用QueuedClient,可以试试

.show commands-and-queries | | where StartedOn > ago(20m) and Text contains "{YourTableName}" and CommandType =="DataIngestPull" 

这可以给你执行的命令;但是它可能有 > 5 分钟的延迟

最后,您可以使用您使用的任何客户端检查状态,执行此操作

            StreamDescription description = new StreamDescription
            {
                SourceId = Guid.NewGuid(),
                Stream = dataStream
            };

那么你就有了源码

通过调用这个来摄取:

var checker = await client.IngestFromStreamAsync(description, ingestProps);

之后,调用

var statusCheck = checker.GetIngestionStatusBySourceId(description.sourceId.Value);

您可以了解此摄取作业的状态。最好将它包装在一个单独的线程中,这样您就可以在几秒钟内继续检查一次,例如。