如何计算来自 "stream" in Java 的传入消息?

How to count incoming messages from a "stream" in Java?

所以我从一个流中接收到大量消息 - 好吧,它本身并不是一个流,它实际上是一种在收到消息时触发的方法 - 我想计算在10 秒。

这是我目前的情况:

package com.example.demo;

import java.net.URI;

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

public class ExampleClient extends WebSocketClient {

    private float messagesRecievedCount;

    public ExampleClient(URI serverUri) {
        super(serverUri);
        System.out.println("Created object");
        setMessagesRecievedCount(0);
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        System.out.println("Connection established!");
    }

    @Override
    public void onMessage(String s) {
        setMessagesRecievedCount(getMessagesRecievedCount() + 1);
    }

    public void getMessagesPerMinute(){
        float start = getMessagesRecievedCount();
        float end = 0;
        long ten = System.currentTimeMillis() + 1000;
        while(System.currentTimeMillis() < ten) {
            end = getMessagesRecievedCount();
        }
        System.out.println("start: "+start+" end: "+end+
                "Total messages: "+ (end-start)+"\n");
    }

    public float getMessagesRecievedCount() {
        return messagesRecievedCount;
    }

    public void setMessagesRecievedCount(float messagesRecievedCount) {
        this.messagesRecievedCount = messagesRecievedCount;
    }
}

我有一个全局变量 messagesRecievedCount,它保存从 websocket 流接收的消息的 运行 计数。每当收到消息时,都会触发 onMessage() 方法并更新消息计数。我想计算 10 秒内收到的消息数(并将其推断为一分钟)- 我有 getMessagesPerMinute().

显然我这样做的方式并不聪明——它是阻塞的,并且 10 秒后的消息计数是相同的(实际上不是,我实际上收到了 20 条消息)。我觉得我应该做线程,但我不知道该怎么做。你有什么建议?我对此很陌生,只是在修修补补。

这是主要的 class,我从以下位置呼叫 ExampleClient.java

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.servlet.server.ConfigurableServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.WebSocketSession;

import java.net.URI;
import java.net.URISyntaxException;

@SpringBootApplication
public class DemoApplication  {

    private WebSocketSession clientSession;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    public DemoApplication () throws URISyntaxException {
        ExampleClient c = new ExampleClient( new URI( "wss://example.com/" ) );
        c.connect();
        c.getMessagesPerMinute();
    }
    
}

c.connect()建立连接,onMessage()很快触发!

您的代码实际上运行了 1 秒(1000 毫秒,我不知道这是错字还是自愿简化)。另一个问题是它在 while 循环内重复调用 end = getMessagesRecievedCount();,而您实际上只需要起始值和最终值。解决这个问题的一种方法是使用 Thread.sleep()(如果你不需要中途取消计数):

    public void getMessagesPerMinute(){
    float start = getMessagesRecievedCount();
    float end = 0;
    try{
            Thread.sleep(10000);
    }
    catch(InterruptedException e){
        System.out.println("do something");
    }
    end = getMessagesRecievedCount();
    System.out.println("start: "+start+" end: "+end+
            "Total messages: "+ (end-start)+"\n");
    }

对于阻塞来说重要的是这段代码运行在一个不同的线程中,那个线程更新 messagesRecievedCount 的值或者同时做你可能想做的其他事情,所以在一个线程中调用它新线程可能是最好的解决方案。我不熟悉您正在使用的框架,因此它可能已经在使用更适合此目的的不同线程。

如果您打算对变量 messagesRecievedCount 做更多的事情,则需要进行一些同步,但对于估计每分钟的消息数量,这应该足够了。

这是我使用的一些测试代码,希望您可以对其进行调整以更好地适应您的情况,并尝试找出问题所在。在这种情况下,差异非常恒定,但值显然已更新。制作 ExampleClient 实例 public 是一个捷径,在实际代码中应该避免。

public class Test{

    public static ExampleClient example=new ExampleClient();
    
    public static void main(String[] args){
        Thread a=new MessagesPerTenSecondFetcher();
        Thread b=new MessagesPerTenSecondFetcher();
        Thread c=new MessagesPerTenSecondFetcher();
        Thread d= new MessageProducer();
        a.start();
        d.start();
        b.start();
        try{
            Thread.sleep(2000);
        }
        catch(InterruptedException e){
            System.out.println("do something");
        }
        c.start();
    }
}

class ExampleClient {
    private float messagesRecievedCount;

    public void onMessage(String s) {
        setMessagesRecievedCount(getMessagesRecievedCount() + 1);
    }

    public void getMessagesPerMinute(){
        float start = getMessagesRecievedCount();
        float end = 0;
        try{
                Thread.sleep(10000);
        }
        catch(InterruptedException e){
            System.out.println("do something");
        }
        end = getMessagesRecievedCount();
        System.out.println("start: "+start+" end: "+end+
                "Total messages: "+ (end-start)+"\n");
    }

    public float getMessagesRecievedCount() {
        return messagesRecievedCount;
    }

    public void setMessagesRecievedCount(float messagesRecievedCount) {
        this.messagesRecievedCount = messagesRecievedCount;
    }
}

class MessagesPerTenSecondFetcher extends Thread{
     @Override
     public void run(){
         Test.example.getMessagesPerMinute();
     }
}

class MessageProducer extends Thread{
     @Override
     public void run(){
        for(int i =0; i<100;i++){
            Test.example.onMessage("a");
            try{
                Thread.sleep(130);
            }
            catch(InterruptedException e){
                System.out.println("do something");
            }
        }
    }
}