NiFi ExecuteScript 输出使用 zip4j 损坏

NiFi ExecuteScript output is corrupted using zip4j

@Grab('net.lingala.zip4j:zip4j:2.2.8')
import net.lingala.zip4j.io.outputstream.ZipOutputStream;
import net.lingala.zip4j.model.ZipParameters;
import net.lingala.zip4j.model.enums.EncryptionMethod;

import org.apache.commons.io.IOUtils

flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
    byte[] inputByteArray = IOUtils.toByteArray(inputStream)

    ByteArrayOutputStream outputByteStream = new ByteArrayOutputStream()
    ZipOutputStream outputZipStream = new ZipOutputStream(outputByteStream, "password".toCharArray())

    //init the zip parameters
    ZipParameters zipParams = new ZipParameters()
    zipParams.setEncryptFiles(true)
    zipParams.setEncryptionMethod(EncryptionMethod.AES)
    zipParams.setFileNameInZip("records.csv")

    outputZipStream.putNextEntry(zipParams)
    outputZipStream.write(inputByteArray)
    outputZipStream.closeEntry()

    outputZipStream.close()
    outputByteStream.close()

    outputStream.write(outputByteStream.toByteArray())
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)

我正在使用 groovy 中编写的 executescript 使用密码压缩 csv。 executescript 处理器能够毫无问题地将流文件传递给下一个处理器,但该文件已损坏。 PutSFTP 处理器抱怨以下错误。

尽管如此,该文件还是通过 SFTP 存储的,但我无法将其解压缩,因为它已损坏。我的代码有什么问题?

在开发自定义处理器以解压缩加密内容时,我们已成功使用以下代码片段使用 Lingala 解压缩受密码保护的流文件。请参考以下代码将逻辑复制到 ExecuteScript 处理器 -

import net.lingala.zip4j.io.inputstream.ZipInputStream;
import net.lingala.zip4j.model.LocalFileHeader;

@Override
        public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
            final String fragmentId = UUID.randomUUID().toString();
            session.read(source, new InputStreamCallback() {
                @Override
                public void process(final InputStream in) throws IOException {
                    int fragmentCount = 0;

                    try (final ZipInputStream zipIn = new ZipInputStream(new BufferedInputStream(in), UNPACK_PASSWORD.toCharArray())) {
                        LocalFileHeader zipEntry;
                        while ((zipEntry = zipIn.getNextEntry()) != null) {
                            if (zipEntry.isDirectory() || !fileMatchesWithEncryption(zipEntry)) {
                                continue;
                            }
                            final File file = new File(zipEntry.getFileName());
                            final String parentDirectory = (file.getParent() == null) ? "/" : file.getParent();
                            final Path absPath = file.toPath().toAbsolutePath();
                            final String absPathString = absPath.getParent().toString() + "/";

                            FlowFile unpackedFile = session.create(source);
                            try {
                                final Map<String, String> attributes = new HashMap<>();
                                attributes.put(CoreAttributes.FILENAME.key(), file.getName());
                                attributes.put(CoreAttributes.PATH.key(), parentDirectory);
                                attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
                                attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);

                                attributes.put(FRAGMENT_ID, fragmentId);
                                attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));

                                unpackedFile = session.putAllAttributes(unpackedFile, attributes);
                                unpackedFile = session.write(unpackedFile, new OutputStreamCallback() {
                                    @Override
                                    public void process(final OutputStream out) throws IOException {
                                        StreamUtils.copy(zipIn, out);
                                    }
                                });
                            } finally {
                                unpacked.add(unpackedFile);
                            }
                        }
                    }
                }
            });
        }

供任何人参考,您需要在使用 toByteArray 后手动关闭 inputStream:以下代码应该有效:

@Grab('net.lingala.zip4j:zip4j:2.2.8')
import net.lingala.zip4j.io.outputstream.ZipOutputStream;
import net.lingala.zip4j.model.ZipParameters;
import net.lingala.zip4j.model.enums.EncryptionMethod;

import org.apache.commons.io.IOUtils

flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
    byte[] inputByteArray = IOUtils.toByteArray(inputStream)
    inputStream.close() // this is what was missing in the original code

    ByteArrayOutputStream outputByteStream = new ByteArrayOutputStream()
    ZipOutputStream outputZipStream = new ZipOutputStream(outputByteStream, "password".toCharArray())

    //init the zip parameters
    ZipParameters zipParams = new ZipParameters()
    zipParams.setEncryptFiles(true)
    zipParams.setEncryptionMethod(EncryptionMethod.AES)
    zipParams.setFileNameInZip("records.csv")

    outputZipStream.putNextEntry(zipParams)
    outputZipStream.write(inputByteArray)
    outputZipStream.closeEntry()

    outputZipStream.close()
    outputByteStream.close()

    outputStream.write(outputByteStream.toByteArray())
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)