Apache Camel SFTP: file content is sometimes null or empty

Java code


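import java.util.HashMap;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
// JwtWireTokenService and Environment are project-specific classes; their imports are omitted here.

@Component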
@RequiredArgsConstructor
public class WireInboundFileListener extends RouteBuilder {

    private final JwtWireTokenService jwtWireTokenService;

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Value("${wire.ftp.protocol}")
    private String ftpProtocol;

    @Value("${wire.ftp.server}")
    private String ftpServer;

    @Value("${wire.ftp.server.port}")
    private String ftpServerPort;

    @Value("${wire.ftp.username}")
    private String ftpUsername;

    @Value("${wire.ftp.password}")
    private String ftpPassword;

    @Value("${wire.ftp.private.key.file}")
    private String ftpPrivateKeyFile;

    @Value("${wire.ftp.private.key.passphrase}")
    private String privateKeyPassphrase;

    @Value("${wire.file.inbound.dir}")
    private String ftpListenDir;

    @Value("${wire.file.inbound.url}")
    private String inboundUrl;

    @Override
    public void configure() {

        var fromFtpUri = String.format("%s:%s:%s/%s?username=%s&delete=true&antInclude=*.txt",
                ftpProtocol, ftpServer, ftpServerPort, ftpListenDir, ftpUsername);

        log.info("SFTP inbound listen dir : " + ftpListenDir);

        if (Environment.getExecutionEnvironment().equals(Environment.ExecutionEnvironment.AWS)) {
            fromFtpUri += "&privateKeyFile=" + ftpPrivateKeyFile + "&privateKeyPassphrase=" + privateKeyPassphrase;
        } else {
            fromFtpUri += "&password=" + ftpPassword;
        }

        from(fromFtpUri)
                // .delay(10000) // even with a delay, the file content was still null
                .convertBodyTo(String.class)
                .process(exchange -> {
                    final var requestBody = new HashMap<String, Object>();
                    final var content = exchange.getIn().getBody();
                    final var fileName = exchange.getIn().getHeader("CamelFileName");
                    requestBody.put("content", content);
                    requestBody.put("name", fileName);
                    exchange.getIn().setBody(OBJECT_MAPPER.writeValueAsString(requestBody));
                })
                .to("log:com.test.wire.inbound.listener.SftpRoute")
                .setHeader(Exchange.HTTP_METHOD, constant("POST"))
                .setHeader("Content-Type", constant("application/json"))
                .setHeader("Authorization", method(this, "clientToken"))
                .to(inboundUrl + "?throwExceptionOnFailure=false")
                .log("Response body from wire inbound : ${body}")
                .end();
    }

    public String clientToken() {
        return "Bearer " + jwtWireTokenService.getToken();
    }
}

Successful request

2022-04-20 03:46:47.910  INFO 1 --- [read #6 - Delay] c.test.wire.inbound.listener.SftpRoute  : Exchange[ExchangePattern: InOnly, BodyType: String, Body: {
    "name": "sample-inbound-transfer-20220414024722.txt",
    "content": "file content"
}]

Failed request

2022-04-21 09:36:54.148  INFO 1 --- [read #4 - Delay] c.test.wire.inbound.listener.SftpRoute  : Exchange[ExchangePattern: InOnly, BodyType: String, Body: {
    "name": "sample-inbound-transfer-20220414024722.txt",
    "content": ""
}]

Main problem

 final var content = exchange.getIn().getBody(); // sometimes null or empty, sometimes the actual file content

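When I test by dropping a file onto a local SFTP server, it works as expected (see Successful request), presumably because the upload is fast when it is local (FileZilla). But when I drop the same file onto an SFTP server hosted on a real remote server, it sometimes works and sometimes does not (see Failed request). It looks like a problem with how the SFTP consumer picks up the file. Can you help me? Thanks.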

Your Camel route is probably (sometimes) consuming the file before the upload has finished.

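You can try configuring the endpoint to poll a file only when it can obtain an exclusive read lock on it (that is, the file is not in progress or still being written); see the readLock parameter.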

I think there is no read lock by default; you should set readLock=changed... but I don't remember whether this mode also works for SFTP endpoints.
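To make the suggestion concrete, here is a minimal sketch of how the read-lock options could be appended to the consumer URI built in configure(). The class SftpReadLockUri and its method withChangedReadLock are hypothetical helpers invented for illustration, not part of the original route or of Camel. The option names readLock, readLockCheckInterval, readLockMinAge and readLockTimeout are the ones listed in the Camel file/FTP component documentation. The timing values are assumptions and should be tuned to how long uploads actually take, and it is worth confirming against the documentation for your Camel version that changed is supported for SFTP.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not from the original post): appends read-lock options
// to an existing Camel file/FTP/SFTP consumer URI.
final class SftpReadLockUri {

    private SftpReadLockUri() {
    }

    // checkIntervalMs: how often the file size/timestamp is re-checked
    // minAgeMs:        minimum age of the file before it is considered at all
    // timeoutMs:       how long to keep trying to acquire the lock
    static String withChangedReadLock(String baseUri, long checkIntervalMs, long minAgeMs, long timeoutMs) {
        // The Camel docs state that readLockTimeout must be higher than
        // readLockCheckInterval, so reject inconsistent values early.
        if (timeoutMs <= checkIntervalMs) {
            throw new IllegalArgumentException("readLockTimeout must be greater than readLockCheckInterval");
        }

        // LinkedHashMap keeps the options in insertion order, so the URI is predictable.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("readLock", "changed");
        options.put("readLockCheckInterval", Long.toString(checkIntervalMs));
        options.put("readLockMinAge", Long.toString(minAgeMs));
        options.put("readLockTimeout", Long.toString(timeoutMs));

        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));

        // Use '&' if the URI already has a query string, '?' otherwise.
        String separator = baseUri.contains("?") ? "&" : "?";
        return baseUri + separator + query;
    }
}
```

In the route from the question, this could be called just before from(fromFtpUri), for example fromFtpUri = SftpReadLockUri.withChangedReadLock(fromFtpUri, 5000, 10000, 30000). With readLock=changed, the consumer should skip a file whose size or timestamp is still changing between checks and pick it up on a later poll, at the cost of a short delay before each file is processed.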