insert/update 在 sql table 中按流分析条件从 iothub 引用数据

insert/update referenced data from iothub in sql table by conditions with stream analytics

我在 Azure 上工作,有一个 sql 事务数据库,有两个 table 和一个从多个设备接收数据的 iotHub。我必须检查数据当前是否存在于数据库中,如果不存在则存储它们,否则更新 table.

中的旧数据

table一个赞:

id   |   key1
-------------
1    |   abc
2    |   def
3    |   ghi

table两个赞:

id    |   id_tableOne  |   key2         |   something
-------------------------------------------------------
77    |   2            |   Emil         |   welcome
78    |   1            |   Emil         |   here I am
79    |   1            |   Hans         |   hello world

流为来自 iotHub 的 json 消息,如下所示:

{
    "topic": "test",
    "key1": "ghi",
    "data": [{
        "key2": "Emil",
        "something": "lmn"
    },
    {
        "key2": "Hans",
        "something": "hij"
    },
    {
        "key2": "Gerda",
        "something": "xyz"
    }]
}
  1. 我想从 tableOne by "key1" 在 json 流
  2. 中获取 ID
  3. 我想检查 id_tableOne(1. 的结果)和 key2 的组合是否存在于 table两个
  4. 如果确实存在:更新 table中的行 - 否则:在中插入新行 table两个

基于 Stream data as input into Stream Analytics,azure stream anslytics inputs.So 仅支持 eventhubs,iothubs and blob storage,您无法使用输出 sql 数据库筛选条件。禁止如下sql。

SELECT 
jaysqlserver2.id as id,
jaysqlserver2.id_tableOne as idTableOne,
jaysqlserver2.key2 as key2,
jaysqlserver2.somthing as something
from jsoninput
where jaysqlserver2.id_tableOne = jsoninput.key1 

但是,我为您提供了一个解决方法。

首先,您可以压缩 jsoninput 并将它们保存到目标 sql 数据库中的临时 table。

SELECT 
    jsoninput.key1, 
    arrayElement.ArrayValue.key2,
    arrayElement.ArrayValue.something
INTO 
    output
FROM jsoninput
CROSS APPLY GetArrayElements(jsoninput.data) AS arrayElement

然后,您可以按照doc使用Azure Function Time Trigger自动操作sql数据库。请参考伪代码如下:

#r "System.Configuration"
#r "System.Data"
using System.Net;
using System.Configuration;
using System.Data.SqlClient;
using System.Threading.Tasks;

public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)
{
    var str = ConfigurationManager.ConnectionStrings["sqldb_connection"].ConnectionString;
    using (SqlConnection conn = new SqlConnection(str))
    {
        conn.Open();
        var text = "select temp.key1 as key1temp,temp.key2 as key2temp,
t1.id as id1,t1.key1 as key1,
t2.id as id2,t2.id_tableOne as tableOne,t2.key2 as key2,t2.something as something
from dbo.tabletemp as temp
left join dbo.table1 as t1 on temp.key1 = t1.key1
left join dbo.table2 as t2 on t1.id = t2.id_tableOne and temp.key2 = t2.key2
where t1.id is not null";

        SqlCommand sqlComm = new SqlCommand(text, conn);

        SqlDataReader reader = sqlComm.ExecuteReader();

        while(reader.Read())
        {
            string id2 = reader["id2"].ToString();
            if(id2 == null)
            {
                //execute insert sql
            }
            else
            {
                //execute update sql
            }
        }

        var delSql = "delete from dbo.temp";
        //execute delete data in temp table
        ...
    }
}

希望对您有所帮助you.Any关注,请告知。