Convert String to Array to Objects in Observable
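I am trying to use CloseableHttpAsyncClient to read from an endpoint, marshal the string into an object (using javax.json), and then convert an array on that object into its individual components: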
CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().setDefaultCredentialsProvider(provider).build();
client.start();
Observable<ObservableHttpResponse> observable = ObservableHttp.createRequest(HttpAsyncMethods.createGet(uri), client)
.toObservable();
Observable<JsonArray> shareable = observable.flatMap(response -> response.getContent().map(bb -> {
String stringVal = new String(bb);
StringReader reader = new StringReader(stringVal);
JsonObject jobj = Json.createReader(reader).readObject();
return jobj.getJsonArray("elements");
})).share();
I need to get the JSON array and then filter the objects inside it:
Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));
How can I convert the Observable<JsonArray> into an Observable<JsonObject>?
Because it is asynchronous, I can't use forEach to build some kind of array to buffer the data.
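UPDATE:
Thinking about it, CloseableHttpAsyncClient may not be the best solution for what I'm trying to achieve. I realized this morning (in the shower, of all places) that I'm trying to process the data asynchronously only in order to make asynchronous calls afterwards.
Ideally, calling CloseableHttpClient (synchronous) and passing the data to an Observable for filtering would be a better approach (I don't need the first call to manage more than one HTTP call).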
CloseableHttpClient client = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build();
StringBuffer result = new StringBuffer();
try {
HttpGet request = new HttpGet(uri);
HttpResponse response = client.execute(request);
BufferedReader rd = new BufferedReader(
new InputStreamReader(response.getEntity().getContent()));
String line;
while ((line = rd.readLine()) != null) {
result.append(line);
}
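} catch (IOException ioe) { // ClientProtocolException is a subclass of IOException
throw new UncheckedIOException(ioe); // surface the failure instead of silently swallowing it
}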
StringReader reader = new StringReader(result.toString());
JsonObject jobj = Json.createReader(reader).readObject();
JsonArray elements = jobj.getJsonArray("elements");
List<JsonObject> objects = elements.getValuesAs(JsonObject.class);
Observable<JsonObject> shareable = Observable.from(objects).share();
Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));
firstStream.subscribe(record -> {
//connect to SOTS/Facebook and store the results
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
secondStream.subscribe(record -> {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
thirdStream.subscribe(record -> {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
You should be able to use another flatMap() call in place of the map() call you are using, and then use Observable.create() to emit the individual JsonObjects:
Observable<JsonObject> shareable =
    observable
        .flatMap(response -> response.getContent()
            .flatMap(bb -> {
                // Parse the response body and pull out the "elements" array
                String stringVal = new String(bb);
                StringReader reader = new StringReader(stringVal);
                JsonObject jobj = Json.createReader(reader).readObject();
                JsonArray elements = jobj.getJsonArray("elements");
                // Emit each element of the array as its own item
                // (javax.json uses size() and getJsonObject(), not length() and getJSONObject())
                return Observable.<JsonObject>create(subscriber -> {
                    for (int i = 0; i < elements.size(); i++) {
                        subscriber.onNext(elements.getJsonObject(i));
                    }
                    subscriber.onCompleted();
                });
            }))
        .share();
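The flattening step above (one array in, many elements out, then a filter per type) can be illustrated without RxJava or a JSON provider on the classpath. The following is only a sketch of the same idea using java.util.stream from the JDK. The class name FlattenSketch and its helper methods are hypothetical, and Map<String, String> stands in for JsonObject while a List of such maps stands in for JsonArray.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in sketch: Map<String, String> plays the role of JsonObject,
// and List<Map<String, String>> plays the role of the "elements" JsonArray.
class FlattenSketch {

    // Each entry of `responses` is one whole array; flatMap turns the
    // stream of arrays into a stream of individual elements.
    static List<Map<String, String>> flatten(List<List<Map<String, String>>> responses) {
        return responses.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    // Mirrors the question's filter(item -> item.getString("type").equals(...)).
    static List<Map<String, String>> ofType(List<Map<String, String>> items, String type) {
        return items.stream()
                .filter(item -> type.equals(item.get("type")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, String>> elements = Arrays.asList(
                Collections.singletonMap("type", "TYPE_1"),
                Collections.singletonMap("type", "TYPE_2"),
                Collections.singletonMap("type", "TYPE_1"));

        // A single "response" carrying one array of three elements.
        List<Map<String, String>> flat = flatten(Collections.singletonList(elements));

        System.out.println(flat.size());                   // number of individual elements
        System.out.println(ofType(flat, "TYPE_1").size()); // elements whose type is TYPE_1
    }
}
```

In the Rx version the same shape applies: the outer stream carries arrays, and the flattening operator is what changes the element type from "array" to "object" so that the per-type filters compile.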
Try this code:
String myjson = "{\"elements\": [{\"text\":\"Obj1\"},{\"text\":\"Obj2\"}, {\"text\":\"Obj3\"}]}";
Observable.just(myjson)
.map(jsonStr -> new StringReader(jsonStr)) // use the emitted value, not the outer variable
.map(reader -> Json.createReader(reader).readObject())
.map(jobj -> jobj.getJsonArray("elements"))
.map(elements -> elements.toArray(new JsonObject[elements.size()]))
.flatMap(jsonObjects -> Observable.from(jsonObjects))
.subscribe(
(jsonObject) -> System.out.println(jsonObject.getString("text")),
throwable -> throwable.printStackTrace(),
() -> System.out.println("On complete"));
Result:
07-22 12:19:22.362 8032-8032/com.mediamanagment.app I/System.out﹕ Obj1
07-22 12:19:22.362 8032-8032/com.mediamanagment.app I/System.out﹕ Obj2
07-22 12:19:22.362 8032-8032/com.mediamanagment.app I/System.out﹕ Obj3
NOTE:
You should use this dependency:
compile 'org.glassfish:javax.json:1.0.4'
instead of:
compile 'javax.json:javax.json-api:1.0'
If you use 'javax.json:javax.json-api:1.0', which contains only the API and no provider implementation, you will get javax.json.JsonException: Provider org.glassfish.json.JsonProviderImpl not found at this step:
.map(reader -> Json.createReader(reader).readObject())
So please use 'org.glassfish:javax.json:1.0.4'.
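UPDATE:
Also, instead of
.flatMap(jsonObjects -> Observable.from(jsonObjects))
you can use flatMapIterable(), provided the preceding step produces an Iterable (for example a List) rather than an array:
.flatMapIterable(jsonObjects -> jsonObjects)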
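One detail worth checking when switching to flatMapIterable() is that its function has to return an Iterable, and a Java array is not one. The sketch below uses only the JDK to show the distinction. The class IterableSketch and its drain() helper are hypothetical names introduced for illustration, standing in for any API that accepts an Iterable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: drain() accepts only an Iterable, the same
// kind of value flatMapIterable's function is expected to return.
class IterableSketch {

    static <T> List<T> drain(Iterable<T> items) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] asArray = {"Obj1", "Obj2", "Obj3"};

        // drain(asArray) would not compile: String[] is not an Iterable.
        // Wrapping the array in a List makes it acceptable.
        List<String> viaList = drain(Arrays.asList(asArray));

        System.out.println(viaList); // prints [Obj1, Obj2, Obj3]
    }
}
```

In the JSON pipeline, the question's own elements.getValuesAs(JsonObject.class) call already yields a List<JsonObject>, so mapping to that instead of to an array would give flatMapIterable() something it can iterate.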