我如何观察 C# 和 CouchDB 中的新数据?
How can i observe for new data in C# and CouchDB?
我做了一个函数,从数据库 (CouchDB) 获取新到达的数据,然后使用这些数据进行分析。一切正常,但现在我正在努力如何在没有更多数据时保持功能活跃。
我现在得到了什么:
public async Task<FsResponseModel.Root> AnalyzeImage()
{
HttpClient client = new HttpClient();
client.DefaultRequestHeaders.Add("Accept", "application/json");
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(System.Text.ASCIIEncoding.ASCII.GetBytes($"user:pw" + $"")));
HttpResponseMessage httpResponse = await client.GetAsync("CouchDBUrl/_changes?feed=continuous&filter=_view&view=list/images&include_docs=true&attachments=true");
httpResponse.EnsureSuccessStatusCode();
string json = await httpResponse.Content.ReadAsStringAsync();
//Somewhere to keep the parsed results
var results = new List<ViewResponseModel>();
Root root = null;
var serializer = new JsonSerializer();
using (var stringReader = new StringReader(json))
using (var jsonReader = new JsonTextReader(stringReader))
{
//Make sure the JsonReader knows we have multiple documents
jsonReader.SupportMultipleContent = true;
while (jsonReader.Read())
{
//Read in the next document
var nextObject = JObject.ReadFrom(jsonReader);
//Determine if we are on the last item or not
if (nextObject["last_seq"] != null)
{
root = nextObject.ToObject<Root>();
}
else
{
results.Add(nextObject.ToObject<ViewResponseModel>());
}
}
}
if (results.Count >= 0)
{
while (results.Count >= 0)
{
string base64Image = results[0].doc._attachments.image.data;
string refNr = results[0].doc.creator;
string token = "myToken";
var httpWebRequest = (HttpWebRequest)WebRequest.Create("MyApi");
httpWebRequest.PreAuthenticate = true;
httpWebRequest.Headers.Add("Authorization", "Bearer " + token);
httpWebRequest.ContentType = "application/json";
httpWebRequest.Accept = "application/json";
httpWebRequest.Method = "POST";
var obj = new FsRequestModel() { };
obj.Image = base64Image;
obj.Application = "MyApp";
obj.RefNr = "hzd." + refNr + results[0].doc._id;
string imageJson = JsonConvert.SerializeObject(obj);
using (var streamWriter = new StreamWriter(httpWebRequest.GetRequestStream()))
{
streamWriter.Write(imageJson);
}
var httpResponseApi = (HttpWebResponse)httpWebRequest.GetResponse();
var result = "";
using (var streamReader = new StreamReader(httpResponseApi.GetResponseStream()))
{
result = streamReader.ReadToEnd();
}
FsResponseModel.Root response = JsonConvert.DeserializeObject<FsResponseModel.Root>(result);
return response;
}
}
else
{
//w8 for new data
return null;
}
return null;
}
我需要一个解决方案来告诉代码它必须等待新数据,如果有新数据可用,请重新开始该过程。
编辑:
我现在正在尝试处理传入的提要。任何想法如何消费这个?
public async Task ObserveDbAndTrigger()
{
var url = "http://localhost:5984/mydb/_changes?feed=continuous&filter=_view&view=MyView&include_docs=true&attachments=true&heartbeat=1000";
using (var client = new HttpClient())
{
client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
client.DefaultRequestHeaders.Add("Accept", "application/json");
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(System.Text.ASCIIEncoding.ASCII.GetBytes($"user:pw" + $"")));
string line = "";
var request = new HttpRequestMessage(HttpMethod.Get, url);
using (var response = await client.SendAsync(
request,
HttpCompletionOption.ResponseHeadersRead))
{
using (var body = await response.Content.ReadAsStreamAsync())
using (var reader = new StreamReader(body))
while (!reader.EndOfStream)
line = reader.ReadLine();
if (line != "")
{
//get data and work with it
}
}
}
}
我有流,但我不知道如何处理来电。
任何帮助都会很棒。
编辑2:
Stream 不工作,我已经测试过了,它无法识别新行...
我的下一个想法是尝试使用事件源而不是连续馈送。
对这个想法有什么建议吗?
我改成这样,现在可以处理数据了。
static void Main()
{
string db = "myDb";
string auth = "Basic " + Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(String.Join(":", "user", "password")));
string query = db + "/_changes?feed=eventsource&include_docs=true&attachments=true&since=0&heartbeat=5000";
ChangesPublisher pub = new ChangesPublisher();
pub.OnChange += (sender, e) => Console.WriteLine(e.Value);
pub.OnException += (sender, e) => Console.WriteLine(e.Value.ToString() + "\r\n\r\nPress a key to exit.");
// start publishing.
Task.Run(async () =>
{
await pub.Begin("localhost", 5984, query, auth);
});
//Stop running
Console.ReadKey();
pub.Stop = true;
}
public class ChangesPublisher
{
public bool Stop { get; set; }
public class ChangeEvent : EventArgs
{
public string Value { get; set; }
public ChangeEvent(string value)
{
Value = value;
}
}
public class ExceptionEvent : EventArgs
{
public Exception Value { get; set; }
public ExceptionEvent(Exception value)
{
Value = value;
}
}
public event EventHandler<ChangeEvent> OnChange = delegate { };
public event EventHandler<ExceptionEvent> OnException = delegate { };
public async Task Begin(string serverAddr, int port, string query, string auth)
{
using (var client = new TcpClient())
{
string request = String.Join("\r\n", new List<string> {
String.Format("GET /{0} HTTP/1.1",query),
"Authorization: " + auth,
"Accept: application/json",
"Host: " + serverAddr,
"Connection: keep-alive",
"\r\n"
});
try
{
await client.ConnectAsync(serverAddr, port);
using (NetworkStream stream = client.GetStream())
{
StreamWriter writer = new StreamWriter(stream);
await writer.WriteAsync(request);
await writer.FlushAsync();
StreamReader reader = new StreamReader(stream);
while (!Stop)
{
string data = await reader.ReadLineAsync();
OnChange(this, new ChangeEvent(data));
//work with data
}
}
}
catch (Exception e)
{
OnException(this, new ExceptionEvent(e));
}
}
}
}
我做了一个函数,从数据库 (CouchDB) 获取新到达的数据,然后使用这些数据进行分析。一切正常,但现在我正在努力如何在没有更多数据时保持功能活跃。
我现在得到了什么:
public async Task<FsResponseModel.Root> AnalyzeImage()
{
HttpClient client = new HttpClient();
client.DefaultRequestHeaders.Add("Accept", "application/json");
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(System.Text.ASCIIEncoding.ASCII.GetBytes($"user:pw" + $"")));
HttpResponseMessage httpResponse = await client.GetAsync("CouchDBUrl/_changes?feed=continuous&filter=_view&view=list/images&include_docs=true&attachments=true");
httpResponse.EnsureSuccessStatusCode();
string json = await httpResponse.Content.ReadAsStringAsync();
//Somewhere to keep the parsed results
var results = new List<ViewResponseModel>();
Root root = null;
var serializer = new JsonSerializer();
using (var stringReader = new StringReader(json))
using (var jsonReader = new JsonTextReader(stringReader))
{
//Make sure the JsonReader knows we have multiple documents
jsonReader.SupportMultipleContent = true;
while (jsonReader.Read())
{
//Read in the next document
var nextObject = JObject.ReadFrom(jsonReader);
//Determine if we are on the last item or not
if (nextObject["last_seq"] != null)
{
root = nextObject.ToObject<Root>();
}
else
{
results.Add(nextObject.ToObject<ViewResponseModel>());
}
}
}
if (results.Count >= 0)
{
while (results.Count >= 0)
{
string base64Image = results[0].doc._attachments.image.data;
string refNr = results[0].doc.creator;
string token = "myToken";
var httpWebRequest = (HttpWebRequest)WebRequest.Create("MyApi");
httpWebRequest.PreAuthenticate = true;
httpWebRequest.Headers.Add("Authorization", "Bearer " + token);
httpWebRequest.ContentType = "application/json";
httpWebRequest.Accept = "application/json";
httpWebRequest.Method = "POST";
var obj = new FsRequestModel() { };
obj.Image = base64Image;
obj.Application = "MyApp";
obj.RefNr = "hzd." + refNr + results[0].doc._id;
string imageJson = JsonConvert.SerializeObject(obj);
using (var streamWriter = new StreamWriter(httpWebRequest.GetRequestStream()))
{
streamWriter.Write(imageJson);
}
var httpResponseApi = (HttpWebResponse)httpWebRequest.GetResponse();
var result = "";
using (var streamReader = new StreamReader(httpResponseApi.GetResponseStream()))
{
result = streamReader.ReadToEnd();
}
FsResponseModel.Root response = JsonConvert.DeserializeObject<FsResponseModel.Root>(result);
return response;
}
}
else
{
//w8 for new data
return null;
}
return null;
}
我需要一个解决方案来告诉代码它必须等待新数据,如果有新数据可用,请重新开始该过程。
编辑:
我现在正在尝试处理传入的提要。任何想法如何消费这个?
public async Task ObserveDbAndTrigger()
{
var url = "http://localhost:5984/mydb/_changes?feed=continuous&filter=_view&view=MyView&include_docs=true&attachments=true&heartbeat=1000";
using (var client = new HttpClient())
{
client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
client.DefaultRequestHeaders.Add("Accept", "application/json");
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(System.Text.ASCIIEncoding.ASCII.GetBytes($"user:pw" + $"")));
string line = "";
var request = new HttpRequestMessage(HttpMethod.Get, url);
using (var response = await client.SendAsync(
request,
HttpCompletionOption.ResponseHeadersRead))
{
using (var body = await response.Content.ReadAsStreamAsync())
using (var reader = new StreamReader(body))
while (!reader.EndOfStream)
line = reader.ReadLine();
if (line != "")
{
//get data and work with it
}
}
}
}
我有流,但我不知道如何处理来电。 任何帮助都会很棒。
编辑2:
Stream 不工作,我已经测试过了,它无法识别新行...
我的下一个想法是尝试使用事件源而不是连续馈送。
对这个想法有什么建议吗?
我改成这样,现在可以处理数据了。
static void Main()
{
string db = "myDb";
string auth = "Basic " + Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(String.Join(":", "user", "password")));
string query = db + "/_changes?feed=eventsource&include_docs=true&attachments=true&since=0&heartbeat=5000";
ChangesPublisher pub = new ChangesPublisher();
pub.OnChange += (sender, e) => Console.WriteLine(e.Value);
pub.OnException += (sender, e) => Console.WriteLine(e.Value.ToString() + "\r\n\r\nPress a key to exit.");
// start publishing.
Task.Run(async () =>
{
await pub.Begin("localhost", 5984, query, auth);
});
//Stop running
Console.ReadKey();
pub.Stop = true;
}
public class ChangesPublisher
{
public bool Stop { get; set; }
public class ChangeEvent : EventArgs
{
public string Value { get; set; }
public ChangeEvent(string value)
{
Value = value;
}
}
public class ExceptionEvent : EventArgs
{
public Exception Value { get; set; }
public ExceptionEvent(Exception value)
{
Value = value;
}
}
public event EventHandler<ChangeEvent> OnChange = delegate { };
public event EventHandler<ExceptionEvent> OnException = delegate { };
public async Task Begin(string serverAddr, int port, string query, string auth)
{
using (var client = new TcpClient())
{
string request = String.Join("\r\n", new List<string> {
String.Format("GET /{0} HTTP/1.1",query),
"Authorization: " + auth,
"Accept: application/json",
"Host: " + serverAddr,
"Connection: keep-alive",
"\r\n"
});
try
{
await client.ConnectAsync(serverAddr, port);
using (NetworkStream stream = client.GetStream())
{
StreamWriter writer = new StreamWriter(stream);
await writer.WriteAsync(request);
await writer.FlushAsync();
StreamReader reader = new StreamReader(stream);
while (!Stop)
{
string data = await reader.ReadLineAsync();
OnChange(this, new ChangeEvent(data));
//work with data
}
}
}
catch (Exception e)
{
OnException(this, new ExceptionEvent(e));
}
}
}
}