FIX Message with sequence #1 is too low (expected sequence #2)

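I am using QuickFIX/J to post messages to Bloomberg over two sessions that connect to different IP addresses. Both sessions connect, but when I try to send a message I get this error: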

Received logout request: Message with sequence #1 is too low (expected sequence #2)
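For readers unfamiliar with the error, here is a minimal sketch of the kind of inbound MsgSeqNum (tag 34) check a FIX counterparty performs. The class `InboundSequenceCheck` and its return strings are hypothetical and are not QuickFIX/J or Bloomberg code; the sketch only assumes the standard FIX rule that a sequence number lower than expected, without PossDupFlag, ends the session.

```java
/** Hypothetical model of an inbound MsgSeqNum check; not QuickFIX/J code. */
final class InboundSequenceCheck {
    // Next MsgSeqNum (tag 34) this side expects to receive from its peer.
    private int expected;

    InboundSequenceCheck(int expected) {
        this.expected = expected;
    }

    /** Describes how a receiver would typically react to an incoming message. */
    String onMessage(int received, boolean possDup) {
        if (received == expected) {
            expected++; // in sequence: accept and advance
            return "accept";
        }
        if (received > expected) {
            // Gap: messages were missed, so ask the peer to resend them.
            return "resend request from " + expected;
        }
        if (possDup) {
            return "ignore duplicate"; // a flagged retransmission is harmless
        }
        // Too low and not flagged as a duplicate: the session is terminated.
        return "logout: sequence #" + received
                + " is too low (expected sequence #" + expected + ")";
    }

    public static void main(String[] args) {
        // The counterparty already expects 2, but this side starts again at 1.
        InboundSequenceCheck counterparty = new InboundSequenceCheck(2);
        System.out.println(counterparty.onMessage(1, false));
        // prints "logout: sequence #1 is too low (expected sequence #2)"
    }
}
```

In other words, the error means the two sides disagree about where the outgoing sequence should start, not that the message itself is malformed.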

My configuration file:

[default]
FileStorePath=target/data/quickfixlogs
FileLogPath=target/data/quickfixlogs
ConnectionType=initiator
BeginString=FIXT.1.1
SenderCompID=****
TargetCompID=****

[session]
BeginString=FIXT.1.1
SenderCompID=****
TargetCompID=*****
DefaultApplVerID=FIX.5.0SP2
SessionQualifier=****
StartTime=00:00:00
EndTime=00:00:00
HeartBtInt=10
ResetSeqNumFlag=Y
ReconnectInterval=5
UseDataDictionary=Y
DataDictionary=FIXT11.xml
SocketConnectPort=8228
SocketConnectHost=69.191.198.38
SocketKeyStore=*****
SocketKeyStorePassword=*****
SocketUseSSL=Y
EnabledProtocols=TLSv1.2
KeyStoreType=JKS

[session]
BeginString=FIXT.1.1
SenderCompID=*****
TargetCompID=*****
DefaultApplVerID=FIX.5.0SP2
SessionQualifier=*****
StartTime=00:00:00
EndTime=00:00:00
HeartBtInt=10
ResetSeqNumFlag=Y
ReconnectInterval=5
UseDataDictionary=Y
DataDictionary=FIXT11.xml
SocketConnectPort=8228
SocketConnectHost=69.191.230.38
SocketKeyStore=*****
SocketKeyStorePassword=*****
SocketUseSSL=Y
EnabledProtocols=TLSv1.2
KeyStoreType=JKS
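As a reading aid for the file above: each [session] section inherits the keys from [default] and may override them. The sketch below models that merge in plain Java. `SessionSettingsSketch` is a hypothetical helper written for illustration and is not the QuickFIX/J `SessionSettings` class; the sample values are taken from the configuration above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical parser showing [default] inheritance; not the QuickFIX/J parser. */
final class SessionSettingsSketch {

    /** Returns one fully resolved key/value map per [session] section. */
    static List<Map<String, String>> parse(String text) {
        Map<String, String> defaults = new LinkedHashMap<>();
        List<Map<String, String>> sessions = new ArrayList<>();
        Map<String, String> current = null;

        for (String raw : text.split("\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blanks and comments
            }
            if (line.equalsIgnoreCase("[default]")) {
                current = defaults;
            } else if (line.equalsIgnoreCase("[session]")) {
                current = new LinkedHashMap<>();
                sessions.add(current);
            } else if (current != null) {
                int eq = line.indexOf('=');
                if (eq > 0) {
                    current.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
                }
            }
        }

        // Session-level keys win over the defaults they inherit.
        List<Map<String, String>> resolved = new ArrayList<>();
        for (Map<String, String> session : sessions) {
            Map<String, String> merged = new LinkedHashMap<>(defaults);
            merged.putAll(session);
            resolved.add(merged);
        }
        return resolved;
    }

    public static void main(String[] args) {
        String cfg = "[default]\nConnectionType=initiator\nBeginString=FIXT.1.1\n"
                + "[session]\nSocketConnectHost=69.191.198.38\n"
                + "[session]\nSocketConnectHost=69.191.230.38\n";
        for (Map<String, String> session : parse(cfg)) {
            System.out.println(session);
        }
    }
}
```

Both resolved sessions carry `ConnectionType=initiator` from [default] and differ only in `SocketConnectHost`, which mirrors the two-host setup above.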

I have tried setting ResetSeqNumFlag to both Y and N, but I still get the same error message.

Bloomberg's documentation states:

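Bloomberg adheres to the standard usage of ResetSeqNumFlag(141) to maintain 24-hour sessions per the "FIX Transport 1.1" specification (https://www.fixtrading.org/standards/fixt/). The following is an excerpt from the FIXT 1.1 specification:

It is recommended that a new FIX session be established once within each 24 hour period. It is possible to maintain 24 hour connectivity and establish a new set of sequence numbers by sending a Logon message with the ResetSeqNumFlag set. When using the ResetSeqNumFlag to maintain 24 hour connectivity and establish a new set of sequence numbers, the process should be as follows. Both sides should agree on a reset time and the party that will be the initiator of the process. Note that the initiator of the ResetSeqNum process may be different than the initiator of the Logon process. One side will initiate the process by sending a TestRequest and wait for a Heartbeat in response to ensure of no sequence number gaps. Once the Heartbeat has been received, the initiator should send a Logon with ResetSeqNumFlag set to Y and with MsgSeqNum of 1. The acceptor should respond with a Logon with ResetSeqNumFlag set to Y and with MsgSeqNum of 1. At this point new messages from either side should continue with MsgSeqNum of 2. It should be noted that once the initiator sends the Logon with the ResetSeqNumFlag set, the acceptor must obey this request and the message with the last sequence number transmitted "yesterday" may no longer be available. The connection should be shutdown and manual intervention taken if this process is initiated but not followed properly.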
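The handshake in the excerpt can be traced with a small in-memory simulation. This is only a sketch of the message order and counters the specification describes. `ResetHandshake` and `Endpoint` are hypothetical names, the starting sequence numbers 41 and 17 are arbitrary, and nothing here talks to QuickFIX/J or to Bloomberg.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the FIXT 1.1 sequence reset handshake. */
final class ResetHandshake {

    /** One side of the session, reduced to its two sequence counters. */
    static final class Endpoint {
        final String name;
        int nextOut; // next MsgSeqNum this side will send
        int nextIn;  // next MsgSeqNum this side expects to receive

        Endpoint(String name, int nextOut, int nextIn) {
            this.name = name;
            this.nextOut = nextOut;
            this.nextIn = nextIn;
        }

        void reset() {
            nextOut = 1;
            nextIn = 1;
        }
    }

    /** Delivers one message and fails if the receiver sees a gap. */
    static void send(Endpoint from, Endpoint to, String type, List<String> log) {
        int seq = from.nextOut++;
        if (seq != to.nextIn) {
            throw new IllegalStateException(
                    "sequence mismatch: got " + seq + ", expected " + to.nextIn);
        }
        to.nextIn++;
        log.add(from.name + " -> " + to.name + " " + type + " seq=" + seq);
    }

    static List<String> run(Endpoint initiator, Endpoint acceptor) {
        List<String> log = new ArrayList<>();
        // 1. Confirm there are no gaps before resetting.
        send(initiator, acceptor, "TestRequest", log);
        send(acceptor, initiator, "Heartbeat", log);
        // 2. Initiator resets and logs on with ResetSeqNumFlag=Y, MsgSeqNum=1.
        initiator.reset();
        acceptor.reset(); // the acceptor must obey the reset request
        send(initiator, acceptor, "Logon(141=Y)", log);
        // 3. Acceptor answers with its own Logon, also MsgSeqNum=1.
        send(acceptor, initiator, "Logon(141=Y)", log);
        // 4. Everything after the logon exchange continues at 2.
        send(initiator, acceptor, "application message", log);
        return log;
    }

    public static void main(String[] args) {
        Endpoint initiator = new Endpoint("initiator", 41, 17);
        Endpoint acceptor = new Endpoint("acceptor", 17, 41);
        run(initiator, acceptor).forEach(System.out::println);
    }
}
```

The last line of the trace shows the first application message going out with seq=2, which matches the "expected sequence #2" in the error: the counterparty has already counted a Logon as message 1.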

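However, I cannot see how to send a TestRequest and wait for the Heartbeat, or how to send a Heartbeat only when a TestRequest is received, in order to synchronize the sequence numbers.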

My application code is as follows:

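import quickfix.Application;
import quickfix.DoNotSend;
import quickfix.FieldNotFound;
import quickfix.IncorrectDataFormat;
import quickfix.IncorrectTagValue;
import quickfix.Message;
import quickfix.RejectLogon;
import quickfix.SessionID;
import quickfix.UnsupportedMessageType;

public class QuickFixJApplication implements Application {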
    @Override
    public void onCreate(SessionID sessionID) {
    }

    @Override
    public void onLogon(SessionID sessionID) {
   
    }

    @Override
    public void onLogout(SessionID sessionID) {

    }

    @Override
    public void toAdmin(Message message, SessionID sessionID) {

    }

    @Override
    public void fromAdmin(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {

    }

    @Override
    public void toApp(Message message, SessionID sessionID) throws DoNotSend {

    }

    @Override
    public void fromApp(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {

    }
}

The code I use to configure the sessions:

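import java.io.FileInputStream;

import quickfix.DefaultMessageFactory;
import quickfix.FileLogFactory;
import quickfix.FileStoreFactory;
import quickfix.LogFactory;
import quickfix.MessageFactory;
import quickfix.MessageStoreFactory;
import quickfix.SessionSettings;
import quickfix.ThreadedSocketInitiator;

public class FixConfiguration {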

    public static final String quickfixj = "quickfixj.cfg";

    public ThreadedSocketInitiator threadedSocketInitiator(QuickFixJApplication application, String fileName){
        ThreadedSocketInitiator threadedSocketInitiator = null;
        try {
            SessionSettings settings = new SessionSettings(new FileInputStream(fileName));
            MessageStoreFactory storeFactory = new FileStoreFactory(settings);
            LogFactory logFactory = new FileLogFactory(settings);
            MessageFactory messageFactory = new DefaultMessageFactory();
            threadedSocketInitiator = new ThreadedSocketInitiator(application, storeFactory, settings, logFactory, messageFactory);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return threadedSocketInitiator;
    }
}

The code I use to publish prices:

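import quickfix.Session;
import quickfix.SessionID;
import quickfix.ThreadedSocketInitiator;
import quickfix.field.MDEntryPx;
import quickfix.field.MDEntryType;
import quickfix.field.MDUpdateAction;
import quickfix.field.Symbol;
import quickfix.fix50sp2.MarketDataIncrementalRefresh;

public class BloombergPricePublisher {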

    public ThreadedSocketInitiator threadedSocketInitiator;
    public QuickFixJApplication application;

    /**
     * Constructor for BloombergPricePublisher, which takes as parameters:
     * @param application : the initialized QuickFixJ application
     * */
    public BloombergPricePublisher(QuickFixJApplication application, ThreadedSocketInitiator threadedSocketInitiator){
        super();
        this.application = application;
        this.threadedSocketInitiator = threadedSocketInitiator;
    }

    /**
     * Publishes a price to Bloomberg
     * */
    public void publish(MarketPrice marketPrice) {

        MarketDataIncrementalRefresh marketDataIncrementalRefresh = createMarketDataIncrementalRefresh(marketPrice);

        try{
            for (SessionID sessionID : this.threadedSocketInitiator.getSessions()){

                    Session.sendToTarget(marketDataIncrementalRefresh, sessionID);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * Creates a new price message to publish to Bloomberg
     * @param marketPrice
     * */
    public MarketDataIncrementalRefresh createMarketDataIncrementalRefresh(MarketPrice marketPrice) {

        quickfix.fix50sp2.MarketDataIncrementalRefresh message = new MarketDataIncrementalRefresh();

        quickfix.fix50sp2.MarketDataIncrementalRefresh.NoMDEntries group = new MarketDataIncrementalRefresh.NoMDEntries();

        /**
         * Add the Bid price to the message
         * */
        group.set(new MDUpdateAction('1'));
        group.set(new MDEntryType('0'));
        group.set(new MDEntryPx(marketPrice.getBid_kech()));
        group.set(new Symbol(marketPrice.getCUSIP()));
        message.addGroup(group);

        /**
         * Add the Ask price to the message
         * */
        group.set(new MDUpdateAction('1'));
        group.set(new MDEntryType('1'));
        group.set(new MDEntryPx(marketPrice.getAsk_kech()));
        group.set(new Symbol(marketPrice.getCUSIP()));
        message.addGroup(group);

        return message;

    }
}

Can anyone help me solve this problem? Thank you very much!

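The following configuration file solved the problem. Compared with the original, it adds ResetOnLogon=Y, TimeZone, StartDay and EndDay, raises HeartBtInt from 10 to 30, drops the UseDataDictionary and DataDictionary settings, and uses separate directories for the message store and the logs: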

[default]
FileStorePath=target/data/quickfix/messages
FileLogPath=target/data/quickfix/logs
ConnectionType=initiator
BeginString=FIXT.1.1
SenderCompID=*****
TargetCompID=*****

[session]
BeginString=FIXT.1.1
SenderCompID=*****
TargetCompID=*****
DefaultApplVerID=FIX.5.0SP2
SessionQualifier=*****
ResetSeqNumFlag=Y
ResetOnLogon=Y
StartTime=00:00:00
EndTime=00:00:00
TimeZone=America/New_York
StartDay=Monday
EndDay=Monday
HeartBtInt=30
ReconnectInterval=5
SocketConnectPort=8228
SocketConnectHost=69.191.198.38
SocketKeyStore=/home/******
SocketKeyStorePassword=****
SocketUseSSL=Y
EnabledProtocols=TLSv1.2
KeyStoreType=JKS

[session]
BeginString=FIXT.1.1
SenderCompID=*****
TargetCompID=*****
DefaultApplVerID=FIX.5.0SP2
SessionQualifier=*****
ResetSeqNumFlag=Y
ResetOnLogon=Y
StartTime=00:00:00
EndTime=00:00:00
TimeZone=America/New_York
StartDay=Monday
EndDay=Monday
HeartBtInt=30
ReconnectInterval=5
SocketConnectPort=8228
SocketConnectHost=69.191.230.38
SocketKeyStore=/home/*****
SocketKeyStorePassword=*****
SocketUseSSL=Y
EnabledProtocols=TLSv1.2
KeyStoreType=JKS
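One plausible reading of why this configuration works, offered as an assumption rather than a confirmed diagnosis: the file store keeps sequence numbers between runs, and ResetOnLogon appears to be the QuickFIX/J setting that resets them before the Logon is sent, so both sides start again from 1. The sketch below models only that counter behavior. `SequenceStore` and `logonSeqNum` are hypothetical names, the stored values 7 and 9 are arbitrary, and this is not the QuickFIX/J `MessageStore` API.

```java
/** Hypothetical model of persisted sequence numbers; not the QuickFIX/J MessageStore. */
final class SequenceStore {
    private int nextSender; // next MsgSeqNum to send
    private int nextTarget; // next MsgSeqNum expected from the counterparty

    SequenceStore(int nextSender, int nextTarget) {
        this.nextSender = nextSender;
        this.nextTarget = nextTarget;
    }

    void reset() {
        nextSender = 1;
        nextTarget = 1;
    }

    int takeNextSender() {
        return nextSender++;
    }

    int nextTarget() {
        return nextTarget;
    }

    /** Sequence number the Logon would carry, with or without a reset on logon. */
    static int logonSeqNum(SequenceStore store, boolean resetOnLogon) {
        if (resetOnLogon) {
            store.reset(); // start a fresh sequence before logging on
        }
        return store.takeNextSender();
    }

    public static void main(String[] args) {
        // Without a reset, the Logon continues from whatever was stored.
        SequenceStore kept = new SequenceStore(7, 9);
        System.out.println(logonSeqNum(kept, false)); // prints 7

        // With a reset, the Logon is 1 and the first application message is 2.
        SequenceStore fresh = new SequenceStore(7, 9);
        System.out.println(logonSeqNum(fresh, true)); // prints 1
        System.out.println(fresh.takeNextSender());   // prints 2
    }
}
```

If the counterparty also resets on that Logon, the first application message arrives with sequence 2, which is exactly what the original error said was expected.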