Azure 函数到 Table 存储
Azure Function into Table Storage
我有一个 Azure 函数,我想让它从 EventHub 获取消息(这非常简单并且有效),然后在运行时使用 Table 绑定将该信息放入 Table 存储.
这是我目前的情况:
/// <summary>
/// Function entry point for the Event Hub trigger: parses the incoming message into a
/// <c>Measurement</c>, binds at run time to a table entity addressed by the measurement's
/// partition and row keys, and copies the measurement values onto that entity.
/// </summary>
/// <param name="eventHubMessage">Raw message payload; deserialized as JSON into a <c>Measurement</c>.</param>
/// <param name="log">Trace writer; used here only to report that the binding returned null.</param>
/// <param name="binder">Binder used to resolve the table binding imperatively from attributes.</param>
public static async Task Run(string eventHubMessage, TraceWriter log, Binder binder)
{
var m = JsonConvert.DeserializeObject<Measurement>(eventHubMessage);
// Binding attributes are built per message because the keys come from the payload.
// NOTE(review): tableName is not declared in this snippet — it must be defined elsewhere.
var attributes = new Attribute[]
{
new StorageAccountAttribute("AzureWebJobsTest"),
new TableAttribute(tableName, m.PartitionKey, m.RowKey)
};
// Binds to a single entity (table + partition key + row key), not to a collector.
// NOTE(review): the accompanying text reports that this binding always yields null;
// presumably it resolves an existing row rather than creating one — confirm.
using(var output = await binder.BindAsync<MyTableEntity>(attributes))
{
if(output == null)
log.Info($"4. output is null");
else
{
// Copy the measurement's values and table metadata onto the bound entity.
output.Minimum = m.Minimum;
output.Maximum = m.Maximum;
output.Average = m.Average;
output.Timestamp = m.Timestamp;
output.ETag = m.ETag;
// Unfinished call: the argument list is a placeholder and the statement has no
// terminating semicolon, so this line does not compile as written.
output.WriteEntity(/* Need an operationContext*/)
}
}
}
/// <summary>
/// Table entity exposing the average, minimum and maximum of a measurement.
/// It also implements <see cref="IDisposable"/>, but owns nothing to release:
/// disposing only records that disposal has happened.
/// </summary>
public class MyTableEntity : TableEntity, IDisposable
{
    public double Average { get; set; }

    public double Minimum { get; set; }

    public double Maximum { get; set; }

    // True once Dispose(bool) has run; makes repeated disposal a no-op.
    private bool _isDisposed;

    /// <summary>
    /// Runs the disposing path and then suppresses finalization for this instance.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Marks the instance as disposed. Overridable so derived types can add cleanup.
    /// </summary>
    /// <param name="disposing">
    /// True when called from <see cref="Dispose()"/>; not otherwise used here because
    /// there is nothing to clean up in either case.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            _isDisposed = true;
        }
    }
}
我的问题:
1) 输出总是 null。
2) 即使输出不为 null,我也不知道 OperationContext 需要什么,或者调用 ITableEntity.Write() 是否是让它写入 Table 存储的正确方法。
补充(后续编辑添加)JSON 绑定:
{
"bindings": [
{
"type": "eventHubTrigger",
"name": "eventHubMessage",
"direction": "in",
"path": "measurements",
"connection": "MeasurementsConnectionString"
}
],
"disabled": false
}
要向 Table 添加新条目,您应该绑定到 IAsyncCollector<T> 而不是实体本身,然后创建一个新实体并调用 AddAsync。以下代码段对我有用:
// Illustrative fragment: the "..." tokens are placeholders to be filled in by the reader.
// Only the storage account and the table are named in the binding attributes; the
// partition and row keys are set on the entity that is added below.
var attributes = new Attribute[]
{
new StorageAccountAttribute("..."),
new TableAttribute("...")
};
// Bind to a collector of entities rather than to a single entity.
var output = await binder.BindAsync<IAsyncCollector<MyTableEntity>>(attributes);
// Create the new entity and hand it to the collector.
await output.AddAsync(new MyTableEntity()
{
PartitionKey = "...",
RowKey = "...",
Minimum = ...,
...
});
如果您想使用 DynamicTableEntity,因为在编译时您不知道消息中将包含哪些数据,并且您意识到 table 绑定不再适用于 DynamicTableEntities,您可以使用以下方法:
/// <summary>
/// Turns a JSON message into a <c>DynamicTableEntity</c> and inserts it into table storage,
/// creating the table first if it does not exist.
/// </summary>
/// <param name="message">
/// JSON object. The member "MyPartitionKey" becomes the partition key, the member
/// "Timestamp" is stored under "MyTimestamp", and every other top-level member is stored
/// as a property under its own name.
/// </param>
/// <param name="enqueuedTime">Formatted as "yyyy-MM-dd HH:mm:ss.fff" to form the row key.</param>
/// <returns>A task that completes when the insert operation has been executed.</returns>
private static async Task ProcessMessage(string message, DateTime enqueuedTime)
{
    var deviceData = JsonConvert.DeserializeObject<JObject>(message);
    var dynamicTableEntity = new DynamicTableEntity();

    // ':' in a custom format string is the culture's time separator and the year is rendered
    // in the culture's calendar, so pin the invariant culture to keep row keys identical on
    // every host.
    dynamicTableEntity.RowKey = enqueuedTime.ToString(
        "yyyy-MM-dd HH:mm:ss.fff",
        System.Globalization.CultureInfo.InvariantCulture);

    foreach (KeyValuePair<string, JToken> keyValuePair in deviceData)
    {
        if (keyValuePair.Key.Equals("MyPartitionKey"))
        {
            dynamicTableEntity.PartitionKey = keyValuePair.Value.ToString();
        }
        else if (keyValuePair.Key.Equals("Timestamp")) // if you are using a parameter "Timestamp" it has to be stored in a column named differently because the column "Timestamp" will automatically be filled when adding a line to table storage
        {
            dynamicTableEntity.Properties.Add("MyTimestamp", EntityProperty.CreateEntityPropertyFromObject(keyValuePair.Value));
        }
        else
        {
            dynamicTableEntity.Properties.Add(keyValuePair.Key, EntityProperty.CreateEntityPropertyFromObject(keyValuePair.Value));
        }
    }

    CloudStorageAccount storageAccount = CloudStorageAccount.Parse("myStorageConnectionString");
    CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
    CloudTable table = tableClient.GetTableReference("myTableName");

    // Use the asynchronous variant so the method never blocks a thread on a network call.
    await table.CreateIfNotExistsAsync();

    var tableOperation = TableOperation.Insert(dynamicTableEntity);
    await table.ExecuteAsync(tableOperation);
}
我有一个 Azure 函数,我想让它从 EventHub 获取消息(这非常简单并且有效),然后在运行时使用 Table 绑定将该信息放入 Table 存储.
这是我目前的情况:
/// <summary>
/// Function entry point for the Event Hub trigger: parses the incoming message into a
/// <c>Measurement</c>, binds at run time to a table entity addressed by the measurement's
/// partition and row keys, and copies the measurement values onto that entity.
/// </summary>
/// <param name="eventHubMessage">Raw message payload; deserialized as JSON into a <c>Measurement</c>.</param>
/// <param name="log">Trace writer; used here only to report that the binding returned null.</param>
/// <param name="binder">Binder used to resolve the table binding imperatively from attributes.</param>
public static async Task Run(string eventHubMessage, TraceWriter log, Binder binder)
{
var m = JsonConvert.DeserializeObject<Measurement>(eventHubMessage);
// Binding attributes are built per message because the keys come from the payload.
// NOTE(review): tableName is not declared in this snippet — it must be defined elsewhere.
var attributes = new Attribute[]
{
new StorageAccountAttribute("AzureWebJobsTest"),
new TableAttribute(tableName, m.PartitionKey, m.RowKey)
};
// Binds to a single entity (table + partition key + row key), not to a collector.
// NOTE(review): the accompanying text reports that this binding always yields null;
// presumably it resolves an existing row rather than creating one — confirm.
using(var output = await binder.BindAsync<MyTableEntity>(attributes))
{
if(output == null)
log.Info($"4. output is null");
else
{
// Copy the measurement's values and table metadata onto the bound entity.
output.Minimum = m.Minimum;
output.Maximum = m.Maximum;
output.Average = m.Average;
output.Timestamp = m.Timestamp;
output.ETag = m.ETag;
// Unfinished call: the argument list is a placeholder and the statement has no
// terminating semicolon, so this line does not compile as written.
output.WriteEntity(/* Need an operationContext*/)
}
}
}
/// <summary>
/// Table entity exposing the average, minimum and maximum of a measurement.
/// It also implements <see cref="IDisposable"/>, but owns nothing to release:
/// disposing only records that disposal has happened.
/// </summary>
public class MyTableEntity : TableEntity, IDisposable
{
    public double Average { get; set; }

    public double Minimum { get; set; }

    public double Maximum { get; set; }

    // True once Dispose(bool) has run; makes repeated disposal a no-op.
    private bool _isDisposed;

    /// <summary>
    /// Runs the disposing path and then suppresses finalization for this instance.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Marks the instance as disposed. Overridable so derived types can add cleanup.
    /// </summary>
    /// <param name="disposing">
    /// True when called from <see cref="Dispose()"/>; not otherwise used here because
    /// there is nothing to clean up in either case.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            _isDisposed = true;
        }
    }
}
我的问题:
1) 输出总是 null。
2) 即使输出不为 null,我也不知道 OperationContext 需要什么,或者调用 ITableEntity.Write() 是否是让它写入 Table 存储的正确方法。
补充(后续编辑添加)JSON 绑定:
{
"bindings": [
{
"type": "eventHubTrigger",
"name": "eventHubMessage",
"direction": "in",
"path": "measurements",
"connection": "MeasurementsConnectionString"
}
],
"disabled": false
}
要向 Table 添加新条目,您应该绑定到 IAsyncCollector<T> 而不是实体本身,然后创建一个新实体并调用 AddAsync。以下代码段对我有用:
// Illustrative fragment: the "..." tokens are placeholders to be filled in by the reader.
// Only the storage account and the table are named in the binding attributes; the
// partition and row keys are set on the entity that is added below.
var attributes = new Attribute[]
{
new StorageAccountAttribute("..."),
new TableAttribute("...")
};
// Bind to a collector of entities rather than to a single entity.
var output = await binder.BindAsync<IAsyncCollector<MyTableEntity>>(attributes);
// Create the new entity and hand it to the collector.
await output.AddAsync(new MyTableEntity()
{
PartitionKey = "...",
RowKey = "...",
Minimum = ...,
...
});
如果您想使用 DynamicTableEntity,因为在编译时您不知道消息中将包含哪些数据,并且您意识到 table 绑定不再适用于 DynamicTableEntities,您可以使用以下方法:
/// <summary>
/// Turns a JSON message into a <c>DynamicTableEntity</c> and inserts it into table storage,
/// creating the table first if it does not exist.
/// </summary>
/// <param name="message">
/// JSON object. The member "MyPartitionKey" becomes the partition key, the member
/// "Timestamp" is stored under "MyTimestamp", and every other top-level member is stored
/// as a property under its own name.
/// </param>
/// <param name="enqueuedTime">Formatted as "yyyy-MM-dd HH:mm:ss.fff" to form the row key.</param>
/// <returns>A task that completes when the insert operation has been executed.</returns>
private static async Task ProcessMessage(string message, DateTime enqueuedTime)
{
    var deviceData = JsonConvert.DeserializeObject<JObject>(message);
    var dynamicTableEntity = new DynamicTableEntity();

    // ':' in a custom format string is the culture's time separator and the year is rendered
    // in the culture's calendar, so pin the invariant culture to keep row keys identical on
    // every host.
    dynamicTableEntity.RowKey = enqueuedTime.ToString(
        "yyyy-MM-dd HH:mm:ss.fff",
        System.Globalization.CultureInfo.InvariantCulture);

    foreach (KeyValuePair<string, JToken> keyValuePair in deviceData)
    {
        if (keyValuePair.Key.Equals("MyPartitionKey"))
        {
            dynamicTableEntity.PartitionKey = keyValuePair.Value.ToString();
        }
        else if (keyValuePair.Key.Equals("Timestamp")) // if you are using a parameter "Timestamp" it has to be stored in a column named differently because the column "Timestamp" will automatically be filled when adding a line to table storage
        {
            dynamicTableEntity.Properties.Add("MyTimestamp", EntityProperty.CreateEntityPropertyFromObject(keyValuePair.Value));
        }
        else
        {
            dynamicTableEntity.Properties.Add(keyValuePair.Key, EntityProperty.CreateEntityPropertyFromObject(keyValuePair.Value));
        }
    }

    CloudStorageAccount storageAccount = CloudStorageAccount.Parse("myStorageConnectionString");
    CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
    CloudTable table = tableClient.GetTableReference("myTableName");

    // Use the asynchronous variant so the method never blocks a thread on a network call.
    await table.CreateIfNotExistsAsync();

    var tableOperation = TableOperation.Insert(dynamicTableEntity);
    await table.ExecuteAsync(tableOperation);
}