stomp客户端连接后添加topic/subscription
Add topic/subscription after stomp client is connected
我目前正在使用 https://github.com/NaikSoftware/StompProtocolAndroid ,通过 STOMP 连接 websocket。下面是我的一个简单实现:
/**
 * Activity that opens a STOMP connection and subscribes to the per-user
 * "/topic/online/" destination. Each message received there is expected to be a
 * JSON object with a "uri" field naming a further destination, which is then
 * subscribed to dynamically via {@link #listenToUpdatesFromFinalUri(String)}.
 *
 * NOTE(review): connect() is issued from onCreate() but disconnect() from onStop(),
 * so nothing in this class reconnects when the activity is restarted without being
 * recreated -- confirm this is intended.
 * NOTE(review): the Rx subscriptions created here are never unsubscribed by this
 * class -- confirm whether disconnect() is sufficient to release them.
 */
public class TestActivity extends AppCompatActivity {
    private StompClient mStompCLient;

    /**
     * Builds the STOMP client, registers the "/topic/online/" and lifecycle
     * subscribers, and then starts the connection.
     *
     * @param savedInstanceState state passed through to the superclass
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_test);
        ButterKnife.bind(this);
        setSupportActionBar(toolbar);

        mStompCLient = Stomp.over(WebSocket.class, BASE_URL);

        mStompCLient.topic("/topic/online/" + mSharedPreferences.getPrivateKey()).subscribe(new Subscriber<StompMessage>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "/topic/online/ onCompleted: ");
            }

            @Override
            public void onError(Throwable e) {
                // Pass the throwable so the stack trace is not lost.
                Log.e(TAG, "/topic/online/ onError: " + e.getMessage(), e);
            }

            @Override
            public void onNext(StompMessage stompMessage) {
                Log.d(TAG, "/topic/online/ onNext: " + stompMessage.getPayload());
                final String uri;
                try {
                    uri = new JSONObject(stompMessage.getPayload()).getString("uri");
                } catch (JSONException e) {
                    // Without a usable "uri" there is nothing to subscribe to; do not
                    // fall through and subscribe to an empty destination.
                    Log.e(TAG, "/topic/online/ payload has no usable \"uri\" field", e);
                    return;
                }
                if (uri.isEmpty()) {
                    Log.w(TAG, "/topic/online/ received an empty uri; not subscribing");
                    return;
                }
                listenToUpdatesFromFinalUri(uri);
            }
        });

        mStompCLient.lifecycle().subscribe(lifecycleEvent -> {
            Log.i(TAG, "onCreate: " + lifecycleEvent.getMessage());
            switch (lifecycleEvent.getType()) {
                case OPENED:
                    Log.d(TAG, "Stomp connection opened");
                    break;
                case ERROR:
                    Log.e(TAG, "Error", lifecycleEvent.getException());
                    break;
                case CLOSED:
                    Log.d(TAG, "Stomp connection closed : " + lifecycleEvent.toString() + " :msg: " + lifecycleEvent.getMessage() + " :escep: " + lifecycleEvent.getException() + " :headers: " + lifecycleEvent.getHandshakeResponseHeaders() + " :type: " + lifecycleEvent.getType());
                    break;
            }
        });

        mStompCLient.connect();
    }

    /**
     * Subscribes to the destination announced by the server and logs every
     * message, error and completion received on it.
     *
     * NOTE(review): this runs after connect(); the accompanying discussion reports
     * that subscribing after connecting only works from library version 1.1.6 --
     * verify the dependency version.
     * NOTE(review): every "/topic/online/" message adds another subscription, even
     * for a uri that was already subscribed -- confirm duplicates are acceptable.
     *
     * @param content destination to subscribe to, taken from the "uri" field
     */
    private void listenToUpdatesFromFinalUri(String content) {
        mStompCLient.topic(content).subscribe(new Subscriber<StompMessage>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, " onCompleted: ");
            }

            @Override
            public void onError(Throwable e) {
                // Pass the throwable so the stack trace is not lost.
                Log.e(TAG, " onError: " + e.getMessage(), e);
            }

            @Override
            public void onNext(StompMessage stompMessage) {
                Log.d(TAG, " onNext: " + stompMessage.getPayload());
            }
        });
    }

    /** Disconnects the STOMP client when the activity is stopped. */
    @Override
    protected void onStop() {
        super.onStop();
        disconnectStomp();
    }

    /** Closes the STOMP connection held by this activity. */
    private void disconnectStomp() {
        mStompCLient.disconnect();
    }
}
这里我尝试监听建立连接后服务器发送的新订阅频道。如果在调用 connect 之前调用 subscribe(),它可以正常工作。但是在 listenToUpdatesFromFinalUri() 函数中订阅的最终 uri/subscription 频道不是静态的,所以我无法在连接之前添加该订阅。我目前无法收到最终 uri/subscription 的回复。感谢任何帮助。
问题已在新版本1.1.6中解决
我目前正在使用 https://github.com/NaikSoftware/StompProtocolAndroid ,通过 STOMP 连接 websocket。下面是我的一个简单实现:
/**
 * Activity that opens a STOMP connection and subscribes to the per-user
 * "/topic/online/" destination. Each message received there is expected to be a
 * JSON object with a "uri" field naming a further destination, which is then
 * subscribed to dynamically via {@link #listenToUpdatesFromFinalUri(String)}.
 *
 * NOTE(review): connect() is issued from onCreate() but disconnect() from onStop(),
 * so nothing in this class reconnects when the activity is restarted without being
 * recreated -- confirm this is intended.
 * NOTE(review): the Rx subscriptions created here are never unsubscribed by this
 * class -- confirm whether disconnect() is sufficient to release them.
 */
public class TestActivity extends AppCompatActivity {
    private StompClient mStompCLient;

    /**
     * Builds the STOMP client, registers the "/topic/online/" and lifecycle
     * subscribers, and then starts the connection.
     *
     * @param savedInstanceState state passed through to the superclass
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_test);
        ButterKnife.bind(this);
        setSupportActionBar(toolbar);

        mStompCLient = Stomp.over(WebSocket.class, BASE_URL);

        mStompCLient.topic("/topic/online/" + mSharedPreferences.getPrivateKey()).subscribe(new Subscriber<StompMessage>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "/topic/online/ onCompleted: ");
            }

            @Override
            public void onError(Throwable e) {
                // Pass the throwable so the stack trace is not lost.
                Log.e(TAG, "/topic/online/ onError: " + e.getMessage(), e);
            }

            @Override
            public void onNext(StompMessage stompMessage) {
                Log.d(TAG, "/topic/online/ onNext: " + stompMessage.getPayload());
                final String uri;
                try {
                    uri = new JSONObject(stompMessage.getPayload()).getString("uri");
                } catch (JSONException e) {
                    // Without a usable "uri" there is nothing to subscribe to; do not
                    // fall through and subscribe to an empty destination.
                    Log.e(TAG, "/topic/online/ payload has no usable \"uri\" field", e);
                    return;
                }
                if (uri.isEmpty()) {
                    Log.w(TAG, "/topic/online/ received an empty uri; not subscribing");
                    return;
                }
                listenToUpdatesFromFinalUri(uri);
            }
        });

        mStompCLient.lifecycle().subscribe(lifecycleEvent -> {
            Log.i(TAG, "onCreate: " + lifecycleEvent.getMessage());
            switch (lifecycleEvent.getType()) {
                case OPENED:
                    Log.d(TAG, "Stomp connection opened");
                    break;
                case ERROR:
                    Log.e(TAG, "Error", lifecycleEvent.getException());
                    break;
                case CLOSED:
                    Log.d(TAG, "Stomp connection closed : " + lifecycleEvent.toString() + " :msg: " + lifecycleEvent.getMessage() + " :escep: " + lifecycleEvent.getException() + " :headers: " + lifecycleEvent.getHandshakeResponseHeaders() + " :type: " + lifecycleEvent.getType());
                    break;
            }
        });

        mStompCLient.connect();
    }

    /**
     * Subscribes to the destination announced by the server and logs every
     * message, error and completion received on it.
     *
     * NOTE(review): this runs after connect(); the accompanying discussion reports
     * that subscribing after connecting only works from library version 1.1.6 --
     * verify the dependency version.
     * NOTE(review): every "/topic/online/" message adds another subscription, even
     * for a uri that was already subscribed -- confirm duplicates are acceptable.
     *
     * @param content destination to subscribe to, taken from the "uri" field
     */
    private void listenToUpdatesFromFinalUri(String content) {
        mStompCLient.topic(content).subscribe(new Subscriber<StompMessage>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, " onCompleted: ");
            }

            @Override
            public void onError(Throwable e) {
                // Pass the throwable so the stack trace is not lost.
                Log.e(TAG, " onError: " + e.getMessage(), e);
            }

            @Override
            public void onNext(StompMessage stompMessage) {
                Log.d(TAG, " onNext: " + stompMessage.getPayload());
            }
        });
    }

    /** Disconnects the STOMP client when the activity is stopped. */
    @Override
    protected void onStop() {
        super.onStop();
        disconnectStomp();
    }

    /** Closes the STOMP connection held by this activity. */
    private void disconnectStomp() {
        mStompCLient.disconnect();
    }
}
这里我尝试监听建立连接后服务器发送的新订阅频道。如果在调用 connect 之前调用 subscribe(),它可以正常工作。但是在 listenToUpdatesFromFinalUri() 函数中订阅的最终 uri/subscription 频道不是静态的,所以我无法在连接之前添加该订阅。我目前无法收到最终 uri/subscription 的回复。感谢任何帮助。
问题已在新版本1.1.6中解决