如何计算来自 "stream" in Java 的传入消息?
How to count incoming messages from a "stream" in Java?
所以我从一个流中接收到大量消息 - 好吧,它本身并不是一个流,它实际上是一种在收到消息时触发的方法 - 我想计算在10 秒。
这是我目前的情况:
package com.example.demo;
import java.net.URI;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
public class ExampleClient extends WebSocketClient {
private float messagesRecievedCount;
public ExampleClient(URI serverUri) {
super(serverUri);
System.out.println("Created object");
setMessagesRecievedCount(0);
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
System.out.println("Connection established!");
}
@Override
public void onMessage(String s) {
setMessagesRecievedCount(getMessagesRecievedCount() + 1);
}
public void getMessagesPerMinute(){
float start = getMessagesRecievedCount();
float end = 0;
long ten = System.currentTimeMillis() + 1000;
while(System.currentTimeMillis() < ten) {
end = getMessagesRecievedCount();
}
System.out.println("start: "+start+" end: "+end+
"Total messages: "+ (end-start)+"\n");
}
public float getMessagesRecievedCount() {
return messagesRecievedCount;
}
public void setMessagesRecievedCount(float messagesRecievedCount) {
this.messagesRecievedCount = messagesRecievedCount;
}
}
我有一个全局变量 messagesRecievedCount
,它保存从 websocket 流接收的消息的 运行 计数。每当收到消息时,都会触发 onMessage()
方法并更新消息计数。我想计算 10 秒内收到的消息数(并将其推断为一分钟)- 我有 getMessagesPerMinute()
.
显然我这样做的方式并不聪明——它是阻塞的,并且 10 秒后的消息计数是相同的(实际上不是,我实际上收到了 20 条消息)。我觉得我应该做线程,但我不知道该怎么做。你有什么建议?我对此很陌生,只是在修修补补。
这是主要的 class,我从以下位置呼叫 ExampleClient.java
:
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.WebSocketSession;
import java.net.URI;
import java.net.URISyntaxException;
@SpringBootApplication
public class DemoApplication {
private WebSocketSession clientSession;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
public DemoApplication () throws URISyntaxException {
ExampleClient c = new ExampleClient( new URI( "wss://example.com/" ) );
c.connect();
c.getMessagesPerMinute();
}
}
c.connect()
建立连接,onMessage()
很快触发!
您的代码实际上运行了 1 秒(1000 毫秒,我不知道这是错字还是自愿简化)。另一个问题是它在 while 循环内重复调用 end = getMessagesRecievedCount();
,而您实际上只需要起始值和最终值。解决这个问题的一种方法是使用 Thread.sleep()
(如果你不需要中途取消计数):
public void getMessagesPerMinute(){
float start = getMessagesRecievedCount();
float end = 0;
try{
Thread.sleep(10000);
}
catch(InterruptedException e){
System.out.println("do something");
}
end = getMessagesRecievedCount();
System.out.println("start: "+start+" end: "+end+
"Total messages: "+ (end-start)+"\n");
}
对于阻塞来说重要的是这段代码运行在一个不同的线程中,那个线程更新 messagesRecievedCount
的值或者同时做你可能想做的其他事情,所以在一个线程中调用它新线程可能是最好的解决方案。我不熟悉您正在使用的框架,因此它可能已经在使用更适合此目的的不同线程。
如果您打算对变量 messagesRecievedCount
做更多的事情,则需要进行一些同步,但对于估计每分钟的消息数量,这应该足够了。
这是我使用的一些测试代码,希望您可以对其进行调整以更好地适应您的情况,并尝试找出问题所在。在这种情况下,差异非常恒定,但值显然已更新。制作 ExampleClient 实例 public 是一个捷径,在实际代码中应该避免。
public class Test{
public static ExampleClient example=new ExampleClient();
public static void main(String[] args){
Thread a=new MessagesPerTenSecondFetcher();
Thread b=new MessagesPerTenSecondFetcher();
Thread c=new MessagesPerTenSecondFetcher();
Thread d= new MessageProducer();
a.start();
d.start();
b.start();
try{
Thread.sleep(2000);
}
catch(InterruptedException e){
System.out.println("do something");
}
c.start();
}
}
class ExampleClient {
private float messagesRecievedCount;
public void onMessage(String s) {
setMessagesRecievedCount(getMessagesRecievedCount() + 1);
}
public void getMessagesPerMinute(){
float start = getMessagesRecievedCount();
float end = 0;
try{
Thread.sleep(10000);
}
catch(InterruptedException e){
System.out.println("do something");
}
end = getMessagesRecievedCount();
System.out.println("start: "+start+" end: "+end+
"Total messages: "+ (end-start)+"\n");
}
public float getMessagesRecievedCount() {
return messagesRecievedCount;
}
public void setMessagesRecievedCount(float messagesRecievedCount) {
this.messagesRecievedCount = messagesRecievedCount;
}
}
class MessagesPerTenSecondFetcher extends Thread{
@Override
public void run(){
Test.example.getMessagesPerMinute();
}
}
class MessageProducer extends Thread{
@Override
public void run(){
for(int i =0; i<100;i++){
Test.example.onMessage("a");
try{
Thread.sleep(130);
}
catch(InterruptedException e){
System.out.println("do something");
}
}
}
}
所以我从一个流中接收到大量消息 - 好吧,它本身并不是一个流,它实际上是一种在收到消息时触发的方法 - 我想计算在10 秒。
这是我目前的情况:
package com.example.demo;
import java.net.URI;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
public class ExampleClient extends WebSocketClient {
private float messagesRecievedCount;
public ExampleClient(URI serverUri) {
super(serverUri);
System.out.println("Created object");
setMessagesRecievedCount(0);
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
System.out.println("Connection established!");
}
@Override
public void onMessage(String s) {
setMessagesRecievedCount(getMessagesRecievedCount() + 1);
}
public void getMessagesPerMinute(){
float start = getMessagesRecievedCount();
float end = 0;
long ten = System.currentTimeMillis() + 1000;
while(System.currentTimeMillis() < ten) {
end = getMessagesRecievedCount();
}
System.out.println("start: "+start+" end: "+end+
"Total messages: "+ (end-start)+"\n");
}
public float getMessagesRecievedCount() {
return messagesRecievedCount;
}
public void setMessagesRecievedCount(float messagesRecievedCount) {
this.messagesRecievedCount = messagesRecievedCount;
}
}
我有一个全局变量 messagesRecievedCount
,它保存从 websocket 流接收的消息的 运行 计数。每当收到消息时,都会触发 onMessage()
方法并更新消息计数。我想计算 10 秒内收到的消息数(并将其推断为一分钟)- 我有 getMessagesPerMinute()
.
显然我这样做的方式并不聪明——它是阻塞的,并且 10 秒后的消息计数是相同的(实际上不是,我实际上收到了 20 条消息)。我觉得我应该做线程,但我不知道该怎么做。你有什么建议?我对此很陌生,只是在修修补补。
这是主要的 class,我从以下位置呼叫 ExampleClient.java
:
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.WebSocketSession;
import java.net.URI;
import java.net.URISyntaxException;
@SpringBootApplication
public class DemoApplication {
private WebSocketSession clientSession;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
public DemoApplication () throws URISyntaxException {
ExampleClient c = new ExampleClient( new URI( "wss://example.com/" ) );
c.connect();
c.getMessagesPerMinute();
}
}
c.connect()
建立连接,onMessage()
很快触发!
您的代码实际上运行了 1 秒(1000 毫秒,我不知道这是错字还是自愿简化)。另一个问题是它在 while 循环内重复调用 end = getMessagesRecievedCount();
,而您实际上只需要起始值和最终值。解决这个问题的一种方法是使用 Thread.sleep()
(如果你不需要中途取消计数):
public void getMessagesPerMinute(){
float start = getMessagesRecievedCount();
float end = 0;
try{
Thread.sleep(10000);
}
catch(InterruptedException e){
System.out.println("do something");
}
end = getMessagesRecievedCount();
System.out.println("start: "+start+" end: "+end+
"Total messages: "+ (end-start)+"\n");
}
对于阻塞来说重要的是这段代码运行在一个不同的线程中,那个线程更新 messagesRecievedCount
的值或者同时做你可能想做的其他事情,所以在一个线程中调用它新线程可能是最好的解决方案。我不熟悉您正在使用的框架,因此它可能已经在使用更适合此目的的不同线程。
如果您打算对变量 messagesRecievedCount
做更多的事情,则需要进行一些同步,但对于估计每分钟的消息数量,这应该足够了。
这是我使用的一些测试代码,希望您可以对其进行调整以更好地适应您的情况,并尝试找出问题所在。在这种情况下,差异非常恒定,但值显然已更新。制作 ExampleClient 实例 public 是一个捷径,在实际代码中应该避免。
public class Test{
public static ExampleClient example=new ExampleClient();
public static void main(String[] args){
Thread a=new MessagesPerTenSecondFetcher();
Thread b=new MessagesPerTenSecondFetcher();
Thread c=new MessagesPerTenSecondFetcher();
Thread d= new MessageProducer();
a.start();
d.start();
b.start();
try{
Thread.sleep(2000);
}
catch(InterruptedException e){
System.out.println("do something");
}
c.start();
}
}
class ExampleClient {
private float messagesRecievedCount;
public void onMessage(String s) {
setMessagesRecievedCount(getMessagesRecievedCount() + 1);
}
public void getMessagesPerMinute(){
float start = getMessagesRecievedCount();
float end = 0;
try{
Thread.sleep(10000);
}
catch(InterruptedException e){
System.out.println("do something");
}
end = getMessagesRecievedCount();
System.out.println("start: "+start+" end: "+end+
"Total messages: "+ (end-start)+"\n");
}
public float getMessagesRecievedCount() {
return messagesRecievedCount;
}
public void setMessagesRecievedCount(float messagesRecievedCount) {
this.messagesRecievedCount = messagesRecievedCount;
}
}
class MessagesPerTenSecondFetcher extends Thread{
@Override
public void run(){
Test.example.getMessagesPerMinute();
}
}
class MessageProducer extends Thread{
@Override
public void run(){
for(int i =0; i<100;i++){
Test.example.onMessage("a");
try{
Thread.sleep(130);
}
catch(InterruptedException e){
System.out.println("do something");
}
}
}
}