Spring Dataflow 流文件接收器 - 需要写入单独的文件
Spring Dataflow Stream File Sink - Need to write to separate files
我一直在使用 Spring Cloud Dataflow。我编写了一个自定义 Cloud Stream Processor 应用程序,它接收特定类型的 XML 文档,并将其拆分为更小的 XML 文档。
我期待下面的 Cloud Stream 定义写出多个文件。 相反 当使用同一个文件进行测试时 它偶尔会将我的一些较小的 XML 写入一个文件,有时它会将它们写入两个 [=35] =](我认为这是由于我在下面定义中的 fixed-delay
值)。
我想知道如何让我的流将每个 XML 文档写入其自己的文件。当我编写处理器时,我专门使用 this.processor.output().send(message);
而不是 @SendTo(Processor.OUTPUT)
,认为这样可以避免这个确切的问题。
一如既往,我们非常感谢您的帮助。谢谢。
数据流流定义:
xmlSplit=fileIn: file --directory=/root/file_in --filename-regex=redactApplication.xml --fixed-delay=30 --markers-json=false --mode=contents | custom-xml-splitter | fileOut: file --directory=/root/file_out --name-expression="'test' + new java.text.SimpleDateFormat('yyyyMMddHHmmss').format(new java.util.Date()) + '.txt'"
我的自定义处理器代码:
@Component
@EnableBinding(Processor.class)
public class REDACTXMLSplitter {
@Autowired
private Processor processor;
@SuppressWarnings("unchecked")
@StreamListener(Processor.INPUT)
public void parseForREDACTApplications(byte[] redcactXMLByteArray) {
String redcactXML = new String(redcactXMLByteArray);
InputSource doc = new InputSource( new StringReader( redcactXML ) );
try
{
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true); // never forget this!
XPathFactory xfactory = XPathFactory.newInstance();
XPath xpath = xfactory.newXPath();
String xpathQuery = "//REDACT/Application";
xpath = xfactory.newXPath();
XPathExpression query = xpath.compile(xpathQuery);
NodeList productNodesFiltered = (NodeList) query.evaluate(doc, XPathConstants.NODESET);
for (int i=0; i<productNodesFiltered.getLength(); ++i)
{
Document suppXml = dBuilder.newDocument();
//we have to recreate the root node <products>
Element root = suppXml.createElement("REDACT");
Node productNode = productNodesFiltered.item(i);
//we append a product (cloned) to the new file
Node clonedNode = productNode.cloneNode(true);
suppXml.adoptNode(clonedNode); //We adopt the orphan :)
root.appendChild(clonedNode);
suppXml.appendChild(root);
//write out files
//At the end, we save the file XML on disk
// TransformerFactory transformerFactory = TransformerFactory.newInstance();
// Transformer transformer = transformerFactory.newTransformer();
// transformer.setOutputProperty(OutputKeys.INDENT, "yes");
// DOMSource source = new DOMSource(suppXml);
// StreamResult result = new StreamResult(new File("test_" + i + ".xml"));
// transformer.transform(source, result);
TransformerFactory tf = TransformerFactory.newInstance();
Transformer transformer = tf.newTransformer();
transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
StringWriter writer = new StringWriter();
transformer.transform(new DOMSource(suppXml), new StreamResult(writer));
String output = writer.getBuffer().toString().replaceAll("\n|\r", "");
System.out.println(output);
Message<String> message = new GenericMessage<>(output);
this.processor.output().send(message);
}
}
catch (XPathExpressionException | ParserConfigurationException | TransformerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
也许文件是突发到达的,因为接收步骤正在发送到一个文件,文件的名称是日期时间(以秒为单位)?如果多个 xml 在同一秒内到达,将最终出现在同一个文件中?
我一直在使用 Spring Cloud Dataflow。我编写了一个自定义 Cloud Stream Processor 应用程序,它接收特定类型的 XML 文档,并将其拆分为更小的 XML 文档。
我期待下面的 Cloud Stream 定义写出多个文件。 相反 当使用同一个文件进行测试时 它偶尔会将我的一些较小的 XML 写入一个文件,有时它会将它们写入两个 [=35] =](我认为这是由于我在下面定义中的 fixed-delay
值)。
我想知道如何让我的流将每个 XML 文档写入其自己的文件。当我编写处理器时,我专门使用 this.processor.output().send(message);
而不是 @SendTo(Processor.OUTPUT)
,认为这样可以避免这个确切的问题。
一如既往,我们非常感谢您的帮助。谢谢。
数据流流定义:
xmlSplit=fileIn: file --directory=/root/file_in --filename-regex=redactApplication.xml --fixed-delay=30 --markers-json=false --mode=contents | custom-xml-splitter | fileOut: file --directory=/root/file_out --name-expression="'test' + new java.text.SimpleDateFormat('yyyyMMddHHmmss').format(new java.util.Date()) + '.txt'"
我的自定义处理器代码:
@Component
@EnableBinding(Processor.class)
public class REDACTXMLSplitter {
@Autowired
private Processor processor;
@SuppressWarnings("unchecked")
@StreamListener(Processor.INPUT)
public void parseForREDACTApplications(byte[] redcactXMLByteArray) {
String redcactXML = new String(redcactXMLByteArray);
InputSource doc = new InputSource( new StringReader( redcactXML ) );
try
{
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true); // never forget this!
XPathFactory xfactory = XPathFactory.newInstance();
XPath xpath = xfactory.newXPath();
String xpathQuery = "//REDACT/Application";
xpath = xfactory.newXPath();
XPathExpression query = xpath.compile(xpathQuery);
NodeList productNodesFiltered = (NodeList) query.evaluate(doc, XPathConstants.NODESET);
for (int i=0; i<productNodesFiltered.getLength(); ++i)
{
Document suppXml = dBuilder.newDocument();
//we have to recreate the root node <products>
Element root = suppXml.createElement("REDACT");
Node productNode = productNodesFiltered.item(i);
//we append a product (cloned) to the new file
Node clonedNode = productNode.cloneNode(true);
suppXml.adoptNode(clonedNode); //We adopt the orphan :)
root.appendChild(clonedNode);
suppXml.appendChild(root);
//write out files
//At the end, we save the file XML on disk
// TransformerFactory transformerFactory = TransformerFactory.newInstance();
// Transformer transformer = transformerFactory.newTransformer();
// transformer.setOutputProperty(OutputKeys.INDENT, "yes");
// DOMSource source = new DOMSource(suppXml);
// StreamResult result = new StreamResult(new File("test_" + i + ".xml"));
// transformer.transform(source, result);
TransformerFactory tf = TransformerFactory.newInstance();
Transformer transformer = tf.newTransformer();
transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
StringWriter writer = new StringWriter();
transformer.transform(new DOMSource(suppXml), new StreamResult(writer));
String output = writer.getBuffer().toString().replaceAll("\n|\r", "");
System.out.println(output);
Message<String> message = new GenericMessage<>(output);
this.processor.output().send(message);
}
}
catch (XPathExpressionException | ParserConfigurationException | TransformerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
也许文件是突发到达的,因为接收步骤正在发送到一个文件,文件的名称是日期时间(以秒为单位)?如果多个 xml 在同一秒内到达,将最终出现在同一个文件中?