IngestFromStreamAsync 方法不起作用
IngestFromStreamAsync method does not work
我设法使用以下代码成功摄取数据
var kcsbDM = new KustoConnectionStringBuilder("https://test123.southeastasia.kusto.windows.net", "testdb").WithAadApplicationTokenAuthentication(acquireTokenTask.AccessToken);
使用 (var ingestClient = KustoIngestFactory.CreateDirectIngestClient(kcsbDM))
{
var ingestProps = new KustoQueuedIngestionProperties("testdb", "TraceLog");
ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
ingestProps.ReportMethod = IngestionReportMethod.Queue;
ingestProps.Format = DataSourceFormat.json;
//generate datastream and columnmapping
ingestProps.IngestionMapping = new IngestionMapping() { IngestionMappings = columnMappings };
var ingestionResult = ingestClient.IngestFromStream(memStream, ingestProps);
}
当我尝试使用 QueuedClient 和 IngestFromStreamAsync 时,代码已成功执行,但即使在 30 分钟后也没有任何数据被摄取到数据库中
var kcsbDM = new KustoConnectionStringBuilder("https://ingest-test123.southeastasia.kusto.windows.net", "testdb").WithAadApplicationTokenAuthentication(acquireTokenTask.AccessToken);
使用 (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(kcsbDM))
{
var ingestProps = new KustoQueuedIngestionProperties("testdb", "TraceLog");
ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
ingestProps.ReportMethod = IngestionReportMethod.Queue;
ingestProps.Format = DataSourceFormat.json;
//generate datastream and columnmapping
ingestProps.IngestionMapping = new IngestionMapping() { IngestionMappings = columnMappings };
var ingestionResult = ingestClient.IngestFromStreamAsync(memStream, ingestProps);
}
尝试运行。显示“https://test123.southeastasia.kusto.windows.net”端点上的摄取失败,查看是否存在摄取错误。
另外,你设置了Queue reporting method,你可以通过从queue中读取得到详细的结果。
ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
ingestProps.ReportMethod = IngestionReportMethod.Queue;
(在您使用 KustoQueuedIngestionProperties
的第一个示例中,您应该使用 KustoIngestionProperties
。KustoQueuedIngestionProperties
具有将被摄取客户端忽略的其他属性,例如 ReportLevel 和 ReportMethod)
能否将这行更改为:
var ingestionResult = await ingestClient.IngestFromStreamAsync(memStream, ingestProps);
另请注意,排队摄取在实际摄取数据之前有一个最多 5 分钟的批处理阶段:
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/batchingpolicy
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/batching-policy
我终于找到原因了,需要在 table:
中启用流摄取
.alter table TraceLog policy streamingingestion enable
实际上只有在
的情况下才需要启用流式传输策略
- 集群中的流摄取已打开(Azure 门户)
- 代码正在使用
CreateManagedStreamingIngestClient
ManagedStreamingIngestClient 会先尝试流式摄取数据,如果失败几次,就会使用 QueuedClient
如果采集数据较小,4MB以下,推荐使用此客户端。
如果使用QueuedClient,可以试试
.show commands-and-queries | | where StartedOn > ago(20m) and Text contains "{YourTableName}" and CommandType =="DataIngestPull"
这可以给你执行的命令;但是它可能有 > 5 分钟的延迟
最后,您可以使用您使用的任何客户端检查状态,执行此操作
StreamDescription description = new StreamDescription
{
SourceId = Guid.NewGuid(),
Stream = dataStream
};
那么你就有了源码
通过调用这个来摄取:
var checker = await client.IngestFromStreamAsync(description, ingestProps);
之后,调用
var statusCheck = checker.GetIngestionStatusBySourceId(description.sourceId.Value);
您可以了解此摄取作业的状态。最好将它包装在一个单独的线程中,这样您就可以在几秒钟内继续检查一次,例如。
我设法使用以下代码成功摄取数据
var kcsbDM = new KustoConnectionStringBuilder("https://test123.southeastasia.kusto.windows.net", "testdb").WithAadApplicationTokenAuthentication(acquireTokenTask.AccessToken);
使用 (var ingestClient = KustoIngestFactory.CreateDirectIngestClient(kcsbDM))
{
var ingestProps = new KustoQueuedIngestionProperties("testdb", "TraceLog");
ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
ingestProps.ReportMethod = IngestionReportMethod.Queue;
ingestProps.Format = DataSourceFormat.json;
//generate datastream and columnmapping
ingestProps.IngestionMapping = new IngestionMapping() { IngestionMappings = columnMappings };
var ingestionResult = ingestClient.IngestFromStream(memStream, ingestProps);
}
当我尝试使用 QueuedClient 和 IngestFromStreamAsync 时,代码已成功执行,但即使在 30 分钟后也没有任何数据被摄取到数据库中
var kcsbDM = new KustoConnectionStringBuilder("https://ingest-test123.southeastasia.kusto.windows.net", "testdb").WithAadApplicationTokenAuthentication(acquireTokenTask.AccessToken);
使用 (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(kcsbDM))
{
var ingestProps = new KustoQueuedIngestionProperties("testdb", "TraceLog");
ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
ingestProps.ReportMethod = IngestionReportMethod.Queue;
ingestProps.Format = DataSourceFormat.json;
//generate datastream and columnmapping
ingestProps.IngestionMapping = new IngestionMapping() { IngestionMappings = columnMappings };
var ingestionResult = ingestClient.IngestFromStreamAsync(memStream, ingestProps);
}
尝试运行。显示“https://test123.southeastasia.kusto.windows.net”端点上的摄取失败,查看是否存在摄取错误。 另外,你设置了Queue reporting method,你可以通过从queue中读取得到详细的结果。
ingestProps.ReportLevel = IngestionReportLevel.FailuresOnly;
ingestProps.ReportMethod = IngestionReportMethod.Queue;
(在您使用 KustoQueuedIngestionProperties
的第一个示例中,您应该使用 KustoIngestionProperties
。KustoQueuedIngestionProperties
具有将被摄取客户端忽略的其他属性,例如 ReportLevel 和 ReportMethod)
能否将这行更改为: var ingestionResult = await ingestClient.IngestFromStreamAsync(memStream, ingestProps);
另请注意,排队摄取在实际摄取数据之前有一个最多 5 分钟的批处理阶段: https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/batchingpolicy https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/batching-policy
我终于找到原因了,需要在 table:
中启用流摄取.alter table TraceLog policy streamingingestion enable
实际上只有在
的情况下才需要启用流式传输策略- 集群中的流摄取已打开(Azure 门户)
- 代码正在使用
CreateManagedStreamingIngestClient
ManagedStreamingIngestClient 会先尝试流式摄取数据,如果失败几次,就会使用 QueuedClient
如果采集数据较小,4MB以下,推荐使用此客户端。
如果使用QueuedClient,可以试试
.show commands-and-queries | | where StartedOn > ago(20m) and Text contains "{YourTableName}" and CommandType =="DataIngestPull"
这可以给你执行的命令;但是它可能有 > 5 分钟的延迟
最后,您可以使用您使用的任何客户端检查状态,执行此操作
StreamDescription description = new StreamDescription
{
SourceId = Guid.NewGuid(),
Stream = dataStream
};
那么你就有了源码
通过调用这个来摄取:
var checker = await client.IngestFromStreamAsync(description, ingestProps);
之后,调用
var statusCheck = checker.GetIngestionStatusBySourceId(description.sourceId.Value);
您可以了解此摄取作业的状态。最好将它包装在一个单独的线程中,这样您就可以在几秒钟内继续检查一次,例如。