阅读 java 前 10 分钟的消息
Read messages which are 10 minutes old in java
我正在尝试实现一个生产者和消费者。
生产者将继续将消息推入队列的位置。
但是,消费者必须在到达队列后 30 分钟后才能阅读这些消息。
假设
m1 reaches at 10am
m2 reaches at 10.10am
m3 reaches at 10.20am
消费者必须在
取货
m1 at 10.30am
m2 at 10.40am
m3 at 10.50am
在java.
有什么办法吗
我尝试了 BlockingQueue,但我不认为我们可以使用 BlockingQueue 将其存档。
在 BlockingQueue 中,消费者将在队列中有可用消息时立即读取。
任何帮助将不胜感激。
DelayQueue
Java 随 DelayQueue
, an implementation of Queue
and BlockingQueue
一起构建,仅在指定的延迟期到期后才提供元素。
您提供给队列的对象必须有一定的等待时间,通过实现 Delayed interface. This interface requires one method: getDelay
来定义。此接口还有一项要求:
An implementation of this interface must define a compareTo
method that provides an ordering consistent with its getDelay
method.
定义您的消息 class 以满足这两个要求。
public class DelayedMessage implements Delayed , Comparable {…}
定义您的队列以携带该类型的对象。
BlockingQueue< DelayedMessage > queue = new DelayQueue<>() ;
参见 this tutorial, and the tutorial by Oracle。
例子
这是一个示例应用程序。
我们定义了一个 Message
class 来实现 Delayed
,从而也实现了 Comparable<Delayed>
。
请注意我们如何在 Message
对象上记录开始时刻,捕获当前时刻,然后添加指定为 Duration
对象的时间量。特别注意 required getDelay
方法如何动态地重新计算其执行时刻和延迟到期时刻之间的剩余时间。当 getDelay
结果为正时,队列中的 Delayed
元素将不可用;只有当 getDelay
returns 为零或负数时,该元素才可用。
package work.basil.example;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class Message implements Delayed
{
// Member fields
CharSequence message;
Instant momentToRun;
// Constructor
public Message ( CharSequence message , Duration delay )
{
this.message = Objects.requireNonNull( message );
this.momentToRun = Instant.now().plus( Objects.requireNonNull( delay ) );
}
// Getter
public CharSequence getMessage ( ) { return this.message; }
;
// Implements Delayed
@Override
public long getDelay ( TimeUnit timeUnit )
{
Duration timeRemaining = Duration.between( Instant.now() , this.momentToRun );
long result = timeUnit.convert( timeRemaining );
return result;
}
// Implements Comparable
@Override
public int compareTo ( Delayed o )
{
TimeUnit tu = TimeUnit.NANOSECONDS;
return Long.compare( this.getDelay( tu ) , o.getDelay( tu ) );
}
// Object
@Override
public String toString ( )
{
return "Message{ " +
"message=" + message +
" | momentToRun=" + momentToRun +
" | getDelay (nanos) = " + this.getDelay( TimeUnit.NANOSECONDS ) +
" }";
}
}
接下来我们使用 Message
class。我们将 Message
个对象存储在 DelayQueue
中。我们转储该队列以验证其内容。
然后我们创建一个后台线程来每秒检查一次队列。如果我们从队列中得到 null
,我们将忽略它。如果我们从队列中得到一个 Message
对象,我们将以 UTC 格式写入当前时刻,然后写入该消息对象的 toString
输出。我们将这项工作安排在 ScheduledExecutorService
(see tutorial by Oracle)。
运行安装此应用程序时请等待整整一分钟。一分钟后,执行程序服务关闭,并向控制台写入一条包含当前时刻的通知,让您知道该应用程序已成功结束其 运行。
package work.basil.example;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class DemoDelayQueue
{
public static void main ( String[] args )
{
DemoDelayQueue app = new DemoDelayQueue();
app.demo();
}
private void demo ( )
{
System.out.println( "App beginning now: " + Instant.now() );
BlockingQueue < Message > queue = new DelayQueue <>();
queue.add( new Message( "Message delayed 5 seconds." , Duration.ofSeconds( 5 ) ) );
queue.add( new Message( "Message delayed 47 seconds." , Duration.ofSeconds( 47 ) ) );
queue.add( new Message( "Message delayed 3 seconds." , Duration.ofSeconds( 3 ) ) );
queue.add( new Message( "Message delayed 12 seconds." , Duration.ofSeconds( 12 ) ) );
System.out.println( "queue = " + queue );
ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
ses.scheduleAtFixedRate(
( ) -> {
Message m = queue.poll();
if ( Objects.nonNull( m ) )
{
System.out.println( "--------" );
System.out.println( "Retrieved `Message` object from queue at: " + Instant.now() );
System.out.println( m.toString() );
}
else // Else `m` is null.
{
// System.out.println( "No message available. Null returned by queue." ) ;
}
}
,
0 ,
1 ,
TimeUnit.SECONDS
);
try
{
Thread.sleep( TimeUnit.MINUTES.toMillis( 1 ) );
}
catch ( InterruptedException e )
{
e.printStackTrace();
}
finally
{
ses.shutdown();
System.out.println( "App ending now: " + Instant.now() );
}
}
}
当 运行 时,输出如下所示:
App beginning now: 2020-06-12T21:26:23.256755Z
queue = [Message{ message=Message delayed 3 seconds. | momentToRun=2020-06-12T21:26:26.271260Z | getDelay (nanos) = 2999677000 }, Message{ message=Message delayed 12 seconds. | momentToRun=2020-06-12T21:26:35.271324Z | getDelay (nanos) = 11976986000 }, Message{ message=Message delayed 5 seconds. | momentToRun=2020-06-12T21:26:28.271128Z | getDelay (nanos) = 4976537000 }, Message{ message=Message delayed 47 seconds. | momentToRun=2020-06-12T21:27:10.271205Z | getDelay (nanos) = 46976417000 }]
--------
Retrieved `Message` object from queue at: 2020-06-12T21:26:26.302270Z
Message{ message=Message delayed 3 seconds. | momentToRun=2020-06-12T21:26:26.271260Z | getDelay (nanos) = -31295000 }
--------
Retrieved `Message` object from queue at: 2020-06-12T21:26:28.300048Z
Message{ message=Message delayed 5 seconds. | momentToRun=2020-06-12T21:26:28.271128Z | getDelay (nanos) = -29093000 }
--------
Retrieved `Message` object from queue at: 2020-06-12T21:26:35.303619Z
Message{ message=Message delayed 12 seconds. | momentToRun=2020-06-12T21:26:35.271324Z | getDelay (nanos) = -32412000 }
--------
Retrieved `Message` object from queue at: 2020-06-12T21:27:10.300950Z
Message{ message=Message delayed 47 seconds. | momentToRun=2020-06-12T21:27:10.271205Z | getDelay (nanos) = -29863000 }
App ending now: 2020-06-12T21:27:23.302958Z
我正在尝试实现一个生产者和消费者。 生产者将继续将消息推入队列的位置。 但是,消费者必须在到达队列后 30 分钟后才能阅读这些消息。
假设
m1 reaches at 10am
m2 reaches at 10.10am
m3 reaches at 10.20am
消费者必须在
取货m1 at 10.30am
m2 at 10.40am
m3 at 10.50am
在java.
有什么办法吗我尝试了 BlockingQueue,但我不认为我们可以使用 BlockingQueue 将其存档。 在 BlockingQueue 中,消费者将在队列中有可用消息时立即读取。
任何帮助将不胜感激。
DelayQueue
Java 随 DelayQueue
, an implementation of Queue
and BlockingQueue
一起构建,仅在指定的延迟期到期后才提供元素。
您提供给队列的对象必须有一定的等待时间,通过实现 Delayed interface. This interface requires one method: getDelay
来定义。此接口还有一项要求:
An implementation of this interface must define a
compareTo
method that provides an ordering consistent with itsgetDelay
method.
定义您的消息 class 以满足这两个要求。
public class DelayedMessage implements Delayed , Comparable {…}
定义您的队列以携带该类型的对象。
BlockingQueue< DelayedMessage > queue = new DelayQueue<>() ;
参见 this tutorial, and the tutorial by Oracle。
例子
这是一个示例应用程序。
我们定义了一个 Message
class 来实现 Delayed
,从而也实现了 Comparable<Delayed>
。
请注意我们如何在 Message
对象上记录开始时刻,捕获当前时刻,然后添加指定为 Duration
对象的时间量。特别注意 required getDelay
方法如何动态地重新计算其执行时刻和延迟到期时刻之间的剩余时间。当 getDelay
结果为正时,队列中的 Delayed
元素将不可用;只有当 getDelay
returns 为零或负数时,该元素才可用。
package work.basil.example;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class Message implements Delayed
{
// Member fields
CharSequence message;
Instant momentToRun;
// Constructor
public Message ( CharSequence message , Duration delay )
{
this.message = Objects.requireNonNull( message );
this.momentToRun = Instant.now().plus( Objects.requireNonNull( delay ) );
}
// Getter
public CharSequence getMessage ( ) { return this.message; }
;
// Implements Delayed
@Override
public long getDelay ( TimeUnit timeUnit )
{
Duration timeRemaining = Duration.between( Instant.now() , this.momentToRun );
long result = timeUnit.convert( timeRemaining );
return result;
}
// Implements Comparable
@Override
public int compareTo ( Delayed o )
{
TimeUnit tu = TimeUnit.NANOSECONDS;
return Long.compare( this.getDelay( tu ) , o.getDelay( tu ) );
}
// Object
@Override
public String toString ( )
{
return "Message{ " +
"message=" + message +
" | momentToRun=" + momentToRun +
" | getDelay (nanos) = " + this.getDelay( TimeUnit.NANOSECONDS ) +
" }";
}
}
接下来我们使用 Message
class。我们将 Message
个对象存储在 DelayQueue
中。我们转储该队列以验证其内容。
然后我们创建一个后台线程来每秒检查一次队列。如果我们从队列中得到 null
,我们将忽略它。如果我们从队列中得到一个 Message
对象,我们将以 UTC 格式写入当前时刻,然后写入该消息对象的 toString
输出。我们将这项工作安排在 ScheduledExecutorService
(see tutorial by Oracle)。
运行安装此应用程序时请等待整整一分钟。一分钟后,执行程序服务关闭,并向控制台写入一条包含当前时刻的通知,让您知道该应用程序已成功结束其 运行。
package work.basil.example;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class DemoDelayQueue
{
public static void main ( String[] args )
{
DemoDelayQueue app = new DemoDelayQueue();
app.demo();
}
private void demo ( )
{
System.out.println( "App beginning now: " + Instant.now() );
BlockingQueue < Message > queue = new DelayQueue <>();
queue.add( new Message( "Message delayed 5 seconds." , Duration.ofSeconds( 5 ) ) );
queue.add( new Message( "Message delayed 47 seconds." , Duration.ofSeconds( 47 ) ) );
queue.add( new Message( "Message delayed 3 seconds." , Duration.ofSeconds( 3 ) ) );
queue.add( new Message( "Message delayed 12 seconds." , Duration.ofSeconds( 12 ) ) );
System.out.println( "queue = " + queue );
ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
ses.scheduleAtFixedRate(
( ) -> {
Message m = queue.poll();
if ( Objects.nonNull( m ) )
{
System.out.println( "--------" );
System.out.println( "Retrieved `Message` object from queue at: " + Instant.now() );
System.out.println( m.toString() );
}
else // Else `m` is null.
{
// System.out.println( "No message available. Null returned by queue." ) ;
}
}
,
0 ,
1 ,
TimeUnit.SECONDS
);
try
{
Thread.sleep( TimeUnit.MINUTES.toMillis( 1 ) );
}
catch ( InterruptedException e )
{
e.printStackTrace();
}
finally
{
ses.shutdown();
System.out.println( "App ending now: " + Instant.now() );
}
}
}
当 运行 时,输出如下所示:
App beginning now: 2020-06-12T21:26:23.256755Z
queue = [Message{ message=Message delayed 3 seconds. | momentToRun=2020-06-12T21:26:26.271260Z | getDelay (nanos) = 2999677000 }, Message{ message=Message delayed 12 seconds. | momentToRun=2020-06-12T21:26:35.271324Z | getDelay (nanos) = 11976986000 }, Message{ message=Message delayed 5 seconds. | momentToRun=2020-06-12T21:26:28.271128Z | getDelay (nanos) = 4976537000 }, Message{ message=Message delayed 47 seconds. | momentToRun=2020-06-12T21:27:10.271205Z | getDelay (nanos) = 46976417000 }]
--------
Retrieved `Message` object from queue at: 2020-06-12T21:26:26.302270Z
Message{ message=Message delayed 3 seconds. | momentToRun=2020-06-12T21:26:26.271260Z | getDelay (nanos) = -31295000 }
--------
Retrieved `Message` object from queue at: 2020-06-12T21:26:28.300048Z
Message{ message=Message delayed 5 seconds. | momentToRun=2020-06-12T21:26:28.271128Z | getDelay (nanos) = -29093000 }
--------
Retrieved `Message` object from queue at: 2020-06-12T21:26:35.303619Z
Message{ message=Message delayed 12 seconds. | momentToRun=2020-06-12T21:26:35.271324Z | getDelay (nanos) = -32412000 }
--------
Retrieved `Message` object from queue at: 2020-06-12T21:27:10.300950Z
Message{ message=Message delayed 47 seconds. | momentToRun=2020-06-12T21:27:10.271205Z | getDelay (nanos) = -29863000 }
App ending now: 2020-06-12T21:27:23.302958Z