java.io.BufferedReader().map 无法推断 <T> 的类型参数 fromStream(Stream<? extends T>)
java.io.BufferedReader().map Cannot infer type argument(s) for <T> fromStream(Stream<? extends T>)
场景:Spring WebFlux 触发 CommandLineRunner.run 以便将数据加载到 MongoDb 以进行测试。
目标:在本地启动微服务时,它旨在读取 json 文件并将文档加载到 MongDb。
个人知识:"bufferedReader.lines().filter(l -> !l.trim().isEmpty()" 读取每个 json 节点并将其 return 作为流。然后我可以将它映射到 "l" 并访问 get 方法。我想我不必创建一个列表然后流式传输它,因为我已经通过 "new InputStreamReader(getClass().getClassLoader().getResourceAsStream()" 将它作为流加载,并且我假设我可以使用 lines() 因为它的节点将产生一个字符串行。我是在正确的方向还是我搞砸了一些想法?
这是一个 json 示例文件:
{
"Extrato": {
"description": "credit",
"value": "R.000,00",
"status": 11
},
"Extrato": {
"description": "debit",
"value": "R.000,00",
"status": 99
}
}
型号
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Document
public class Extrato {
@Id
private String id;
private String description;
private String value;
private Integer status;
public Extrato(String id, String description, String value, Integer status) {
super();
this.id = id;
this.description = description;
this.value = value;
this.status = status;
}
... getters and setter accordinly
存储库
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import com.noblockingcase.demo.model.Extrato;
import reactor.core.publisher.Flux;
import org.springframework.data.domain.Pageable;
public interface ExtratoRepository extends ReactiveCrudRepository<Extrato, String> {
@Query("{ id: { $exists: true }}")
Flux<Extrato> retrieveAllExtratosPaged(final Pageable page);
}
从json文件上方加载的命令
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import com.noblockingcase.demo.model.Extrato;
import com.noblockingcase.demo.repository.ExtratoRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
@Component
public class TestDataLoader implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(TestDataLoader.class);
private ExtratoRepository extratoRepository;
TestDataLoader(final ExtratoRepository extratoRepository) {
this.extratoRepository = extratoRepository;
}
@Override
public void run(final String... args) throws Exception {
if (extratoRepository.count().block() == 0L) {
final LongSupplier longSupplier = new LongSupplier() {
Long l = 0L;
@Override
public long getAsLong() {
return l++;
}
};
BufferedReader bufferedReader = new BufferedReader(
new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.txt")));
//*** THE ISSUE IS NEXT LINE
Flux.fromStream(bufferedReader.lines().filter(l -> !l.trim().isEmpty())
.map(l -> extratoRepository.save(new Extrato(String.valueOf(longSupplier.getAsLong()),
l.getDescription(), l.getValue(), l.getStatus()))))
.subscribe(m -> log.info("Carga Teste: {}", m.block()));
}
}
}
这是 MongoDb 配置,但我认为它不相关
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.mongodb.MongoClientOptions;
@Configuration
public class MongoDbSettings {
@Bean
public MongoClientOptions mongoOptions() {
return MongoClientOptions.builder().socketTimeout(2000).build();
}
}
如果我尝试我的原始代码并调整它以读取文本文件,我可以成功读取文本文件而不是 json。显然它不符合我的需求,因为我想读取 json 文件。顺便说一句,它可以更清楚地说明我被阻止的地方。
加载-test.txt(https://github.com/jimisdrpc/webflux-worth-scenarious/blob/master/demo/src/main/resources/carga-teste.txt可用)
crédito de R.000,00
débito de R0,00
使用简单文本文件的代码片段
BufferedReader bufferedReader = new BufferedReader(
new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.txt")));
Flux.fromStream(bufferedReader.lines().filter(l -> !l.trim().isEmpty())
.map(l -> extratoRepository
.save(new Extrato(String.valueOf(longSupplier.getAsLong()), "Qualquer descrição", l))))
.subscribe(m -> log.info("Carga Teste: {}", m.block()));
整个项目成功读取文本文件:https://github.com/jimisdrpc/webflux-worth-scenarious/tree/master/demo
Docker 为引导编写 MongoDb https://github.com/jimisdrpc/webflux-worth-scenarious/blob/master/docker-compose.yml
总而言之,我的问题是:在 CommandLineRunner.run()
期间,我没有弄清楚如何读取 json 文件并将数据插入 MongoDb
我找到了一个带有 Flux::using Flux::fromStream 的示例可以帮助实现此目的。这会将您的文件读入 Flux
,然后您可以订阅并使用 .flatmap
或其他内容进行处理。来自 Javadoc
using(Callable resourceSupplier, Function> sourceSupplier, Consumer resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
和我放在一起的代码:
private static Flux<Account> fluxAccounts() {
return Flux.using(() ->
new BufferedReader(new InputStreamReader(new ClassPathResource("data/ExportCSV.csv").getInputStream()))
.lines()
.map(s->{
String[] sa = s.split(" ");
return Account.builder()
.firstname(sa[0])
.lastname(sa[1])
.build();
}),
Flux::fromStream,
BaseStream::close
);
}
请注意您的 json 无效。文本数据与 json 不同。 Json 需要特殊处理,所以最好还是使用库。
carga-teste.json
[
{"description": "credit", "value": "R.000,00", "status": 11},
{"description": "debit","value": "R.000,00", "status": 99}
]
此处为文章致谢 - https://www.nurkiewicz.com/2017/09/streaming-large-json-file-with-jackson.html。
我已经开始使用 Flux。
@Override
public void run(final String... args) throws Exception {
BufferedReader bufferedReader = new BufferedReader(
new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.json")));
ObjectMapper mapper = new ObjectMapper();
Flux<Extrato> flux = Flux.generate(
() -> parser(bufferedReader, mapper),
this::pullOrComplete,
jsonParser -> {
try {
jsonParser.close();
} catch (IOException e) {}
});
flux.map(l -> extratoRepository.save(l)).subscribe(m -> log.info("Carga Teste: {}", m.block()));
}
}
private JsonParser parser(Reader reader, ObjectMapper mapper) {
JsonParser parser = null;
try {
parser = mapper.getFactory().createParser(reader);
parser.nextToken();
} catch (IOException e) {}
return parser;
}
private JsonParser pullOrComplete(JsonParser parser, SynchronousSink<Extrato> emitter) {
try {
if (parser.nextToken() != JsonToken.END_ARRAY) {
Extrato extrato = parser.readValueAs(Extrato.class);
emitter.next(extrato);
} else {
emitter.complete();
}
} catch (IOException e) {
emitter.error(e);
}
return parser;
}
场景:Spring WebFlux 触发 CommandLineRunner.run 以便将数据加载到 MongoDb 以进行测试。
目标:在本地启动微服务时,它旨在读取 json 文件并将文档加载到 MongDb。
个人知识:"bufferedReader.lines().filter(l -> !l.trim().isEmpty()" 读取每个 json 节点并将其 return 作为流。然后我可以将它映射到 "l" 并访问 get 方法。我想我不必创建一个列表然后流式传输它,因为我已经通过 "new InputStreamReader(getClass().getClassLoader().getResourceAsStream()" 将它作为流加载,并且我假设我可以使用 lines() 因为它的节点将产生一个字符串行。我是在正确的方向还是我搞砸了一些想法?
这是一个 json 示例文件:
{
"Extrato": {
"description": "credit",
"value": "R.000,00",
"status": 11
},
"Extrato": {
"description": "debit",
"value": "R.000,00",
"status": 99
}
}
型号
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Document
public class Extrato {
@Id
private String id;
private String description;
private String value;
private Integer status;
public Extrato(String id, String description, String value, Integer status) {
super();
this.id = id;
this.description = description;
this.value = value;
this.status = status;
}
... getters and setter accordinly
存储库
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import com.noblockingcase.demo.model.Extrato;
import reactor.core.publisher.Flux;
import org.springframework.data.domain.Pageable;
public interface ExtratoRepository extends ReactiveCrudRepository<Extrato, String> {
@Query("{ id: { $exists: true }}")
Flux<Extrato> retrieveAllExtratosPaged(final Pageable page);
}
从json文件上方加载的命令
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import com.noblockingcase.demo.model.Extrato;
import com.noblockingcase.demo.repository.ExtratoRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
@Component
public class TestDataLoader implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(TestDataLoader.class);
private ExtratoRepository extratoRepository;
TestDataLoader(final ExtratoRepository extratoRepository) {
this.extratoRepository = extratoRepository;
}
@Override
public void run(final String... args) throws Exception {
if (extratoRepository.count().block() == 0L) {
final LongSupplier longSupplier = new LongSupplier() {
Long l = 0L;
@Override
public long getAsLong() {
return l++;
}
};
BufferedReader bufferedReader = new BufferedReader(
new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.txt")));
//*** THE ISSUE IS NEXT LINE
Flux.fromStream(bufferedReader.lines().filter(l -> !l.trim().isEmpty())
.map(l -> extratoRepository.save(new Extrato(String.valueOf(longSupplier.getAsLong()),
l.getDescription(), l.getValue(), l.getStatus()))))
.subscribe(m -> log.info("Carga Teste: {}", m.block()));
}
}
}
这是 MongoDb 配置,但我认为它不相关
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.mongodb.MongoClientOptions;
@Configuration
public class MongoDbSettings {
@Bean
public MongoClientOptions mongoOptions() {
return MongoClientOptions.builder().socketTimeout(2000).build();
}
}
如果我尝试我的原始代码并调整它以读取文本文件,我可以成功读取文本文件而不是 json。显然它不符合我的需求,因为我想读取 json 文件。顺便说一句,它可以更清楚地说明我被阻止的地方。
加载-test.txt(https://github.com/jimisdrpc/webflux-worth-scenarious/blob/master/demo/src/main/resources/carga-teste.txt可用)
crédito de R.000,00
débito de R0,00
使用简单文本文件的代码片段
BufferedReader bufferedReader = new BufferedReader(
new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.txt")));
Flux.fromStream(bufferedReader.lines().filter(l -> !l.trim().isEmpty())
.map(l -> extratoRepository
.save(new Extrato(String.valueOf(longSupplier.getAsLong()), "Qualquer descrição", l))))
.subscribe(m -> log.info("Carga Teste: {}", m.block()));
整个项目成功读取文本文件:https://github.com/jimisdrpc/webflux-worth-scenarious/tree/master/demo
Docker 为引导编写 MongoDb https://github.com/jimisdrpc/webflux-worth-scenarious/blob/master/docker-compose.yml
总而言之,我的问题是:在 CommandLineRunner.run()
期间,我没有弄清楚如何读取 json 文件并将数据插入 MongoDb我找到了一个带有 Flux::using Flux::fromStream 的示例可以帮助实现此目的。这会将您的文件读入 Flux
,然后您可以订阅并使用 .flatmap
或其他内容进行处理。来自 Javadoc
using(Callable resourceSupplier, Function> sourceSupplier, Consumer resourceCleanup) Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
和我放在一起的代码:
private static Flux<Account> fluxAccounts() {
return Flux.using(() ->
new BufferedReader(new InputStreamReader(new ClassPathResource("data/ExportCSV.csv").getInputStream()))
.lines()
.map(s->{
String[] sa = s.split(" ");
return Account.builder()
.firstname(sa[0])
.lastname(sa[1])
.build();
}),
Flux::fromStream,
BaseStream::close
);
}
请注意您的 json 无效。文本数据与 json 不同。 Json 需要特殊处理,所以最好还是使用库。
carga-teste.json
[
{"description": "credit", "value": "R.000,00", "status": 11},
{"description": "debit","value": "R.000,00", "status": 99}
]
此处为文章致谢 - https://www.nurkiewicz.com/2017/09/streaming-large-json-file-with-jackson.html。
我已经开始使用 Flux。
@Override
public void run(final String... args) throws Exception {
BufferedReader bufferedReader = new BufferedReader(
new InputStreamReader(getClass().getClassLoader().getResourceAsStream("carga-teste.json")));
ObjectMapper mapper = new ObjectMapper();
Flux<Extrato> flux = Flux.generate(
() -> parser(bufferedReader, mapper),
this::pullOrComplete,
jsonParser -> {
try {
jsonParser.close();
} catch (IOException e) {}
});
flux.map(l -> extratoRepository.save(l)).subscribe(m -> log.info("Carga Teste: {}", m.block()));
}
}
private JsonParser parser(Reader reader, ObjectMapper mapper) {
JsonParser parser = null;
try {
parser = mapper.getFactory().createParser(reader);
parser.nextToken();
} catch (IOException e) {}
return parser;
}
private JsonParser pullOrComplete(JsonParser parser, SynchronousSink<Extrato> emitter) {
try {
if (parser.nextToken() != JsonToken.END_ARRAY) {
Extrato extrato = parser.readValueAs(Extrato.class);
emitter.next(extrato);
} else {
emitter.complete();
}
} catch (IOException e) {
emitter.error(e);
}
return parser;
}