如何在 Jython 中使用修改后的数据更新行?

How to update line with modified data in Jython?

我有一个包含数十万行的 csv 文件,下面是一些示例行..,

1,Ni,23,28-02-2015 12:22:33.2212-02
2,Fi,21,28-02-2015 12:22:34.3212-02
3,Us,33,30-03-2015 12:23:35-01
4,Uk,34,31-03-2015 12:24:36.332211-02

我需要获取日期时间格式错误的 csv 数据的最后一列。所以我需要从数据的最后一列中获取默认的日期时间格式("YYYY-MM-DD hh:mm:ss[.nnn]")。

我已尝试使用以下脚本从中获取行并写入流文件。

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
    for line in text[1:]:
        outputStream.write(line + "\n") 

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename'))
  session.transfer(flowFile, REL_SUCCESS)

但我找不到像下面输出那样转换它的方法。

1,Ni,23,28-02-2015 12:22:33.221
2,Fi,21,29-02-2015 12:22:34.321
3,Us,33,30-03-2015 12:23:35
4,Uk,34,31-03-2015 12:24:36.332

我和我的朋友(google)一起检查了解决方案,但仍然找不到解决方案。

谁能指导我将这些输入数据转换成我需要的输出?

在此转换中,不必要的数据位于每行的末尾,因此使用正则表达式管理转换任务非常容易。

^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?

在这里检查正则表达式和解释: https://regex101.com/r/sAB4SA/2

一旦你有一个大文件 - 最好不要将它加载到内存中。以下代码将整个文件加载到内存中:

IOUtils.readLines(inputStream, StandardCharsets.UTF_8)

最好逐行迭代。

因此此代码适用于 ExecuteScript nifi 处理器,使用 python (Jython) 语言:

import sys
import re
import traceback
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil
from java.lang import Class
from java.io import BufferedReader
from java.io import InputStreamReader
from java.io import OutputStreamWriter


class TransformCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        try:
            writer = OutputStreamWriter(outputStream,"UTF-8")
            reader = BufferedReader(InputStreamReader(inputStream,"UTF-8"))
            line = reader.readLine()
            p = re.compile('^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')
            while line!= None:
                # print line
                match = p.search(line)
                writer.write( match.group(1) + (match.group(3) if match.group(3)!=None else '') )
                writer.write('\n')
                line = reader.readLine()
            writer.flush()
            writer.close()
            reader.close()
        except:
            traceback.print_exc(file=sys.stdout)
            raise


flowFile = session.get()
if flowFile != None:
    flowFile = session.write(flowFile, TransformCallback())

    # Finish by transferring the FlowFile to an output relationship
    session.transfer(flowFile, REL_SUCCESS)

一旦问题是关于 nifi 的,这里有一些似乎更容易的替代方案


与上面相同的代码,但在 groovy 中用于 nifi ExecuteScript 处理器:

def ff = session.get()
if(!ff)return
ff = session.write(ff, {rawIn, rawOut->
    // ## transform streams into reader and writer
    rawIn.withReader("UTF-8"){reader->
        rawOut.withWriter("UTF-8"){writer->
            reader.eachLine{line, lineNum->
                if(lineNum>1) { // # skip the first line
                    // ## let use regular expression to transform each line
                    writer << line.replaceAll( /^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?/ , '' ) << '\n'
                }
            }
        }
    }
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)

替换文本处理器

如果正则表达式没问题 - nifi 中最简单的方法是 ReplaceText 处理器,它可以逐行替换正则表达式。

在这种情况下,您无需编写任何代码,只需构建正则表达式并正确配置您的处理器即可。

您可以使用正则表达式获取它们:

(\d\d-\d\d-\d\d\d\d\ \d\d:\d\d:)(\d+(?:\.\d+)*)(-\d\d)$

然后将#2 替换为#2 的四舍五入版本

请参阅 regexr.com

中的正则表达式示例

您甚至可以 "nicer" 通过使用捕获组获取每个值,然后将它们放入 datetime.datetime 对象并从那里打印出来,但是恕我直言,这会有点矫枉过正可维护性,让你失去太多的性能。

代码 无法测试

import re
...
pattern = '^(.{25})(\d+(?:\.\d+)*)(-\d\d)$' //used offset for simplicity

....

  for line in text[1:]:
    match = re.search(pattern, line)
    line = match.group(1) + round(match.group(2),3) + match.group(3)
    outputStream.write(line + "\n") 

只是使用纯 jython。这是一个可以适应OP需求的例子。

为此 csv 文件定义一个日期时间解析器

from datetime import datetime
def parse_datetime(dtstr):
    mydatestr='-'.join(dtstr.split('-')[:-1])
    try:
        return datetime.strptime(mydatestr,'%d-%m-%Y %H:%M:%S.%f').strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]
    except ValueError:
        return datetime.strptime(mydatestr,'%d-%m-%Y %H:%M:%S').strftime('%d-%m-%Y %H:%M:%S')

我的 test.csv 包括这样的数据:(2015 年没有 2 月 29 日必须更改 OP 的示例)。

1,Ni,23,27-02-2015 12:22:33.2212-02
2,Fi,21,28-02-2015 12:22:34.3212-02
3,Us,33,30-03-2015 12:23:35-01
4,Uk,34,31-03-2015 12:24:36.332211-02

现在是解决方案

with open('test.csv') as fi:
    for line in fi:
        line_split=line.split(',')
        out_line = ', '.join(word if i<3 else parse_datetime(word) for i,word in enumerate(line_split))
        #print(out_line)
        #you can write this out_line to a file here. 

打印 out_line 看起来像这样

1, Ni, 23, 27-02-2015 12:22:33.221
2, Fi, 21, 28-02-2015 12:22:34.321
3, Us, 33, 30-03-2015 12:23:35
4, Uk, 34, 31-03-2015 12:24:36.332