insert/update 在 sql table 中按流分析条件从 iothub 引用数据
insert/update referenced data from iothub in sql table by conditions with stream analytics
我在 Azure 上工作,有一个 sql 事务数据库,有两个 table 和一个从多个设备接收数据的 iotHub。我必须检查数据当前是否存在于数据库中,如果不存在则存储它们,否则更新 table.
中的旧数据
table一个赞:
id | key1
-------------
1 | abc
2 | def
3 | ghi
table两个赞:
id | id_tableOne | key2 | something
-------------------------------------------------------
77 | 2 | Emil | welcome
78 | 1 | Emil | here I am
79 | 1 | Hans | hello world
流为来自 iotHub 的 json 消息,如下所示:
{
"topic": "test",
"key1": "ghi",
"data": [{
"key2": "Emil",
"something": "lmn"
},
{
"key2": "Hans",
"something": "hij"
},
{
"key2": "Gerda",
"something": "xyz"
}]
}
- 我想从 tableOne by "key1" 在 json 流
中获取 ID
- 我想检查 id_tableOne(1. 的结果)和 key2 的组合是否存在于
table两个
- 如果确实存在:更新 table中的行 - 否则:在中插入新行
table两个
基于 Stream data as input into Stream Analytics,azure stream anslytics inputs.So 仅支持 eventhubs,iothubs and blob storage
,您无法使用输出 sql 数据库筛选条件。禁止如下sql。
SELECT
jaysqlserver2.id as id,
jaysqlserver2.id_tableOne as idTableOne,
jaysqlserver2.key2 as key2,
jaysqlserver2.somthing as something
from jsoninput
where jaysqlserver2.id_tableOne = jsoninput.key1
但是,我为您提供了一个解决方法。
首先,您可以压缩 jsoninput
并将它们保存到目标 sql 数据库中的临时 table。
SELECT
jsoninput.key1,
arrayElement.ArrayValue.key2,
arrayElement.ArrayValue.something
INTO
output
FROM jsoninput
CROSS APPLY GetArrayElements(jsoninput.data) AS arrayElement
然后,您可以按照doc使用Azure Function Time Trigger自动操作sql数据库。请参考伪代码如下:
#r "System.Configuration"
#r "System.Data"
using System.Net;
using System.Configuration;
using System.Data.SqlClient;
using System.Threading.Tasks;
public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)
{
var str = ConfigurationManager.ConnectionStrings["sqldb_connection"].ConnectionString;
using (SqlConnection conn = new SqlConnection(str))
{
conn.Open();
var text = "select temp.key1 as key1temp,temp.key2 as key2temp,
t1.id as id1,t1.key1 as key1,
t2.id as id2,t2.id_tableOne as tableOne,t2.key2 as key2,t2.something as something
from dbo.tabletemp as temp
left join dbo.table1 as t1 on temp.key1 = t1.key1
left join dbo.table2 as t2 on t1.id = t2.id_tableOne and temp.key2 = t2.key2
where t1.id is not null";
SqlCommand sqlComm = new SqlCommand(text, conn);
SqlDataReader reader = sqlComm.ExecuteReader();
while(reader.Read())
{
string id2 = reader["id2"].ToString();
if(id2 == null)
{
//execute insert sql
}
else
{
//execute update sql
}
}
var delSql = "delete from dbo.temp";
//execute delete data in temp table
...
}
}
希望对您有所帮助you.Any关注,请告知。
我在 Azure 上工作,有一个 sql 事务数据库,有两个 table 和一个从多个设备接收数据的 iotHub。我必须检查数据当前是否存在于数据库中,如果不存在则存储它们,否则更新 table.
中的旧数据table一个赞:
id | key1
-------------
1 | abc
2 | def
3 | ghi
table两个赞:
id | id_tableOne | key2 | something
-------------------------------------------------------
77 | 2 | Emil | welcome
78 | 1 | Emil | here I am
79 | 1 | Hans | hello world
流为来自 iotHub 的 json 消息,如下所示:
{
"topic": "test",
"key1": "ghi",
"data": [{
"key2": "Emil",
"something": "lmn"
},
{
"key2": "Hans",
"something": "hij"
},
{
"key2": "Gerda",
"something": "xyz"
}]
}
- 我想从 tableOne by "key1" 在 json 流 中获取 ID
- 我想检查 id_tableOne(1. 的结果)和 key2 的组合是否存在于 table两个
- 如果确实存在:更新 table中的行 - 否则:在中插入新行 table两个
基于 Stream data as input into Stream Analytics,azure stream anslytics inputs.So 仅支持 eventhubs,iothubs and blob storage
,您无法使用输出 sql 数据库筛选条件。禁止如下sql。
SELECT
jaysqlserver2.id as id,
jaysqlserver2.id_tableOne as idTableOne,
jaysqlserver2.key2 as key2,
jaysqlserver2.somthing as something
from jsoninput
where jaysqlserver2.id_tableOne = jsoninput.key1
但是,我为您提供了一个解决方法。
首先,您可以压缩 jsoninput
并将它们保存到目标 sql 数据库中的临时 table。
SELECT
jsoninput.key1,
arrayElement.ArrayValue.key2,
arrayElement.ArrayValue.something
INTO
output
FROM jsoninput
CROSS APPLY GetArrayElements(jsoninput.data) AS arrayElement
然后,您可以按照doc使用Azure Function Time Trigger自动操作sql数据库。请参考伪代码如下:
#r "System.Configuration"
#r "System.Data"
using System.Net;
using System.Configuration;
using System.Data.SqlClient;
using System.Threading.Tasks;
public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)
{
var str = ConfigurationManager.ConnectionStrings["sqldb_connection"].ConnectionString;
using (SqlConnection conn = new SqlConnection(str))
{
conn.Open();
var text = "select temp.key1 as key1temp,temp.key2 as key2temp,
t1.id as id1,t1.key1 as key1,
t2.id as id2,t2.id_tableOne as tableOne,t2.key2 as key2,t2.something as something
from dbo.tabletemp as temp
left join dbo.table1 as t1 on temp.key1 = t1.key1
left join dbo.table2 as t2 on t1.id = t2.id_tableOne and temp.key2 = t2.key2
where t1.id is not null";
SqlCommand sqlComm = new SqlCommand(text, conn);
SqlDataReader reader = sqlComm.ExecuteReader();
while(reader.Read())
{
string id2 = reader["id2"].ToString();
if(id2 == null)
{
//execute insert sql
}
else
{
//execute update sql
}
}
var delSql = "delete from dbo.temp";
//execute delete data in temp table
...
}
}
希望对您有所帮助you.Any关注,请告知。