在 .Net 中使用 GraphQL 客户端库的 AWS Appsync 实施
AWS Appsync implementation using GraphQL-client library in .Net
我正在尝试实施类似于此 python 示例的应用程序同步订阅,但在 .net https://aws.amazon.com/blogs/mobile/appsync-websockets-python/
中
我开始使用 nuget 包 GraphQL.Client https://www.nuget.org/packages/GraphQL.Client
Query/Mutation 的执行工作正常,就像 https://github.com/graphql-dotnet/graphql-client 的自述文件中给出的那样
但是订阅不起作用。
我的代码使用GraphQL.Client:
using var graphQLClient = new GraphQLHttpClient("https://<MY-API-PATH>.appsync-realtime-api.<AWS-region>.amazonaws.com/graphql", new NewtonsoftJsonSerializer());
graphQLClient.HttpClient.DefaultRequestHeaders.Add("host", "<API HOST without https or absolute path and 'realtime-' text in the api address>"); //As given in the python example
graphQLClient.HttpClient.DefaultRequestHeaders.Add("x-api-key", "<API KEY>");
var req= new GraphQLRequest
{
Query = @"subscription SubscribeToEventComments{ subscribeToEventComments(eventId: 'test'){ content }}",
Variables = new{}
};
IObservable<GraphQLResponse<Response>> subscriptionStream = graphQLClient.CreateSubscriptionStream<Response>(req, (Exception ex) =>
{
Console.WriteLine("Error: {0}", ex.ToString());
});
var subscription = subscriptionStream.Subscribe(response =>
{
Console.WriteLine($"Response'{Newtonsoft.Json.JsonConvert.SerializeObject(response)}' ");
},
ex =>
{
Console.WriteLine("Error{0}", ex.ToString());
});
它给出了例外 "The remote party closed the WebSocket connection without completing the close handshake."
堆栈跟踪:
在 System.Net.WebSockets.ManagedWebSocket.d__662.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter
1.GetResult()
在 GraphQL.Client.Http.Websocket.GraphQLHttpWebSocket.d__40.MoveNext() 在 C:\Users\UserName\Source\repos\graphql-client\src\GraphQL.Client\Websocket\GraphQLHttpWebSocket.cs:line 546
然后我在没有这个 nuget 的情况下尝试使用标准的 websocket
没有 nuget 的代码:
static public async Task CallWebsocket()
{
try
{
_client = new ClientWebSocket();
_client.Options.AddSubProtocol("graphql-ws");
_client.Options.SetRequestHeader("host", "<HOST URL without wss but now with 'realtime' text in api url because otherwise we are getting SSL error>");
_client.Options.SetRequestHeader("x-api-key", "<API KEY>");
await _client.ConnectAsync(new Uri("https://<MY-APPSYNC_API_PATH>.appsync-realtime-api.<AWS-region>.amazonaws.com/graphql"), CancellationToken.None);
await SendCommand();
var docList = await Receive();
}
catch(Exception ex)
{
}
}
static private async Task SendCommand()
{
ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes("'query' : 'subscription SubscribeToEventComments{ subscribeToEventComments(eventId: 'test'){ content }}'"));
await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}
static private async Task<string> Receive()
{
var receiveBufferSize = 1536;
byte[] buffer = new byte[receiveBufferSize];
var result = await _client.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
var resultJson = (new UTF8Encoding()).GetString(buffer);
return resultJson;
}
我遇到以下异常:
内部异常:"An established connection was aborted by the software in your host machine."
内部异常消息:"Unable to read data from the transport connection: An established connection was aborted by the software in your host machine.."
留言:"The remote party closed the WebSocket connection without completing the close handshake."
任何人都可以帮助正确实施。
Nuget 不能与 AppSync 订阅开箱即用,因此您需要为此编写自己的客户端代码,就像您在第二个(非 nuget)示例中尝试的那样。
现在,对于第二个示例,请再次查看问题中引用的 python example。有几个步骤未包含在您的代码中。我将列举所需的步骤并尝试将它们从 python 代码移植到 C#(请注意,我手头没有 C# 环境,因此可能存在语法错误,但这段代码应该非常接近你需要)
步骤 0 - AppSync 端点
假设为您的 API 调用 aws appsync get-graphql-api --api-id example123456
的结果是:
{
"graphqlApi": {
"name": "myNewRealTimeGraphQL-API",
"authenticationType": "<API_KEY>",
"tags": {},
"apiId": "example123456",
"uris": {
"GRAPHQL": "https://abc.appsync-api.us-west-2.amazonaws.com/graphql",
"REALTIME": "wss://abc.appsync-realtime-api.us-west-2.amazonaws.com/graphql"
},
"arn": "arn:aws:appsync:us-west-2: xxxxxxxxxxxx:apis/xxxxxxxxxxxx"
}
}
第 1 步 - 建立连接 URL
第 2 步 - 连接到 WebSocket 端点
这包括根据 python 文章
中提到的协议发送 connection_init 消息
第 3 步 - 根据协议等待 connection_ack
同样,这是根据协议
第 4 步 - 注册订阅
第 5 步 - 发送突变
该步骤不在本回复中,但可以通过 AWS 控制台完成
第 6 步 - 等待“数据”消息
这些是 AppSync 发送的实时事件
第 7 步 - 取消注册订阅
第 8 步 - 断开连接
// These are declared at the same level as your _client
// This comes from the graphqlApi.uris.GRAPHQL in step 0, set as a var here for clarity
_gqlHost = "abc.appsync-api.us-west-2.amazonaws.com";
// This comes from the graphqlApi.uris.REALTIME in step 0, set as a var here for clarity
_realtimeUri = "wss://abc.appsync-realtime-api.us-west-2.amazonaws.com/graphql";
_apiKey = "<API KEY>";
static public async Task CallWebsocket()
{
// Step 1
// This is JSON needed by the server, it will be converted to base64
// (note: might be better to use something like Json.NET for this task)
var header = var test = $@"{{
""host"":""{_gqlHost}"",
""x-api-key"": ""{_apiKey}""
}}";
// Now we need to encode the previous JSON to base64
var headerB64 = System.Convert.ToBase64String(
System.Text.Encoding.UTF8.GetBytes(header));
UriBuilder connectionUriBuilder = new UriBuilder(_realtimeUri);
connectionUriBuilder.Query = $"header={headerB64}&payload=e30=";
try
{
_client = new ClientWebSocket();
_client.Options.AddSubProtocol("graphql-ws");
// Step 2
await _client.ConnectAsync(connectionUriBuilder.Uri), CancellationToken.None);
// Step 3
await SendConnectionInit();
await Receive();
}
catch(Exception ex)
{
}
}
static private async Task SendConnectionInit()
{
ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(@"{""type"": ""connection_init""}"));
await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}
static private async Task SendSubscription()
{
// This detail is important, note that the subscription is a stringified JSON that will be embeded in the "data" field below
var subscription = $@"{{\""query\"": \""subscription SubscribeToEventComments{{ subscribeToEventComments{{ content }} }}\"", \""variables\"": {{}} }}";
var register = $@"{{
""id"": ""<SUB_ID>"",
""payload"": {{
""data"": ""{subscription}"",
""extensions"": {{
""authorization"": {{
""host"": ""{_gqlHost}"",
""x-api-key"":""{_apiKey}""
}}
}}
}},
""type"": ""start""
}}";
// The output should look like below, note again the "data" field contains a stringified JSON that represents the subscription
/*
{
"id": "<SUB_ID>",
"payload": {
"data": "{\"query\": \"subscription SubscribeToEventComments{ subscribeToEventComments{ content}}\", \"variables\": {} }",
"extensions": {
"authorization": {
"host": "abc.appsync-api.us-west-2.amazonaws.com",
"x-api-key":"<API KEY>"
}
}
},
"type": "start"
}
*/
ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(register));
await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}
static private async Task Deregister()
{
var deregister = $@"{{
""type"": ""stop"",
""id"": ""<SUB_ID>""
}}"
ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(deregister));
await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}
static private async Task Receive()
{
while (_socket.State == WebSocketState.Open)
{
ArraySegment<Byte> buffer = new ArraySegment<byte>(new Byte[8192]);
WebSocketReceiveResult result= null;
using (var ms = new MemoryStream())
{
// This loop is needed because the server might send chunks of data that need to be assembled by the client
// see:
do
{
result = await socket.ReceiveAsync(buffer, CancellationToken.None);
ms.Write(buffer.Array, buffer.Offset, result.Count);
}
while (!result.EndOfMessage);
ms.Seek(0, SeekOrigin.Begin);
using (var reader = new StreamReader(ms, Encoding.UTF8))
{
// convert stream to string
var message = reader.ReadToEnd();
Console.WriteLine(message)
// quick and dirty way to check response
if (message.Contains("connection_ack"))
{
// Step 4
await SendSubscription();
} else if (message.Contains("data")) // Step 6
{
// Step 7
await Deregister();
// Step 8
await _client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}
}
}
}
}
对于面临同样问题的其他人,我现在创建了一个 nuget 包。
https://www.nuget.org/packages/DotNetCSharp.AWS.AppSync.Client/1.1.1
您可以按如下方式使用它。
//Create Client Specify eithen APIKey or AuthToken
var Client = new AppSyncClient("<Appsync URL>", new AuthOptions()
{
// APIKey = "<API Key>",
AuthToken = "<JWT Token>"
});
//To Subscribe an query
Guid newId = Guid.NewGuid();
await Client.CreateSubscriptionAsync<Message>(new QueryOptions()
{
Query = "subscription <Subscription Query>",
SubscriptionId = newId
},
(data) =>
{
});
//To unsubscribe an subscription
await Client.UnSubscribe(newId);
//To close the websocket
await Client.Close();
我正在尝试实施类似于此 python 示例的应用程序同步订阅,但在 .net https://aws.amazon.com/blogs/mobile/appsync-websockets-python/
中我开始使用 nuget 包 GraphQL.Client https://www.nuget.org/packages/GraphQL.Client Query/Mutation 的执行工作正常,就像 https://github.com/graphql-dotnet/graphql-client 的自述文件中给出的那样 但是订阅不起作用。
我的代码使用GraphQL.Client:
using var graphQLClient = new GraphQLHttpClient("https://<MY-API-PATH>.appsync-realtime-api.<AWS-region>.amazonaws.com/graphql", new NewtonsoftJsonSerializer());
graphQLClient.HttpClient.DefaultRequestHeaders.Add("host", "<API HOST without https or absolute path and 'realtime-' text in the api address>"); //As given in the python example
graphQLClient.HttpClient.DefaultRequestHeaders.Add("x-api-key", "<API KEY>");
var req= new GraphQLRequest
{
Query = @"subscription SubscribeToEventComments{ subscribeToEventComments(eventId: 'test'){ content }}",
Variables = new{}
};
IObservable<GraphQLResponse<Response>> subscriptionStream = graphQLClient.CreateSubscriptionStream<Response>(req, (Exception ex) =>
{
Console.WriteLine("Error: {0}", ex.ToString());
});
var subscription = subscriptionStream.Subscribe(response =>
{
Console.WriteLine($"Response'{Newtonsoft.Json.JsonConvert.SerializeObject(response)}' ");
},
ex =>
{
Console.WriteLine("Error{0}", ex.ToString());
});
它给出了例外 "The remote party closed the WebSocket connection without completing the close handshake."
堆栈跟踪:
在 System.Net.WebSockets.ManagedWebSocket.d__662.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter
1.GetResult()
在 GraphQL.Client.Http.Websocket.GraphQLHttpWebSocket.d__40.MoveNext() 在 C:\Users\UserName\Source\repos\graphql-client\src\GraphQL.Client\Websocket\GraphQLHttpWebSocket.cs:line 546
然后我在没有这个 nuget 的情况下尝试使用标准的 websocket
没有 nuget 的代码:
static public async Task CallWebsocket()
{
try
{
_client = new ClientWebSocket();
_client.Options.AddSubProtocol("graphql-ws");
_client.Options.SetRequestHeader("host", "<HOST URL without wss but now with 'realtime' text in api url because otherwise we are getting SSL error>");
_client.Options.SetRequestHeader("x-api-key", "<API KEY>");
await _client.ConnectAsync(new Uri("https://<MY-APPSYNC_API_PATH>.appsync-realtime-api.<AWS-region>.amazonaws.com/graphql"), CancellationToken.None);
await SendCommand();
var docList = await Receive();
}
catch(Exception ex)
{
}
}
static private async Task SendCommand()
{
ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes("'query' : 'subscription SubscribeToEventComments{ subscribeToEventComments(eventId: 'test'){ content }}'"));
await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}
static private async Task<string> Receive()
{
var receiveBufferSize = 1536;
byte[] buffer = new byte[receiveBufferSize];
var result = await _client.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
var resultJson = (new UTF8Encoding()).GetString(buffer);
return resultJson;
}
我遇到以下异常:
内部异常:"An established connection was aborted by the software in your host machine."
内部异常消息:"Unable to read data from the transport connection: An established connection was aborted by the software in your host machine.."
留言:"The remote party closed the WebSocket connection without completing the close handshake."
任何人都可以帮助正确实施。
Nuget 不能与 AppSync 订阅开箱即用,因此您需要为此编写自己的客户端代码,就像您在第二个(非 nuget)示例中尝试的那样。
现在,对于第二个示例,请再次查看问题中引用的 python example。有几个步骤未包含在您的代码中。我将列举所需的步骤并尝试将它们从 python 代码移植到 C#(请注意,我手头没有 C# 环境,因此可能存在语法错误,但这段代码应该非常接近你需要)
步骤 0 - AppSync 端点
假设为您的 API 调用 aws appsync get-graphql-api --api-id example123456
的结果是:
{
"graphqlApi": {
"name": "myNewRealTimeGraphQL-API",
"authenticationType": "<API_KEY>",
"tags": {},
"apiId": "example123456",
"uris": {
"GRAPHQL": "https://abc.appsync-api.us-west-2.amazonaws.com/graphql",
"REALTIME": "wss://abc.appsync-realtime-api.us-west-2.amazonaws.com/graphql"
},
"arn": "arn:aws:appsync:us-west-2: xxxxxxxxxxxx:apis/xxxxxxxxxxxx"
}
}
第 1 步 - 建立连接 URL
第 2 步 - 连接到 WebSocket 端点
这包括根据 python 文章
中提到的协议发送 connection_init 消息第 3 步 - 根据协议等待 connection_ack
同样,这是根据协议
第 4 步 - 注册订阅
第 5 步 - 发送突变
该步骤不在本回复中,但可以通过 AWS 控制台完成
第 6 步 - 等待“数据”消息
这些是 AppSync 发送的实时事件
第 7 步 - 取消注册订阅
第 8 步 - 断开连接
// These are declared at the same level as your _client
// This comes from the graphqlApi.uris.GRAPHQL in step 0, set as a var here for clarity
_gqlHost = "abc.appsync-api.us-west-2.amazonaws.com";
// This comes from the graphqlApi.uris.REALTIME in step 0, set as a var here for clarity
_realtimeUri = "wss://abc.appsync-realtime-api.us-west-2.amazonaws.com/graphql";
_apiKey = "<API KEY>";
static public async Task CallWebsocket()
{
// Step 1
// This is JSON needed by the server, it will be converted to base64
// (note: might be better to use something like Json.NET for this task)
var header = var test = $@"{{
""host"":""{_gqlHost}"",
""x-api-key"": ""{_apiKey}""
}}";
// Now we need to encode the previous JSON to base64
var headerB64 = System.Convert.ToBase64String(
System.Text.Encoding.UTF8.GetBytes(header));
UriBuilder connectionUriBuilder = new UriBuilder(_realtimeUri);
connectionUriBuilder.Query = $"header={headerB64}&payload=e30=";
try
{
_client = new ClientWebSocket();
_client.Options.AddSubProtocol("graphql-ws");
// Step 2
await _client.ConnectAsync(connectionUriBuilder.Uri), CancellationToken.None);
// Step 3
await SendConnectionInit();
await Receive();
}
catch(Exception ex)
{
}
}
static private async Task SendConnectionInit()
{
ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(@"{""type"": ""connection_init""}"));
await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}
static private async Task SendSubscription()
{
// This detail is important, note that the subscription is a stringified JSON that will be embeded in the "data" field below
var subscription = $@"{{\""query\"": \""subscription SubscribeToEventComments{{ subscribeToEventComments{{ content }} }}\"", \""variables\"": {{}} }}";
var register = $@"{{
""id"": ""<SUB_ID>"",
""payload"": {{
""data"": ""{subscription}"",
""extensions"": {{
""authorization"": {{
""host"": ""{_gqlHost}"",
""x-api-key"":""{_apiKey}""
}}
}}
}},
""type"": ""start""
}}";
// The output should look like below, note again the "data" field contains a stringified JSON that represents the subscription
/*
{
"id": "<SUB_ID>",
"payload": {
"data": "{\"query\": \"subscription SubscribeToEventComments{ subscribeToEventComments{ content}}\", \"variables\": {} }",
"extensions": {
"authorization": {
"host": "abc.appsync-api.us-west-2.amazonaws.com",
"x-api-key":"<API KEY>"
}
}
},
"type": "start"
}
*/
ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(register));
await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}
static private async Task Deregister()
{
var deregister = $@"{{
""type"": ""stop"",
""id"": ""<SUB_ID>""
}}"
ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(deregister));
await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}
static private async Task Receive()
{
while (_socket.State == WebSocketState.Open)
{
ArraySegment<Byte> buffer = new ArraySegment<byte>(new Byte[8192]);
WebSocketReceiveResult result= null;
using (var ms = new MemoryStream())
{
// This loop is needed because the server might send chunks of data that need to be assembled by the client
// see:
do
{
result = await socket.ReceiveAsync(buffer, CancellationToken.None);
ms.Write(buffer.Array, buffer.Offset, result.Count);
}
while (!result.EndOfMessage);
ms.Seek(0, SeekOrigin.Begin);
using (var reader = new StreamReader(ms, Encoding.UTF8))
{
// convert stream to string
var message = reader.ReadToEnd();
Console.WriteLine(message)
// quick and dirty way to check response
if (message.Contains("connection_ack"))
{
// Step 4
await SendSubscription();
} else if (message.Contains("data")) // Step 6
{
// Step 7
await Deregister();
// Step 8
await _client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}
}
}
}
}
对于面临同样问题的其他人,我现在创建了一个 nuget 包。 https://www.nuget.org/packages/DotNetCSharp.AWS.AppSync.Client/1.1.1 您可以按如下方式使用它。
//Create Client Specify eithen APIKey or AuthToken
var Client = new AppSyncClient("<Appsync URL>", new AuthOptions()
{
// APIKey = "<API Key>",
AuthToken = "<JWT Token>"
});
//To Subscribe an query
Guid newId = Guid.NewGuid();
await Client.CreateSubscriptionAsync<Message>(new QueryOptions()
{
Query = "subscription <Subscription Query>",
SubscriptionId = newId
},
(data) =>
{
});
//To unsubscribe an subscription
await Client.UnSubscribe(newId);
//To close the websocket
await Client.Close();