使用 zip4j 时 NiFi ExecuteScript 的输出文件损坏
NiFi ExecuteScript output is corrupted when using zip4j
@Grab('net.lingala.zip4j:zip4j:2.2.8')
import net.lingala.zip4j.io.outputstream.ZipOutputStream;
import net.lingala.zip4j.model.ZipParameters;
import net.lingala.zip4j.model.enums.EncryptionMethod;
import org.apache.commons.io.IOUtils
// Fetch the next FlowFile; the script may be triggered when nothing is queued.
flowFile = session.get()
if (!flowFile) return

try {
    // Replace the FlowFile content with an AES-encrypted zip archive that holds
    // the original content as a single entry named "records.csv".
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        // Read the whole incoming content into memory.
        byte[] inputByteArray = IOUtils.toByteArray(inputStream)

        // The archive is built in an in-memory buffer and copied to the FlowFile
        // output stream only after the zip stream has been closed.
        ByteArrayOutputStream outputByteStream = new ByteArrayOutputStream()
        // NOTE(review): the password is hard-coded; consider supplying it from a
        // sensitive processor property instead.
        ZipOutputStream outputZipStream = new ZipOutputStream(outputByteStream, "password".toCharArray())

        // Entry parameters: encrypt with AES and store under a fixed entry name.
        ZipParameters zipParams = new ZipParameters()
        zipParams.setEncryptFiles(true)
        zipParams.setEncryptionMethod(EncryptionMethod.AES)
        zipParams.setFileNameInZip("records.csv")

        outputZipStream.putNextEntry(zipParams)
        outputZipStream.write(inputByteArray)
        outputZipStream.closeEntry()
        // close() finalizes the archive, so the buffer is complete only after this call.
        outputZipStream.close()
        outputByteStream.close()
        outputStream.write(outputByteStream.toByteArray())
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
    // Route to failure instead of letting the exception escape the script,
    // which would leave the FlowFile unrouted.
    log.error('Failed to create password-protected zip for ' + flowFile, e)
    session.transfer(flowFile, REL_FAILURE)
}
我在 ExecuteScript 处理器中用 Groovy 编写脚本，使用密码压缩 CSV 文件。ExecuteScript 处理器可以正常地把 FlowFile 传递给下一个处理器，但生成的文件已损坏，PutSFTP 处理器报出以下错误（错误信息原文未包含在本文中）。
尽管如此，文件仍然通过 SFTP 存储了下来，但由于文件已损坏，我无法解压它。我的代码有什么问题？
在开发用于解压加密内容的自定义处理器时，我们已成功地用下面的代码片段，借助 Lingala zip4j 解压受密码保护的 FlowFile。可以参考这段代码，把其中的逻辑移植到 ExecuteScript 处理器中：
import net.lingala.zip4j.io.inputstream.ZipInputStream;
import net.lingala.zip4j.model.LocalFileHeader;
/**
 * Extracts the password-protected zip content of {@code source} into child FlowFiles.
 *
 * <p>The content is opened with zip4j's {@code ZipInputStream} using {@code UNPACK_PASSWORD}.
 * Directory entries are skipped, as are entries for which {@code fileMatchesWithEncryption}
 * (defined elsewhere) returns {@code false}; that helper is never consulted for directories.
 * Every other entry becomes a new FlowFile created from {@code source}, whose content is the
 * data read from the zip stream for that entry.
 *
 * <p>Each child gets the {@code CoreAttributes.FILENAME}, {@code PATH}, {@code ABSOLUTE_PATH}
 * and {@code MIME_TYPE} attributes (the latter set to {@code OCTET_STREAM}), plus a
 * {@code FRAGMENT_ID} shared by all children of this call and a 1-based {@code FRAGMENT_INDEX}.
 *
 * @param session  session used to read {@code source} and to create and write the children
 * @param source   FlowFile whose content is the zip archive
 * @param unpacked receives every child that was created; a child is added even when writing
 *                 its content throws
 */
@Override
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
    // A single id ties together every child produced from this archive.
    final String sharedFragmentId = UUID.randomUUID().toString();

    session.read(source, new InputStreamCallback() {
        @Override
        public void process(final InputStream rawContent) throws IOException {
            int index = 0;
            try (final ZipInputStream archive =
                    new ZipInputStream(new BufferedInputStream(rawContent), UNPACK_PASSWORD.toCharArray())) {
                for (LocalFileHeader header = archive.getNextEntry();
                        header != null;
                        header = archive.getNextEntry()) {
                    final boolean emit = !header.isDirectory() && fileMatchesWithEncryption(header);
                    if (emit) {
                        final File entryPath = new File(header.getFileName());
                        final String parent = entryPath.getParent();
                        final String pathAttr = parent != null ? parent : "/";
                        // toAbsolutePath() resolves the entry name against the JVM working directory.
                        final String absolutePathAttr =
                                entryPath.toPath().toAbsolutePath().getParent().toString() + "/";

                        FlowFile child = session.create(source);
                        try {
                            final Map<String, String> attrs = new HashMap<>();
                            attrs.put(FRAGMENT_ID, sharedFragmentId);
                            attrs.put(FRAGMENT_INDEX, String.valueOf(++index));
                            attrs.put(CoreAttributes.FILENAME.key(), entryPath.getName());
                            attrs.put(CoreAttributes.PATH.key(), pathAttr);
                            attrs.put(CoreAttributes.ABSOLUTE_PATH.key(), absolutePathAttr);
                            attrs.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
                            child = session.putAllAttributes(child, attrs);

                            child = session.write(child, new OutputStreamCallback() {
                                @Override
                                public void process(final OutputStream sink) throws IOException {
                                    // Presumably reads only up to the end of the current entry,
                                    // as java.util.zip.ZipInputStream does — confirm for zip4j.
                                    StreamUtils.copy(archive, sink);
                                }
                            });
                        } finally {
                            unpacked.add(child);
                        }
                    }
                }
            }
        }
    });
}
供参考：在调用 toByteArray 之后，需要手动关闭 inputStream。下面的代码应该可以正常工作：
@Grab('net.lingala.zip4j:zip4j:2.2.8')
import net.lingala.zip4j.io.outputstream.ZipOutputStream;
import net.lingala.zip4j.model.ZipParameters;
import net.lingala.zip4j.model.enums.EncryptionMethod;
import org.apache.commons.io.IOUtils
// Fetch the next FlowFile; the script may be triggered when nothing is queued.
flowFile = session.get()
if (!flowFile) return

try {
    // Replace the FlowFile content with an AES-encrypted zip archive that holds
    // the original content as a single entry named "records.csv".
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        // Read the whole incoming content into memory.
        byte[] inputByteArray = IOUtils.toByteArray(inputStream)
        // The answer's fix: close the input stream once it has been fully read.
        inputStream.close()

        // The archive is built in an in-memory buffer and copied to the FlowFile
        // output stream only after the zip stream has been closed.
        ByteArrayOutputStream outputByteStream = new ByteArrayOutputStream()
        // NOTE(review): the password is hard-coded; consider supplying it from a
        // sensitive processor property instead.
        ZipOutputStream outputZipStream = new ZipOutputStream(outputByteStream, "password".toCharArray())

        // Entry parameters: encrypt with AES and store under a fixed entry name.
        ZipParameters zipParams = new ZipParameters()
        zipParams.setEncryptFiles(true)
        zipParams.setEncryptionMethod(EncryptionMethod.AES)
        zipParams.setFileNameInZip("records.csv")

        outputZipStream.putNextEntry(zipParams)
        outputZipStream.write(inputByteArray)
        outputZipStream.closeEntry()
        // close() finalizes the archive, so the buffer is complete only after this call.
        outputZipStream.close()
        outputByteStream.close()
        outputStream.write(outputByteStream.toByteArray())
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
    // Route to failure instead of letting the exception escape the script,
    // which would leave the FlowFile unrouted.
    log.error('Failed to create password-protected zip for ' + flowFile, e)
    session.transfer(flowFile, REL_FAILURE)
}
@Grab('net.lingala.zip4j:zip4j:2.2.8')
import net.lingala.zip4j.io.outputstream.ZipOutputStream;
import net.lingala.zip4j.model.ZipParameters;
import net.lingala.zip4j.model.enums.EncryptionMethod;
import org.apache.commons.io.IOUtils
// Fetch the next FlowFile; the script may be triggered when nothing is queued.
flowFile = session.get()
if (!flowFile) return

try {
    // Replace the FlowFile content with an AES-encrypted zip archive that holds
    // the original content as a single entry named "records.csv".
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        // Read the whole incoming content into memory.
        byte[] inputByteArray = IOUtils.toByteArray(inputStream)

        // The archive is built in an in-memory buffer and copied to the FlowFile
        // output stream only after the zip stream has been closed.
        ByteArrayOutputStream outputByteStream = new ByteArrayOutputStream()
        // NOTE(review): the password is hard-coded; consider supplying it from a
        // sensitive processor property instead.
        ZipOutputStream outputZipStream = new ZipOutputStream(outputByteStream, "password".toCharArray())

        // Entry parameters: encrypt with AES and store under a fixed entry name.
        ZipParameters zipParams = new ZipParameters()
        zipParams.setEncryptFiles(true)
        zipParams.setEncryptionMethod(EncryptionMethod.AES)
        zipParams.setFileNameInZip("records.csv")

        outputZipStream.putNextEntry(zipParams)
        outputZipStream.write(inputByteArray)
        outputZipStream.closeEntry()
        // close() finalizes the archive, so the buffer is complete only after this call.
        outputZipStream.close()
        outputByteStream.close()
        outputStream.write(outputByteStream.toByteArray())
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
    // Route to failure instead of letting the exception escape the script,
    // which would leave the FlowFile unrouted.
    log.error('Failed to create password-protected zip for ' + flowFile, e)
    session.transfer(flowFile, REL_FAILURE)
}
我在 ExecuteScript 处理器中用 Groovy 编写脚本，使用密码压缩 CSV 文件。ExecuteScript 处理器可以正常地把 FlowFile 传递给下一个处理器，但生成的文件已损坏，PutSFTP 处理器报出以下错误（错误信息原文未包含在本文中）。
尽管如此，文件仍然通过 SFTP 存储了下来，但由于文件已损坏，我无法解压它。我的代码有什么问题？
在开发用于解压加密内容的自定义处理器时，我们已成功地用下面的代码片段，借助 Lingala zip4j 解压受密码保护的 FlowFile。可以参考这段代码，把其中的逻辑移植到 ExecuteScript 处理器中：
import net.lingala.zip4j.io.inputstream.ZipInputStream;
import net.lingala.zip4j.model.LocalFileHeader;
/**
 * Extracts the password-protected zip content of {@code source} into child FlowFiles.
 *
 * <p>The content is opened with zip4j's {@code ZipInputStream} using {@code UNPACK_PASSWORD}.
 * Directory entries are skipped, as are entries for which {@code fileMatchesWithEncryption}
 * (defined elsewhere) returns {@code false}; that helper is never consulted for directories.
 * Every other entry becomes a new FlowFile created from {@code source}, whose content is the
 * data read from the zip stream for that entry.
 *
 * <p>Each child gets the {@code CoreAttributes.FILENAME}, {@code PATH}, {@code ABSOLUTE_PATH}
 * and {@code MIME_TYPE} attributes (the latter set to {@code OCTET_STREAM}), plus a
 * {@code FRAGMENT_ID} shared by all children of this call and a 1-based {@code FRAGMENT_INDEX}.
 *
 * @param session  session used to read {@code source} and to create and write the children
 * @param source   FlowFile whose content is the zip archive
 * @param unpacked receives every child that was created; a child is added even when writing
 *                 its content throws
 */
@Override
public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
    // A single id ties together every child produced from this archive.
    final String sharedFragmentId = UUID.randomUUID().toString();

    session.read(source, new InputStreamCallback() {
        @Override
        public void process(final InputStream rawContent) throws IOException {
            int index = 0;
            try (final ZipInputStream archive =
                    new ZipInputStream(new BufferedInputStream(rawContent), UNPACK_PASSWORD.toCharArray())) {
                for (LocalFileHeader header = archive.getNextEntry();
                        header != null;
                        header = archive.getNextEntry()) {
                    final boolean emit = !header.isDirectory() && fileMatchesWithEncryption(header);
                    if (emit) {
                        final File entryPath = new File(header.getFileName());
                        final String parent = entryPath.getParent();
                        final String pathAttr = parent != null ? parent : "/";
                        // toAbsolutePath() resolves the entry name against the JVM working directory.
                        final String absolutePathAttr =
                                entryPath.toPath().toAbsolutePath().getParent().toString() + "/";

                        FlowFile child = session.create(source);
                        try {
                            final Map<String, String> attrs = new HashMap<>();
                            attrs.put(FRAGMENT_ID, sharedFragmentId);
                            attrs.put(FRAGMENT_INDEX, String.valueOf(++index));
                            attrs.put(CoreAttributes.FILENAME.key(), entryPath.getName());
                            attrs.put(CoreAttributes.PATH.key(), pathAttr);
                            attrs.put(CoreAttributes.ABSOLUTE_PATH.key(), absolutePathAttr);
                            attrs.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
                            child = session.putAllAttributes(child, attrs);

                            child = session.write(child, new OutputStreamCallback() {
                                @Override
                                public void process(final OutputStream sink) throws IOException {
                                    // Presumably reads only up to the end of the current entry,
                                    // as java.util.zip.ZipInputStream does — confirm for zip4j.
                                    StreamUtils.copy(archive, sink);
                                }
                            });
                        } finally {
                            unpacked.add(child);
                        }
                    }
                }
            }
        }
    });
}
供参考：在调用 toByteArray 之后，需要手动关闭 inputStream。下面的代码应该可以正常工作：
@Grab('net.lingala.zip4j:zip4j:2.2.8')
import net.lingala.zip4j.io.outputstream.ZipOutputStream;
import net.lingala.zip4j.model.ZipParameters;
import net.lingala.zip4j.model.enums.EncryptionMethod;
import org.apache.commons.io.IOUtils
// Fetch the next FlowFile; the script may be triggered when nothing is queued.
flowFile = session.get()
if (!flowFile) return

try {
    // Replace the FlowFile content with an AES-encrypted zip archive that holds
    // the original content as a single entry named "records.csv".
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        // Read the whole incoming content into memory.
        byte[] inputByteArray = IOUtils.toByteArray(inputStream)
        // The answer's fix: close the input stream once it has been fully read.
        inputStream.close()

        // The archive is built in an in-memory buffer and copied to the FlowFile
        // output stream only after the zip stream has been closed.
        ByteArrayOutputStream outputByteStream = new ByteArrayOutputStream()
        // NOTE(review): the password is hard-coded; consider supplying it from a
        // sensitive processor property instead.
        ZipOutputStream outputZipStream = new ZipOutputStream(outputByteStream, "password".toCharArray())

        // Entry parameters: encrypt with AES and store under a fixed entry name.
        ZipParameters zipParams = new ZipParameters()
        zipParams.setEncryptFiles(true)
        zipParams.setEncryptionMethod(EncryptionMethod.AES)
        zipParams.setFileNameInZip("records.csv")

        outputZipStream.putNextEntry(zipParams)
        outputZipStream.write(inputByteArray)
        outputZipStream.closeEntry()
        // close() finalizes the archive, so the buffer is complete only after this call.
        outputZipStream.close()
        outputByteStream.close()
        outputStream.write(outputByteStream.toByteArray())
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
    // Route to failure instead of letting the exception escape the script,
    // which would leave the FlowFile unrouted.
    log.error('Failed to create password-protected zip for ' + flowFile, e)
    session.transfer(flowFile, REL_FAILURE)
}