How to read a NiFi FlowFile as one string, or how to read each line in a Processor
I need to read each line, or the complete content, of a FlowFile as a string.
I know that it is possible to read each record value using a field name,
but I need the whole file content, or whole lines.
We can read records in the following way:
try (InputStream is = session.read(flowFile)) {
    RecordReader reader = readerFactory.createRecordReader(flowFile, is, getLogger());
    Record record;
    while ((record = reader.nextRecord()) != null) {
        // ... etc.
But the Record interface has no method to get just a single line,
and the FlowFile class has no method to read the whole file content.
So I think I can read the InputStream and build a string from it, something like the sketch below, but will it work?
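(A rough sketch of what I mean, assuming UTF-8 text and a FlowFile small enough to buffer in memory; session and flowFile come from the processor's onTrigger.)

import java.nio.charset.StandardCharsets;
import org.apache.nifi.stream.io.StreamUtils;

// read the entire content into a byte[] sized to the FlowFile, then decode it
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
final String content = new String(buffer, StandardCharsets.UTF_8);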
If so, is reading the file or its lines as strings the best solution in NiFi?
If not, please suggest a better one.
P.S.
Tags from my google-chrome history (to make this question easier to find):
nifi read all file content, nifi read line from file
Thanks to @daggett.
As one variant, we can use a BufferedReader.
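For example, a minimal sketch (assuming UTF-8 text; session and flowFile come from onTrigger):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.processor.exception.ProcessException;

// wrap the FlowFile content stream and read it line by line
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(session.read(flowFile), StandardCharsets.UTF_8))) {
    String line;
    while ((line = reader.readLine()) != null) {
        // handle each line here
    }
} catch (IOException e) {
    throw new ProcessException("Failed to read FlowFile content", e);
}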
But another approach, which fits NiFi better, is a size-limited, in-memory partitioning reader (a demarcator), like the StreamDemarcator class below:
private void process(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) {
    final byte[] demarcatorBytes = context.getProperty(TEXT_DELIMITER).evaluateAttributeExpressions().getValue()
            .getBytes(StandardCharsets.UTF_8);
    final int maxMessageSize = context.getProperty(MAX_TEXT_SIZE).asDataSize(DataUnit.B).intValue();
    final InputStream flowFileContent = session.read(flowFile);
    FlowFile outFile = session.create(flowFile);
    try {
        session.write(outFile, out -> {
            // the try-with-resources also closes flowFileContent, because the demarcator closes its stream
            try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                byte[] messageContent;
                while ((messageContent = demarcator.nextToken()) != null) {
                    out.write(messageContent);
                    out.write(demarcatorBytes);
                    // or apply your own per-token logic here
                }
            }
        });
    } catch (Exception e) {
        session.remove(outFile);
        throw e;
    }
    session.transfer(outFile, REL_SUCCESS);
}
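The TEXT_DELIMITER and MAX_TEXT_SIZE properties referenced above are not part of the snippet; hypothetical descriptor definitions could look like this (names, defaults and validators are my assumptions, adjust them to your processor):

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;

// hypothetical property definitions for the process() method above
static final PropertyDescriptor TEXT_DELIMITER = new PropertyDescriptor.Builder()
        .name("Text Delimiter")
        .description("Byte sequence that separates individual tokens in the FlowFile content.")
        .required(true)
        .defaultValue("\n")
        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();

static final PropertyDescriptor MAX_TEXT_SIZE = new PropertyDescriptor.Builder()
        .name("Max Text Size")
        .description("Maximum allowed size of a single demarcated token.")
        .required(true)
        .defaultValue("1 MB")
        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
        .build();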
And the class itself:
import java.io.*;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;

/**
 * The <code>StreamDemarcator</code> class takes an input stream and demarcates
 * it so it could be read (see {@link #nextToken()}) as individual byte[]
 * demarcated by the provided delimiter (see 'delimiterBytes'). If delimiter is
 * not provided the entire stream will be read into a single token which may
 * result in {@link OutOfMemoryError} if stream is too large. The 'maxDataSize'
 * controls the maximum size of the buffer that accumulates a token.
 * <p>
 * NOTE: Not intended for multi-thread usage hence not Thread-safe.
 * </p>
 */
public class StreamDemarcator extends AbstractDemarcator {

    private final byte[] delimiterBytes;
    private boolean skipLargeTokens = false;

    /**
     * Constructs a new instance
     *
     * @param is             instance of {@link InputStream} representing the data
     * @param delimiterBytes byte array representing delimiter bytes used to split the
     *                       input stream. Can be 'null'. NOTE: the 'null' is allowed only
     *                       for convenience and consistency since without a delimiter this
     *                       class is no different than a BufferedReader which reads the
     *                       entire stream into a byte array, and there may be more
     *                       efficient ways to do that (if that is the case).
     * @param maxDataSize    maximum size of data derived from the input stream. This means
     *                       that neither the {@link InputStream} nor its individual tokens (if
     *                       a delimiter is used) can ever be greater than this size.
     */
    public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize) {
        this(is, delimiterBytes, maxDataSize, INIT_BUFFER_SIZE);
    }

    /**
     * Constructs a new instance
     *
     * @param is                instance of {@link InputStream} representing the data
     * @param delimiterBytes    byte array representing delimiter bytes used to split the
     *                          input stream. Can be 'null'. NOTE: the 'null' is allowed only
     *                          for convenience and consistency since without a delimiter this
     *                          class is no different than a BufferedReader which reads the
     *                          entire stream into a byte array, and there may be more
     *                          efficient ways to do that (if that is the case).
     * @param maxDataSize       maximum size of data derived from the input stream. This means
     *                          that neither the {@link InputStream} nor its individual tokens (if
     *                          a delimiter is used) can ever be greater than this size.
     * @param initialBufferSize initial size of the buffer used to buffer the {@link InputStream}
     *                          or its parts (if a delimiter is used) to create its byte[]
     *                          representation. Must be a positive integer. The buffer will grow
     *                          automatically as needed up to Integer.MAX_VALUE.
     */
    public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) {
        super(is, maxDataSize, initialBufferSize);
        this.validate(delimiterBytes);
        this.delimiterBytes = delimiterBytes;
    }

    public StreamDemarcator skipLargeTokens(boolean value) {
        skipLargeTokens = value;
        return this;
    }

    /**
     * Will read the next data token from the {@link InputStream} returning null
     * when it reaches the end of the stream.
     *
     * @throws IOException if unable to read from the stream
     */
    public byte[] nextToken() throws IOException {
        byte[] token = null;
        int j = 0;
        nextTokenLoop:
        while (token == null && this.availableBytesLength != -1) {
            if (this.index >= this.availableBytesLength) {
                this.fill();
            }
            if (this.availableBytesLength != -1) {
                byte byteVal;
                int i;
                for (i = this.index; i < this.availableBytesLength; i++) {
                    byteVal = this.buffer[i];
                    boolean delimiterFound = false;
                    if (this.delimiterBytes != null && this.delimiterBytes[j] == byteVal) {
                        if (++j == this.delimiterBytes.length) {
                            delimiterFound = true;
                        }
                    } else {
                        j = 0;
                    }
                    if (delimiterFound) {
                        this.index = i + 1;
                        int size = this.index - this.mark - this.delimiterBytes.length;
                        try {
                            token = this.extractDataToken(size);
                        } catch (TokenTooLargeException e) {
                            if (!skipLargeTokens) {
                                throw e;
                            } else {
                                token = null;
                            }
                        }
                        this.mark = this.index;
                        j = 0;
                        if (token != null) {
                            break nextTokenLoop;
                        }
                    }
                }
                this.index = i;
            } else {
                try {
                    token = this.extractDataToken(this.index - this.mark);
                } catch (TokenTooLargeException e) {
                    if (!skipLargeTokens) {
                        throw e;
                    }
                }
            }
        }
        return token;
    }

    /**
     * Validates prerequisites for constructor arguments
     */
    private void validate(byte[] delimiterBytes) {
        if (delimiterBytes != null && delimiterBytes.length == 0) {
            throw new IllegalArgumentException("'delimiterBytes' is an optional argument, but when provided its length must be > 0");
        }
    }
}
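A small usage sketch of the class on its own (the newline delimiter and the 1 MB limit are just illustrative values; skipLargeTokens(true) makes the reader silently drop tokens that exceed the limit):

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.processor.exception.ProcessException;

// split the FlowFile content on newlines; the demarcator closes the stream when done
InputStream in = session.read(flowFile);
try (StreamDemarcator demarcator =
        new StreamDemarcator(in, "\n".getBytes(StandardCharsets.UTF_8), 1024 * 1024).skipLargeTokens(true)) {
    byte[] token;
    while ((token = demarcator.nextToken()) != null) {
        String line = new String(token, StandardCharsets.UTF_8);
        // process the line
    }
} catch (IOException e) {
    throw new ProcessException("Failed to demarcate FlowFile content", e);
}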
And the parent class, AbstractDemarcator:
import java.io.*;
import java.nio.BufferOverflowException;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;

/**
 * Base class for implementing streaming demarcators.
 * <p>
 * NOTE: Not intended for multi-thread usage hence not Thread-safe.
 * </p>
 */
abstract class AbstractDemarcator implements Closeable {

    final static int INIT_BUFFER_SIZE = 8192;

    private final InputStream is;

    /*
     * The size of the initial buffer. Its value is also used when the buffer needs
     * to be expanded.
     */
    private final int initialBufferSize;

    /*
     * The maximum allowed size of the token. In the event such size is exceeded
     * TokenTooLargeException is thrown.
     */
    private final int maxDataSize;

    /*
     * Buffer into which the bytes are read from the provided stream. The size
     * of the buffer is defined by the 'initialBufferSize' provided in the
     * constructor or defaults to the value of the INIT_BUFFER_SIZE constant.
     */
    byte[] buffer;

    /*
     * Current read position within the 'buffer'.
     */
    int index;

    /*
     * Starting offset of the demarcated token within the current 'buffer'. Keep
     * in mind that while most of the time it is the same as the 'index' it may
     * also have a value of 0, at which point it serves as a signal to the fill()
     * operation that the buffer needs to be expanded if the end of the token is
     * not reached (see the fill() operation for more details).
     */
    int mark;

    /*
     * Starting offset (from the beginning of the stream) of the demarcated
     * token.
     */
    long offset;

    /*
     * The length of the bytes valid for reading. It is different from the
     * buffer length, since this number may be smaller (e.g., at the end of the
     * stream) than the actual buffer length. It is set by the fill() operation
     * every time more bytes are read into the buffer.
     */
    int availableBytesLength;

    /**
     * Constructs an instance of demarcator with provided {@link InputStream}
     * and max buffer size. Each demarcated token must fit within max buffer
     * size, otherwise the exception will be raised.
     */
    AbstractDemarcator(InputStream is, int maxDataSize) {
        this(is, maxDataSize, INIT_BUFFER_SIZE);
    }

    /**
     * Constructs an instance of demarcator with provided {@link InputStream},
     * max buffer size and initial buffer size. Each demarcated token must
     * fit within max buffer size, otherwise the exception will be raised.
     */
    AbstractDemarcator(InputStream is, int maxDataSize, int initialBufferSize) {
        this.validate(is, maxDataSize, initialBufferSize);
        this.is = is;
        this.initialBufferSize = initialBufferSize;
        this.buffer = new byte[initialBufferSize];
        this.maxDataSize = maxDataSize;
    }

    @Override
    public void close() throws IOException {
        this.is.close();
    }

    /**
     * Will fill the current buffer from the current 'index' position, expanding it
     * and/or shuffling it if necessary. If the buffer exceeds the max buffer size a
     * {@link TokenTooLargeException} will be thrown.
     *
     * @throws IOException if unable to read from the stream
     */
    void fill() throws IOException {
        if (this.index >= this.buffer.length) {
            if (this.mark == 0) { // expand
                long expandedSize = this.buffer.length + this.initialBufferSize;
                if (expandedSize > Integer.MAX_VALUE) {
                    throw new BufferOverflowException(); // will probably OOM before this will ever happen, but just in case.
                }
                byte[] newBuff = new byte[(int) expandedSize];
                System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
                this.buffer = newBuff;
            } else { // shuffle
                int length = this.index - this.mark;
                System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
                this.index = length;
                this.mark = 0;
            }
        }

        int bytesRead;
        /*
         * The do/while pattern is used here similar to the way it is used in
         * BufferedReader, essentially protecting from assuming the EOS until it
         * actually is, since not every implementation of InputStream guarantees
         * that bytes are always available while the stream is open.
         */
        do {
            bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
        } while (bytesRead == 0);
        this.availableBytesLength = bytesRead != -1 ? this.index + bytesRead : -1;
    }

    /**
     * Will extract a data token of the provided length from the current buffer
     * starting at the 'mark'.
     */
    byte[] extractDataToken(int length) throws IOException {
        if (length > this.maxDataSize) {
            throw new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + this.maxDataSize + " bytes.");
        }
        byte[] data = null;
        if (length > 0) {
            data = new byte[length];
            System.arraycopy(this.buffer, this.mark, data, 0, data.length);
        }
        return data;
    }

    /**
     * Validates prerequisites for constructor arguments
     */
    private void validate(InputStream is, int maxDataSize, int initialBufferSize) {
        if (is == null) {
            throw new IllegalArgumentException("'is' must not be null");
        } else if (maxDataSize <= 0) {
            throw new IllegalArgumentException("'maxDataSize' must be > 0");
        } else if (initialBufferSize <= 0) {
            throw new IllegalArgumentException("'initialBufferSize' must be > 0");
        }
    }
}