Spring 状态机 timerOnce() 转换中断退出操作

Spring state machine timerOnce() transition interrupting exit actions

我有以下状态机;

@Configuration
@EnableStateMachineFactory
public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<GameStep, GameEventType> {

    @Autowired
    private GameStateService gameService;

    @Autowired
    private WordsGameService wordsGameService;

    @Override
    public void configure(StateMachineStateConfigurer<GameStep, GameEventType> states) throws Exception {
        states.withStates().initial(GAME_STARTED, initGame())
                .state(WAITING_FOR_PLAYERS, Arrays.asList(initPlayers(), setPlayersConfirmedFalse()), Collections.singletonList(informPlayers()))
                .state(WORDS_STARTED, setPlayersConfirmedFalse(), informPlayers())
                .state(WORDS_ENDED, null, informPlayers())
                .state(EQUATIONS_STARTED, setPlayersConfirmedFalse(), informPlayers())
                .state(EQUATIONS_ENDED, null, informPlayers())
        ;
    }

    @Bean
    public Action<GameStep, GameEventType> initGame() {
        return context -> {
            GameState game = gameService.createNewGameState(null, null);
            context.getExtendedState().getVariables().put("game", game);
        };
    }

    @Bean
    public Action<GameStep, GameEventType> setPlayersConfirmedFalse() {
        return context -> {
            GameState game = (GameState) context.getExtendedState().getVariables().get("game");
            game.getFirstPlayer().setConfirmedToPlay(false);
            game.getSecondPlayer().setConfirmedToPlay(false);
        };
    }

    @Bean
    public Action<GameStep, GameEventType> initPlayers() {
        return context -> {
            PlayersJoinedMessage message = context.getMessageHeaders().get("message", PlayersJoinedMessage.class);
            GameState game = getGameFromHeader(context);
            game.setFirstPlayer(new Player(message.getFirstPlayer(), PlayerPosition.FIRST));
            game.setSecondPlayer(new Player(message.getSecondPlayer(), PlayerPosition.SECOND));
        };
    }

    private GameState getGameFromHeader(StateContext<GameStep, GameEventType> context) {
        return (GameState) context.getExtendedState().getVariables().get("game");
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<GameStep, GameEventType> transitions)
            throws Exception {
        transitions
                .withExternal().source(GAME_STARTED).target(WAITING_FOR_PLAYERS).event(PLAYERS_JOINED)
                .and()
                .withExternal().source(WAITING_FOR_PLAYERS).target(WORDS_STARTED).event(START_CONFIRMATION).guard(bothPlayersConfirmed())
                .and()
                .withExternal().source(WORDS_STARTED).target(WORDS_ENDED).event(SEND_WORD).guard(sendWord())
                .and()
                .withExternal().source(WORDS_STARTED).target(WORDS_ENDED).event(TIME_EXPIRED)
                .and()
                .withInternal().source(WORDS_STARTED).action(timerAction()).timerOnce(5000)
                .and()
                .withExternal().source(WORDS_ENDED).target(EQUATIONS_STARTED).event(TIME_EXPIRED)
                .and()
                .withInternal().source(WORDS_ENDED).action(timerAction()).timerOnce(5000)
                .and()
                .withExternal().source(EQUATIONS_STARTED).target(EQUATIONS_ENDED).event(SEND_EQUATION).guard(bothPlayersConfirmed())
                .and()
                .withExternal().source(EQUATIONS_STARTED).target(EQUATIONS_ENDED).event(TIME_EXPIRED)
                .and()
                .withInternal().source(EQUATIONS_STARTED).action(timerAction()).timerOnce(155000)
        //more games

        ;
    }

    @Bean
    public Guard<GameStep, GameEventType> sendWord() {
        return context -> {
            ChangeGameMessage message = context.getMessage().getHeaders().get("message", ChangeGameMessage.class);
            GameState gameState = getGameFromHeader(context);
            int points = wordsGameService.checkWord(gameState.getGame().getWordsGame(), message.getGameEvent().getMessage().split(","));
            Player player = gameState.getPlayerByName(message.getPlayer());
            player.setWordsPoints(points);

            return gameState.getFirstPlayer().getWordsPoints() != null && gameState.getSecondPlayer().getWordsPoints() != null;
        };
    }

    @Override
    public void configure(StateMachineConfigurationConfigurer<GameStep, GameEventType> config)
            throws Exception {
        config
                .withConfiguration()
                .autoStartup(true)
//                .taskExecutor(new SimpleAsyncTaskExecutor())
//                .taskScheduler(new ConcurrentTaskScheduler())
                .listener(listener());
    }

    @Bean
    public StateMachineListener<GameStep, GameEventType> listener() {
        return new StateMachineListenerAdapter<GameStep, GameEventType>() {
            @Override
            public void stateChanged(State<GameStep, GameEventType> from, State<GameStep, GameEventType> to) {
                System.out.println("State changed to " + to.getId());
            }
        };
    }

    @Bean
    public Guard<GameStep, GameEventType> bothPlayersConfirmed() {
        return context -> {
            ChangeGameMessage message = context.getMessage().getHeaders().get("message", ChangeGameMessage.class);
            GameState gameState = getGameFromHeader(context);
            Player player = gameState.getPlayerByName(message.getPlayer());
            player.setConfirmedToPlay(true);
            return gameState.getFirstPlayer().getConfirmedToPlay() && gameState.getSecondPlayer().getConfirmedToPlay();
        };
    }

    @Bean
    public Action<GameStep, GameEventType> timerAction() {
        return context -> {
            System.out.println("timer event");
//            informPlayers().execute(context);
            context.getStateMachine().sendEvent(TIME_EXPIRED);
        };
    }

    @Bean
    public Action<GameStep, GameEventType> informPlayers() {
        return new Action<GameStep, GameEventType>() {

            @Autowired
            private SimpMessagingTemplate messagingTemplate;

            @Override
            public void execute(StateContext<GameStep, GameEventType> context) {
                GameState game = context.getExtendedState().get("game", GameState.class);
                messagingTemplate.convertAndSendToUser(game.getFirstPlayer().getName(), "/queue/eureka", game);
                messagingTemplate.convertAndSendToUser(game.getSecondPlayer().getName(), "/queue/eureka", game);
            }
        };
    }
//
//    @Bean(name = StateMachineSystemConstants.TASK_EXECUTOR_BEAN_NAME)
//    public TaskExecutor taskExecutor() {
//        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//        taskExecutor.setCorePoolSize(5);
//        return taskExecutor;
//    }
}

问题是如果 timerAction() 被触发,动作 informPlayers() 就会被中断。如果在守卫 bothPlayersConfirmed() returns true 时状态转换,则 informPlayers() 被正确调用,所以问题出在 timerOnce() 转换方法上。堆栈跟踪:

State changed to EQUATIONS_ENDED
timer event
2016-06-24 16:43:39.670 ERROR 83865 --- [pool-2-thread-1] o.s.statemachine.state.ObjectState       : Action execution resulted error

org.springframework.messaging.MessageDeliveryException: nested exception is java.lang.InterruptedException
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$SystemStompConnectionHandler.forward(StompBrokerRelayMessageHandler.java:951)
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:521)
    at org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:238)
    at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:135)
    at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:91)
    at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:117)
    at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:104)
    at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:184)
    at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:159)
    at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.messaging.simp.user.UserDestinationMessageHandler.handleMessage(UserDestinationMessageHandler.java:219)
    at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:135)
    at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:91)
    at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:117)
    at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:104)
    at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:184)
    at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:159)
    at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
    at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:224)
    at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:215)
    at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:201)
    at com.slagalica.configuration.StateMachineConfig.execute(StateMachineConfig.java:190)
    at org.springframework.statemachine.state.ObjectState.exit(ObjectState.java:131)
    at org.springframework.statemachine.support.AbstractStateMachine.exitFromState(AbstractStateMachine.java:1036)
    at org.springframework.statemachine.support.AbstractStateMachine.exitCurrentState(AbstractStateMachine.java:991)
    at org.springframework.statemachine.support.AbstractStateMachine.setCurrentState(AbstractStateMachine.java:868)
    at org.springframework.statemachine.support.AbstractStateMachine.switchToState(AbstractStateMachine.java:752)
    at org.springframework.statemachine.support.AbstractStateMachine.access0(AbstractStateMachine.java:72)
    at org.springframework.statemachine.support.AbstractStateMachine.transit(AbstractStateMachine.java:293)
    at org.springframework.statemachine.support.DefaultStateMachineExecutor.handleTriggerTrans(DefaultStateMachineExecutor.java:213)
    at org.springframework.statemachine.support.DefaultStateMachineExecutor.processTriggerQueue(DefaultStateMachineExecutor.java:356)
    at org.springframework.statemachine.support.DefaultStateMachineExecutor.access0(DefaultStateMachineExecutor.java:57)
    at org.springframework.statemachine.support.DefaultStateMachineExecutor.run(DefaultStateMachineExecutor.java:242)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.statemachine.support.DefaultStateMachineExecutor.scheduleEventQueueProcessing(DefaultStateMachineExecutor.java:261)
    at org.springframework.statemachine.support.DefaultStateMachineExecutor.access0(DefaultStateMachineExecutor.java:57)
    at org.springframework.statemachine.support.DefaultStateMachineExecutor.run(DefaultStateMachineExecutor.java:255)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.statemachine.support.DefaultStateMachineExecutor.scheduleEventQueueProcessing(DefaultStateMachineExecutor.java:261)
    at org.springframework.statemachine.support.DefaultStateMachineExecutor.access0(DefaultStateMachineExecutor.java:57)
    at org.springframework.statemachine.support.DefaultStateMachineExecutor.triggered(DefaultStateMachineExecutor.java:423)
    at org.springframework.statemachine.trigger.CompositeTriggerListener.triggered(CompositeTriggerListener.java:34)
    at org.springframework.statemachine.trigger.TimerTrigger.notifyTriggered(TimerTrigger.java:115)
    at org.springframework.statemachine.trigger.TimerTrigger.access[=12=]0(TimerTrigger.java:33)
    at org.springframework.statemachine.trigger.TimerTrigger.run(TimerTrigger.java:109)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException: null
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2151)
    at reactor.rx.Promise.await(Promise.java:465)
    at reactor.rx.Promise.await(Promise.java:440)
    at org.springframework.messaging.tcp.reactor.AbstractPromiseToListenableFutureAdapter.get(AbstractPromiseToListenableFutureAdapter.java:76)
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$SystemStompConnectionHandler.forward(StompBrokerRelayMessageHandler.java:946)
    ... 56 common frames omitted

State changed to EQUATIONS_ENDED

感谢您的解释(这是一个很好的收获),我为此 https://github.com/spring-projects/spring-statemachine/issues/233 创建了 gh 问题。

简而言之,我们需要提供更好的用户级挂钩,以允许超时退出操作可以 运行 在继续之前。