如何在 .net 核心项目中使用 SignalR Core 收听 postgresql 数据库?
How can I listen postgresql database with SignalR Core in .net core project?
我正在开发 .net 核心 Web 应用程序。我想听我的 PostgreSQL 数据库。如果 table 有任何变化,我必须知道。
所以根据我的研究,我必须使用 SignalR Core。我用 SignalR 做了一些示例应用程序,比如聊天应用程序,但其中 none 监听数据库。我找不到任何例子。
-是否必须在PostgreSQL数据库上触发?
-代码端一定要监听吗?
-如何使用SignalR Core?
请教我一个方法。
非常感谢。
I want to listen my PostgreSQL database. And if there are any changes on table, I have to got it.
您可以创建与您指定的 table 关联的触发器,并使用函数 pg_notify(text, text)
发送通知,如下所示。
函数
CREATE OR REPLACE FUNCTION mytestfunc() RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' then
PERFORM pg_notify('notifytesttable', 'new record inserted');
ELSIF TG_OP = 'UPDATE' then
PERFORM pg_notify('notifytesttable', 'updated');
ELSIF TG_OP = 'DELETE' then
PERFORM pg_notify('notifytesttable', 'deleted');
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
触发器
CREATE TRIGGER any_after_testtable
AFTER INSERT OR DELETE OR UPDATE
ON testtable
FOR EACH ROW
EXECUTE PROCEDURE mytestfunc();
在您的客户端应用程序代码中,您可以侦听和接收来自 PostgreSQL 的通知。
conn.Open();
conn.Notification += Conn_Notification;
using (var cmd = new NpgsqlCommand("LISTEN notifytesttable", conn))
{
cmd.ExecuteNonQuery();
}
在 Notification
事件处理程序中,您可以调用 SignalR hub 方法将通知推送到 SignalR 客户端。
private static void Conn_Notification(object sender, NpgsqlNotificationEventArgs e)
{
var notification_payload = e.Payload;
//code logic here
//call hub method to push PostgreSQL notifications that you received to SignalR client users
}
测试结果
有关 PostgreSQL LISTEN 和 NOTIFY 功能的详细信息,您可以查看以下链接。
https://www.postgresql.org/docs/current/sql-notify.html
https://www.npgsql.org/doc/wait.html#processing-of-notifications
这个例子是工作 asp.net 核心 3.0+。完整代码如下。
第 1 步。在 PostgreSql 上创建用于侦听操作的触发器
create trigger any_after_alarm_speed after
insert
or
delete
or
update
on
public.alarm_speed for each row execute procedure alarm_speedf();
步骤 2. 在 Postgresql 上创建过程
CREATE OR REPLACE FUNCTION public.alarm_speedf()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
IF TG_OP = 'INSERT' then
PERFORM pg_notify('notifyalarmspeed', format('INSERT %s %s', NEW.alarm_speed_id,
NEW.alarm_speed_date));
ELSIF TG_OP = 'UPDATE' then
PERFORM pg_notify('notifyalarmspeed', format('UPDATE %s %s', OLD.alarm_speed_id,
OLD.alarm_speed_date));
ELSIF TG_OP = 'DELETE' then
PERFORM pg_notify('notifyalarmspeed', format('DELETE %s %s', OLD.alarm_speed_id,
OLD.alarm_speed_date));
END IF;
RETURN NULL;
END;
$function$;
第 3 步。创建集线器
public class speedalarmhub : Hub
{
private IMemoryCache _cache;
`private IHubContext<speedalarmhub> _hubContext;
public speedalarmhub(IMemoryCache cache, IHubContext<speedalarmhub> hubContext)
{
_cache = cache;
_hubContext = hubContext;
}
public async Task SendMessage()
{
if (!_cache.TryGetValue("SpeedAlarm", out string response))
{
SpeedListener speedlist = new SpeedListener(_hubContext,_cache);
speedlist.ListenForAlarmNotifications();
string jsonspeedalarm = speedlist.GetAlarmList();
_cache.Set("SpeedAlarm", jsonspeedalarm);
await Clients.All.SendAsync("ReceiveMessage", _cache.Get("SpeedAlarm").ToString());
}
else
{
await Clients.All.SendAsync("ReceiveMessage", _cache.Get("SpeedAlarm").ToString());
}
}
}
第 4 步。创建侦听器控制器
public class SpeedListener :Controller
{
private IHubContext<speedalarmhub> _hubContext;
private IMemoryCache _cache;
public SpeedListener(IHubContext<speedalarmhub> hubContext,IMemoryCache cache)
{
_hubContext = hubContext;
_cache = cache;
}
static string GetConnectionString()
{
var csb = new NpgsqlConnectionStringBuilder
{
Host = "yourip",
Database = "yourdatabase",
Username = "yourusername",
Password = "yourpassword",
Port = 5432,
KeepAlive = 30
};
return csb.ConnectionString;
}
public void ListenForAlarmNotifications()
{
NpgsqlConnection conn = new NpgsqlConnection(GetConnectionString());
conn.StateChange += conn_StateChange;
conn.Open();
var listenCommand = conn.CreateCommand();
listenCommand.CommandText = $"listen notifyalarmspeed;";
listenCommand.ExecuteNonQuery();
conn.Notification += PostgresNotificationReceived;
_hubContext.Clients.All.SendAsync(this.GetAlarmList());
while (true)
{
conn.Wait();
}
}
private void PostgresNotificationReceived(object sender, NpgsqlNotificationEventArgs e)
{
string actionName = e.Payload.ToString();
string actionType = "";
if (actionName.Contains("DELETE"))
{
actionType = "Delete";
}
if (actionName.Contains("UPDATE"))
{
actionType = "Update";
}
if (actionName.Contains("INSERT"))
{
actionType = "Insert";
}
_hubContext.Clients.All.SendAsync("ReceiveMessage", this.GetAlarmList());
}
public string GetAlarmList()
{
var AlarmList = new List<AlarmSpeedViewModel>();
using (NpgsqlCommand sqlCmd = new NpgsqlCommand())
{
sqlCmd.CommandType = CommandType.StoredProcedure;
sqlCmd.CommandText = "sp_alarm_speed_process_get";
NpgsqlConnection conn = new NpgsqlConnection(GetConnectionString());
conn.Open();
sqlCmd.Connection = conn;
using (NpgsqlDataReader reader = sqlCmd.ExecuteReader())
{
while (reader.Read())
{
AlarmSpeedViewModel model = new AlarmSpeedViewModel();
model.alarm_speed_id = reader.GetInt32(0);
// you must fill your model items
AlarmList.Add(model);
}
reader.Close();
conn.Close();
}
}
_cache.Set("SpeedAlarm", SerializeObjectToJson(AlarmList));
return _cache.Get("SpeedAlarm").ToString();
}
public String SerializeObjectToJson(Object alarmspeed)
{
try
{
var jss = new JavaScriptSerializer();
return jss.Serialize(alarmspeed);
}
catch (Exception) { return null; }
}
private void conn_StateChange(object sender, System.Data.StateChangeEventArgs e)
{
_hubContext.Clients.All.SendAsync("Current State: " + e.CurrentState.ToString() + " Original State: " + e.OriginalState.ToString(), "connection state changed");
}
}
第 5 步调用集线器
<script src="~/lib/signalr.js"></script>
<script type="text/javascript">
// Start the connection.
var connection = new signalR.HubConnectionBuilder()
.withUrl('/speedalarmhub')
.build();
connection.on('ReceiveMessage', function (message) {
var encodedMsg = message;
// Add the message to the page.
});
// Transport fallback functionality is now built into start.
connection.start()
.then(function () {
console.log('connection started');
connection.invoke('SendMessage');
})
.catch(error => {
console.error(error.message);
});
第 6 步。在启动时添加以下代码配置服务
public void ConfigureServices(IServiceCollection services)
{
services.AddControllersWithViews();
services.AddSignalR();
services.AddMemoryCache();
}
第 7 步. 在 Configure 方法中添加以下代码
app.UseEndpoints(endpoints =>
{
endpoints.MapControllerRoute(
name: "default",
pattern: "{controller=Home}/{action=Index}/{id?}");
endpoints.MapHub<speedalarmhub>("/speedalarmhub");
});
我正在开发 .net 核心 Web 应用程序。我想听我的 PostgreSQL 数据库。如果 table 有任何变化,我必须知道。
所以根据我的研究,我必须使用 SignalR Core。我用 SignalR 做了一些示例应用程序,比如聊天应用程序,但其中 none 监听数据库。我找不到任何例子。
-是否必须在PostgreSQL数据库上触发?
-代码端一定要监听吗?
-如何使用SignalR Core?
请教我一个方法。
非常感谢。
I want to listen my PostgreSQL database. And if there are any changes on table, I have to got it.
您可以创建与您指定的 table 关联的触发器,并使用函数 pg_notify(text, text)
发送通知,如下所示。
函数
CREATE OR REPLACE FUNCTION mytestfunc() RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' then
PERFORM pg_notify('notifytesttable', 'new record inserted');
ELSIF TG_OP = 'UPDATE' then
PERFORM pg_notify('notifytesttable', 'updated');
ELSIF TG_OP = 'DELETE' then
PERFORM pg_notify('notifytesttable', 'deleted');
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
触发器
CREATE TRIGGER any_after_testtable
AFTER INSERT OR DELETE OR UPDATE
ON testtable
FOR EACH ROW
EXECUTE PROCEDURE mytestfunc();
在您的客户端应用程序代码中,您可以侦听和接收来自 PostgreSQL 的通知。
conn.Open();
conn.Notification += Conn_Notification;
using (var cmd = new NpgsqlCommand("LISTEN notifytesttable", conn))
{
cmd.ExecuteNonQuery();
}
在 Notification
事件处理程序中,您可以调用 SignalR hub 方法将通知推送到 SignalR 客户端。
private static void Conn_Notification(object sender, NpgsqlNotificationEventArgs e)
{
var notification_payload = e.Payload;
//code logic here
//call hub method to push PostgreSQL notifications that you received to SignalR client users
}
测试结果
有关 PostgreSQL LISTEN 和 NOTIFY 功能的详细信息,您可以查看以下链接。
https://www.postgresql.org/docs/current/sql-notify.html
https://www.npgsql.org/doc/wait.html#processing-of-notifications
这个例子是工作 asp.net 核心 3.0+。完整代码如下。
第 1 步。在 PostgreSql 上创建用于侦听操作的触发器
create trigger any_after_alarm_speed after
insert
or
delete
or
update
on
public.alarm_speed for each row execute procedure alarm_speedf();
步骤 2. 在 Postgresql 上创建过程
CREATE OR REPLACE FUNCTION public.alarm_speedf()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
IF TG_OP = 'INSERT' then
PERFORM pg_notify('notifyalarmspeed', format('INSERT %s %s', NEW.alarm_speed_id,
NEW.alarm_speed_date));
ELSIF TG_OP = 'UPDATE' then
PERFORM pg_notify('notifyalarmspeed', format('UPDATE %s %s', OLD.alarm_speed_id,
OLD.alarm_speed_date));
ELSIF TG_OP = 'DELETE' then
PERFORM pg_notify('notifyalarmspeed', format('DELETE %s %s', OLD.alarm_speed_id,
OLD.alarm_speed_date));
END IF;
RETURN NULL;
END;
$function$;
第 3 步。创建集线器
public class speedalarmhub : Hub
{
private IMemoryCache _cache;
`private IHubContext<speedalarmhub> _hubContext;
public speedalarmhub(IMemoryCache cache, IHubContext<speedalarmhub> hubContext)
{
_cache = cache;
_hubContext = hubContext;
}
public async Task SendMessage()
{
if (!_cache.TryGetValue("SpeedAlarm", out string response))
{
SpeedListener speedlist = new SpeedListener(_hubContext,_cache);
speedlist.ListenForAlarmNotifications();
string jsonspeedalarm = speedlist.GetAlarmList();
_cache.Set("SpeedAlarm", jsonspeedalarm);
await Clients.All.SendAsync("ReceiveMessage", _cache.Get("SpeedAlarm").ToString());
}
else
{
await Clients.All.SendAsync("ReceiveMessage", _cache.Get("SpeedAlarm").ToString());
}
}
}
第 4 步。创建侦听器控制器
public class SpeedListener :Controller
{
private IHubContext<speedalarmhub> _hubContext;
private IMemoryCache _cache;
public SpeedListener(IHubContext<speedalarmhub> hubContext,IMemoryCache cache)
{
_hubContext = hubContext;
_cache = cache;
}
static string GetConnectionString()
{
var csb = new NpgsqlConnectionStringBuilder
{
Host = "yourip",
Database = "yourdatabase",
Username = "yourusername",
Password = "yourpassword",
Port = 5432,
KeepAlive = 30
};
return csb.ConnectionString;
}
public void ListenForAlarmNotifications()
{
NpgsqlConnection conn = new NpgsqlConnection(GetConnectionString());
conn.StateChange += conn_StateChange;
conn.Open();
var listenCommand = conn.CreateCommand();
listenCommand.CommandText = $"listen notifyalarmspeed;";
listenCommand.ExecuteNonQuery();
conn.Notification += PostgresNotificationReceived;
_hubContext.Clients.All.SendAsync(this.GetAlarmList());
while (true)
{
conn.Wait();
}
}
private void PostgresNotificationReceived(object sender, NpgsqlNotificationEventArgs e)
{
string actionName = e.Payload.ToString();
string actionType = "";
if (actionName.Contains("DELETE"))
{
actionType = "Delete";
}
if (actionName.Contains("UPDATE"))
{
actionType = "Update";
}
if (actionName.Contains("INSERT"))
{
actionType = "Insert";
}
_hubContext.Clients.All.SendAsync("ReceiveMessage", this.GetAlarmList());
}
public string GetAlarmList()
{
var AlarmList = new List<AlarmSpeedViewModel>();
using (NpgsqlCommand sqlCmd = new NpgsqlCommand())
{
sqlCmd.CommandType = CommandType.StoredProcedure;
sqlCmd.CommandText = "sp_alarm_speed_process_get";
NpgsqlConnection conn = new NpgsqlConnection(GetConnectionString());
conn.Open();
sqlCmd.Connection = conn;
using (NpgsqlDataReader reader = sqlCmd.ExecuteReader())
{
while (reader.Read())
{
AlarmSpeedViewModel model = new AlarmSpeedViewModel();
model.alarm_speed_id = reader.GetInt32(0);
// you must fill your model items
AlarmList.Add(model);
}
reader.Close();
conn.Close();
}
}
_cache.Set("SpeedAlarm", SerializeObjectToJson(AlarmList));
return _cache.Get("SpeedAlarm").ToString();
}
public String SerializeObjectToJson(Object alarmspeed)
{
try
{
var jss = new JavaScriptSerializer();
return jss.Serialize(alarmspeed);
}
catch (Exception) { return null; }
}
private void conn_StateChange(object sender, System.Data.StateChangeEventArgs e)
{
_hubContext.Clients.All.SendAsync("Current State: " + e.CurrentState.ToString() + " Original State: " + e.OriginalState.ToString(), "connection state changed");
}
}
第 5 步调用集线器
<script src="~/lib/signalr.js"></script>
<script type="text/javascript">
// Start the connection.
var connection = new signalR.HubConnectionBuilder()
.withUrl('/speedalarmhub')
.build();
connection.on('ReceiveMessage', function (message) {
var encodedMsg = message;
// Add the message to the page.
});
// Transport fallback functionality is now built into start.
connection.start()
.then(function () {
console.log('connection started');
connection.invoke('SendMessage');
})
.catch(error => {
console.error(error.message);
});
第 6 步。在启动时添加以下代码配置服务
public void ConfigureServices(IServiceCollection services)
{
services.AddControllersWithViews();
services.AddSignalR();
services.AddMemoryCache();
}
第 7 步. 在 Configure 方法中添加以下代码
app.UseEndpoints(endpoints =>
{
endpoints.MapControllerRoute(
name: "default",
pattern: "{controller=Home}/{action=Index}/{id?}");
endpoints.MapHub<speedalarmhub>("/speedalarmhub");
});