如何将 nifi FlowFile 读取为一个字符串或如何读取处理器中的每一行

How to read nifi FlowFile as one string or how to read each line in Processor

我需要阅读每一行,或者从 FlowFile 中以字符串形式阅读完整内容

我知道,可以使用文件名读取每个记录值

但我需要完整阅读所有文件内容或完整行。

我们可以通过以下方式创建记录:

    try (InputStream is = session.read(flowFile)) {
        RecordReader reader = readerFactory.createRecordReader(flowFile, is, getLogger());

        Record record;
        while ((record = reader.nextRecord()) != null) {
... etc

但是在Record接口中没有方法只获取一行

并且在 FlowFile class 中没有读取所有文件的方法

所以,我想我可以读取 InputStream 并从中生成字符串。但它会起作用吗? 如果是,那么在 nifi

中将文件或行作为字符串读取是最好的解决方案吗

如果不是,请写出解决方案

?

P.S.

我使用的 google-chrome 历史记录中的标记(为了更好地搜索此问题): nifi read all file contentnifi read line from file

感谢@daggett

作为变体,我们可以使用 BufferedReader

但是作为另一个答案,更适合nifi,我们可以通过内存分区reader使用限制器,像这样叫StreamDemarcator

private void process(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) {
    final byte[] demarcatorBytes = context.getProperty(TEXT_DELIMITER).evaluateAttributeExpressions().getValue()
                                          .getBytes(StandardCharsets.UTF_8);
    final int maxMessageSize = context.getProperty(MAX_TEXT_SIZE).asDataSize(DataUnit.B).intValue();
    InputStream flowFileContent = session.read(flowFile);
    FlowFile outFile = session.create(flowFile);
    try {
        session.write(outFile, out -> {
            try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                byte[] messageContent = demarcator.nextToken();
                while (messageContent != null) {
                    messageContent = demarcator.nextToken();
                    if (messageContent != null) {
                        out.write(demarcatorBytes);
                        //or use you logic
                    }
                }
            }
        });
    } catch (Exception e) {
        session.remove(outFile);
        throw e;
    }
    session.transfer(outFile, REL_SUCCESS);
}

和class

import java.io.*;

import org.apache.nifi.stream.io.exception.TokenTooLargeException;

/**
 * The <code>StreamDemarcator</code> class takes an input stream and demarcates
 * it so it could be read (see {@link #nextToken()}) as individual byte[]
 * demarcated by the provided delimiter (see 'delimiterBytes'). If delimiter is
 * not provided the entire stream will be read into a single token which may
 * result in {@link OutOfMemoryError} if stream is too large. The 'maxDataSize'
 * controls the maximum size of the buffer that accumulates a token.
 * <p>
 * NOTE: Not intended for multi-thread usage hence not Thread-safe.
 * </p>
 */
public class StreamDemarcator extends AbstractDemarcator {
    private final byte[] delimiterBytes;
    private boolean skipLargeTokens = false;

    /**
     * Constructs a new instance
     *
     * @param is             instance of {@link InputStream} representing the data
     * @param delimiterBytes byte array representing delimiter bytes used to split the
     *                       input stream. Can be 'null'. NOTE: the 'null' is allowed only
     *                       for convenience and consistency since without delimiter this
     *                       class is no different then BufferedReader which reads the
     *                       entire stream into a byte array and there may be a more
     *                       efficient ways to do that (if that is the case).
     * @param maxDataSize    maximum size of data derived from the input stream. This means
     *                       that neither {@link InputStream} nor its individual tokens (if
     *                       delimiter is used) can ever be greater then this size.
     */
    public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize) {
        this(is, delimiterBytes, maxDataSize, INIT_BUFFER_SIZE);
    }

    /**
     * Constructs a new instance
     *
     * @param is                instance of {@link InputStream} representing the data
     * @param delimiterBytes    byte array representing delimiter bytes used to split the
     *                          input stream. Can be 'null'. NOTE: the 'null' is allowed only
     *                          for convenience and consistency since without delimiter this
     *                          class is no different then BufferedReader which reads the
     *                          entire stream into a byte array and there may be a more
     *                          efficient ways to do that (if that is the case).
     * @param maxDataSize       maximum size of data derived from the input stream. This means
     *                          that neither {@link InputStream} nor its individual tokens (if
     *                          delimiter is used) can ever be greater then this size.
     * @param initialBufferSize initial size of the buffer used to buffer {@link InputStream}
     *                          or its parts (if delimiter is used) to create its byte[]
     *                          representation. Must be positive integer. The buffer will grow
     *                          automatically as needed up to the Integer.MAX_VALUE;
     */
    public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) {
        super(is, maxDataSize, initialBufferSize);
        this.validate(delimiterBytes);
        this.delimiterBytes = delimiterBytes;
    }

    public StreamDemarcator skipLargeTokens(boolean value) {
        skipLargeTokens = value;
        return this;
    }

    /**
     * Will read the next data token from the {@link InputStream} returning null
     * when it reaches the end of the stream.
     *
     * @throws IOException if unable to read from the stream
     */
    public byte[] nextToken() throws IOException {
        byte[] token = null;
        int j = 0;
nextTokenLoop:
        while (token == null && this.availableBytesLength != -1) {
            if (this.index >= this.availableBytesLength) {
                this.fill();
            }
            if (this.availableBytesLength != -1) {
                byte byteVal;
                int i;
                for (i = this.index; i < this.availableBytesLength; i++) {
                    byteVal = this.buffer[i];

                    boolean delimiterFound = false;
                    if (this.delimiterBytes != null && this.delimiterBytes[j] == byteVal) {
                        if (++j == this.delimiterBytes.length) {
                            delimiterFound = true;
                        }
                    } else {
                        j = 0;
                    }

                    if (delimiterFound) {
                        this.index = i + 1;
                        int size = this.index - this.mark - this.delimiterBytes.length;
                        try {
                            token = this.extractDataToken(size);
                        } catch (TokenTooLargeException e) {
                            if (!skipLargeTokens) {
                                throw e;
                            } else {
                                token = null;
                            }
                        }
                        this.mark = this.index;
                        j = 0;
                        if (token != null) {
                            break nextTokenLoop;
                        }
                    }
                }
                this.index = i;
            } else {
                try {
                    token = this.extractDataToken(this.index - this.mark);
                } catch (TokenTooLargeException e) {
                    if (!skipLargeTokens) {
                        throw e;
                    }
                }
            }
        }
        return token;
    }

    /**
     * Validates prerequisites for constructor arguments
     */
    private void validate(byte[] delimiterBytes) {
        if (delimiterBytes != null && delimiterBytes.length == 0) {
            throw new IllegalArgumentException("'delimiterBytes' is an optional argument, but when provided its length must be > 0");
        }
    }
}

和父级 - AbstractDemarcator

import java.io.*;
import java.nio.BufferOverflowException;

import org.apache.nifi.stream.io.exception.TokenTooLargeException;

/**
 * Base class for implementing streaming demarcators.
 * <p>
 * NOTE: Not intended for multi-thread usage hence not Thread-safe.
 * </p>
 */
abstract class AbstractDemarcator implements Closeable {

    final static int INIT_BUFFER_SIZE = 8192;

    private final InputStream is;

    /*
     * The size of the initial buffer. Its value is also used when bufer needs
     * to be expanded.
     */
    private final int initialBufferSize;

    /*
     * The maximum allowed size of the token. In the event such size is exceeded
     * TokenTooLargeException is thrown.
     */
    private final int maxDataSize;

    /*
     * Buffer into which the bytes are read from the provided stream. The size
     * of the buffer is defined by the 'initialBufferSize' provided in the
     * constructor or defaults to the value of INIT_BUFFER_SIZE constant.
     */
    byte[] buffer;

    /*
     * Starting offset of the demarcated token within the current 'buffer'.
     */
    int index;

    /*
     * Starting offset of the demarcated token within the current 'buffer'. Keep
     * in mind that while most of the time it is the same as the 'index' it may
     * also have a value of 0 at which point it serves as a signal to the fill()
     * operation that buffer needs to be expended if end of token is not reached
     * (see fill() operation for more details).
     */
    int mark;

    /*
     * Starting offset (from the beginning of the stream) of the demarcated
     * token.
     */
    long offset;

    /*
     * The length of the bytes valid for reading. It is different from the
     * buffer length, since this number may be smaller (e.g., at he end of the
     * stream) then actual buffer length. It is set by the fill() operation
     * every time more bytes read into buffer.
     */
    int availableBytesLength;

    /**
     * Constructs an instance of demarcator with provided {@link InputStream}
     * and max buffer size. Each demarcated token must fit within max buffer
     * size, otherwise the exception will be raised.
     */
    AbstractDemarcator(InputStream is, int maxDataSize) {
        this(is, maxDataSize, INIT_BUFFER_SIZE);
    }

    /**
     * Constructs an instance of demarcator with provided {@link InputStream}
     * and max buffer size and initial buffer size. Each demarcated token must
     * fit within max buffer size, otherwise the exception will be raised.
     */
    AbstractDemarcator(InputStream is, int maxDataSize, int initialBufferSize) {
        this.validate(is, maxDataSize, initialBufferSize);
        this.is = is;
        this.initialBufferSize = initialBufferSize;
        this.buffer = new byte[initialBufferSize];
        this.maxDataSize = maxDataSize;
    }

    @Override
    public void close() throws IOException {
        this.is.close();
    }

    /**
     * Will fill the current buffer from current 'index' position, expanding it
     * and or shuffling it if necessary. If buffer exceeds max buffer size a
     * {@link TokenTooLargeException} will be thrown.
     *
     * @throws IOException if unable to read from the stream
     */
    void fill() throws IOException {
        if (this.index >= this.buffer.length) {
            if (this.mark == 0) { // expand
                long expandedSize = this.buffer.length + this.initialBufferSize;
                if (expandedSize > Integer.MAX_VALUE) {
                    throw new BufferOverflowException(); // will probably OOM before this will ever happen, but just in case.
                }
                byte[] newBuff = new byte[(int) expandedSize];
                System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
                this.buffer = newBuff;
            } else { // shuffle
                int length = this.index - this.mark;
                System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
                this.index = length;
                this.mark = 0;
            }
        }

        int bytesRead;
        /*
         * The do/while pattern is used here similar to the way it is used in
         * BufferedReader essentially protecting from assuming the EOS until it
         * actually is since not every implementation of InputStream guarantees
         * that bytes are always available while the stream is open.
         */
        do {
            bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
        } while (bytesRead == 0);
        this.availableBytesLength = bytesRead != -1 ? this.index + bytesRead : -1;
    }

    /**
     * Will extract data token of the provided length from the current buffer
     * starting at the 'mark'.
     */
    byte[] extractDataToken(int length) throws IOException {
        if (length > this.maxDataSize) {
            throw new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + this.maxDataSize + " bytes.");
        }
        byte[] data = null;
        if (length > 0) {
            data = new byte[length];
            System.arraycopy(this.buffer, this.mark, data, 0, data.length);
        }
        return data;
    }

    /**
     * Validates prerequisites for constructor arguments
     */
    private void validate(InputStream is, int maxDataSize, int initialBufferSize) {
        if (is == null) {
            throw new IllegalArgumentException("'is' must not be null");
        } else if (maxDataSize <= 0) {
            throw new IllegalArgumentException("'maxDataSize' must be > 0");
        } else if (initialBufferSize <= 0) {
            throw new IllegalArgumentException("'initialBufferSize' must be > 0");
        }
    }
}