如何在 Jython 中使用修改后的数据更新行?
How to update line with modified data in Jython?
我有一个包含数十万行的 csv 文件,下面是一些示例行..,
1,Ni,23,28-02-2015 12:22:33.2212-02
2,Fi,21,28-02-2015 12:22:34.3212-02
3,Us,33,30-03-2015 12:23:35-01
4,Uk,34,31-03-2015 12:24:36.332211-02
我需要获取日期时间格式错误的 csv 数据的最后一列。所以我需要从数据的最后一列中获取默认的日期时间格式("YYYY-MM-DD hh:mm:ss[.nnn]"
)。
我已尝试使用以下脚本从中获取行并写入流文件。
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
for line in text[1:]:
outputStream.write(line + "\n")
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile,PyStreamCallback())
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename'))
session.transfer(flowFile, REL_SUCCESS)
但我找不到像下面输出那样转换它的方法。
1,Ni,23,28-02-2015 12:22:33.221
2,Fi,21,29-02-2015 12:22:34.321
3,Us,33,30-03-2015 12:23:35
4,Uk,34,31-03-2015 12:24:36.332
我和我的朋友(google)一起检查了解决方案,但仍然找不到解决方案。
谁能指导我将这些输入数据转换成我需要的输出?
在此转换中,不必要的数据位于每行的末尾,因此使用正则表达式管理转换任务非常容易。
^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?
在这里检查正则表达式和解释:
https://regex101.com/r/sAB4SA/2
一旦你有一个大文件 - 最好不要将它加载到内存中。以下代码将整个文件加载到内存中:
IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
最好逐行迭代。
因此此代码适用于 ExecuteScript
nifi 处理器,使用 python (Jython) 语言:
import sys
import re
import traceback
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil
from java.lang import Class
from java.io import BufferedReader
from java.io import InputStreamReader
from java.io import OutputStreamWriter
class TransformCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
try:
writer = OutputStreamWriter(outputStream,"UTF-8")
reader = BufferedReader(InputStreamReader(inputStream,"UTF-8"))
line = reader.readLine()
p = re.compile('^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')
while line!= None:
# print line
match = p.search(line)
writer.write( match.group(1) + (match.group(3) if match.group(3)!=None else '') )
writer.write('\n')
line = reader.readLine()
writer.flush()
writer.close()
reader.close()
except:
traceback.print_exc(file=sys.stdout)
raise
flowFile = session.get()
if flowFile != None:
flowFile = session.write(flowFile, TransformCallback())
# Finish by transferring the FlowFile to an output relationship
session.transfer(flowFile, REL_SUCCESS)
一旦问题是关于 nifi 的,这里有一些似乎更容易的替代方案
与上面相同的代码,但在 groovy 中用于 nifi ExecuteScript
处理器:
def ff = session.get()
if(!ff)return
ff = session.write(ff, {rawIn, rawOut->
// ## transform streams into reader and writer
rawIn.withReader("UTF-8"){reader->
rawOut.withWriter("UTF-8"){writer->
reader.eachLine{line, lineNum->
if(lineNum>1) { // # skip the first line
// ## let use regular expression to transform each line
writer << line.replaceAll( /^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?/ , '' ) << '\n'
}
}
}
}
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)
替换文本处理器
如果正则表达式没问题 - nifi 中最简单的方法是 ReplaceText
处理器,它可以逐行替换正则表达式。
在这种情况下,您无需编写任何代码,只需构建正则表达式并正确配置您的处理器即可。
您可以使用正则表达式获取它们:
(\d\d-\d\d-\d\d\d\d\ \d\d:\d\d:)(\d+(?:\.\d+)*)(-\d\d)$
然后将#2 替换为#2 的四舍五入版本
请参阅 regexr.com
中的正则表达式示例
您甚至可以 "nicer" 通过使用捕获组获取每个值,然后将它们放入 datetime.datetime 对象并从那里打印出来,但是恕我直言,这会有点矫枉过正可维护性,让你失去太多的性能。
代码 无法测试
import re
...
pattern = '^(.{25})(\d+(?:\.\d+)*)(-\d\d)$' //used offset for simplicity
....
for line in text[1:]:
match = re.search(pattern, line)
line = match.group(1) + round(match.group(2),3) + match.group(3)
outputStream.write(line + "\n")
只是使用纯 jython。这是一个可以适应OP需求的例子。
为此 csv 文件定义一个日期时间解析器
from datetime import datetime
def parse_datetime(dtstr):
mydatestr='-'.join(dtstr.split('-')[:-1])
try:
return datetime.strptime(mydatestr,'%d-%m-%Y %H:%M:%S.%f').strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]
except ValueError:
return datetime.strptime(mydatestr,'%d-%m-%Y %H:%M:%S').strftime('%d-%m-%Y %H:%M:%S')
我的 test.csv
包括这样的数据:(2015 年没有 2 月 29 日必须更改 OP 的示例)。
1,Ni,23,27-02-2015 12:22:33.2212-02
2,Fi,21,28-02-2015 12:22:34.3212-02
3,Us,33,30-03-2015 12:23:35-01
4,Uk,34,31-03-2015 12:24:36.332211-02
现在是解决方案
with open('test.csv') as fi:
for line in fi:
line_split=line.split(',')
out_line = ', '.join(word if i<3 else parse_datetime(word) for i,word in enumerate(line_split))
#print(out_line)
#you can write this out_line to a file here.
打印 out_line
看起来像这样
1, Ni, 23, 27-02-2015 12:22:33.221
2, Fi, 21, 28-02-2015 12:22:34.321
3, Us, 33, 30-03-2015 12:23:35
4, Uk, 34, 31-03-2015 12:24:36.332
我有一个包含数十万行的 csv 文件,下面是一些示例行..,
1,Ni,23,28-02-2015 12:22:33.2212-02
2,Fi,21,28-02-2015 12:22:34.3212-02
3,Us,33,30-03-2015 12:23:35-01
4,Uk,34,31-03-2015 12:24:36.332211-02
我需要获取日期时间格式错误的 csv 数据的最后一列。所以我需要从数据的最后一列中获取默认的日期时间格式("YYYY-MM-DD hh:mm:ss[.nnn]"
)。
我已尝试使用以下脚本从中获取行并写入流文件。
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
for line in text[1:]:
outputStream.write(line + "\n")
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile,PyStreamCallback())
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename'))
session.transfer(flowFile, REL_SUCCESS)
但我找不到像下面输出那样转换它的方法。
1,Ni,23,28-02-2015 12:22:33.221
2,Fi,21,29-02-2015 12:22:34.321
3,Us,33,30-03-2015 12:23:35
4,Uk,34,31-03-2015 12:24:36.332
我和我的朋友(google)一起检查了解决方案,但仍然找不到解决方案。
谁能指导我将这些输入数据转换成我需要的输出?
在此转换中,不必要的数据位于每行的末尾,因此使用正则表达式管理转换任务非常容易。
^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?
在这里检查正则表达式和解释: https://regex101.com/r/sAB4SA/2
一旦你有一个大文件 - 最好不要将它加载到内存中。以下代码将整个文件加载到内存中:
IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
最好逐行迭代。
因此此代码适用于 ExecuteScript
nifi 处理器,使用 python (Jython) 语言:
import sys
import re
import traceback
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil
from java.lang import Class
from java.io import BufferedReader
from java.io import InputStreamReader
from java.io import OutputStreamWriter
class TransformCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
try:
writer = OutputStreamWriter(outputStream,"UTF-8")
reader = BufferedReader(InputStreamReader(inputStream,"UTF-8"))
line = reader.readLine()
p = re.compile('^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')
while line!= None:
# print line
match = p.search(line)
writer.write( match.group(1) + (match.group(3) if match.group(3)!=None else '') )
writer.write('\n')
line = reader.readLine()
writer.flush()
writer.close()
reader.close()
except:
traceback.print_exc(file=sys.stdout)
raise
flowFile = session.get()
if flowFile != None:
flowFile = session.write(flowFile, TransformCallback())
# Finish by transferring the FlowFile to an output relationship
session.transfer(flowFile, REL_SUCCESS)
一旦问题是关于 nifi 的,这里有一些似乎更容易的替代方案
与上面相同的代码,但在 groovy 中用于 nifi ExecuteScript
处理器:
def ff = session.get()
if(!ff)return
ff = session.write(ff, {rawIn, rawOut->
// ## transform streams into reader and writer
rawIn.withReader("UTF-8"){reader->
rawOut.withWriter("UTF-8"){writer->
reader.eachLine{line, lineNum->
if(lineNum>1) { // # skip the first line
// ## let use regular expression to transform each line
writer << line.replaceAll( /^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?/ , '' ) << '\n'
}
}
}
}
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)
替换文本处理器
如果正则表达式没问题 - nifi 中最简单的方法是 ReplaceText
处理器,它可以逐行替换正则表达式。
在这种情况下,您无需编写任何代码,只需构建正则表达式并正确配置您的处理器即可。
您可以使用正则表达式获取它们:
(\d\d-\d\d-\d\d\d\d\ \d\d:\d\d:)(\d+(?:\.\d+)*)(-\d\d)$
然后将#2 替换为#2 的四舍五入版本
请参阅 regexr.com
中的正则表达式示例您甚至可以 "nicer" 通过使用捕获组获取每个值,然后将它们放入 datetime.datetime 对象并从那里打印出来,但是恕我直言,这会有点矫枉过正可维护性,让你失去太多的性能。
代码 无法测试
import re
...
pattern = '^(.{25})(\d+(?:\.\d+)*)(-\d\d)$' //used offset for simplicity
....
for line in text[1:]:
match = re.search(pattern, line)
line = match.group(1) + round(match.group(2),3) + match.group(3)
outputStream.write(line + "\n")
只是使用纯 jython。这是一个可以适应OP需求的例子。
为此 csv 文件定义一个日期时间解析器
from datetime import datetime
def parse_datetime(dtstr):
mydatestr='-'.join(dtstr.split('-')[:-1])
try:
return datetime.strptime(mydatestr,'%d-%m-%Y %H:%M:%S.%f').strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]
except ValueError:
return datetime.strptime(mydatestr,'%d-%m-%Y %H:%M:%S').strftime('%d-%m-%Y %H:%M:%S')
我的 test.csv
包括这样的数据:(2015 年没有 2 月 29 日必须更改 OP 的示例)。
1,Ni,23,27-02-2015 12:22:33.2212-02
2,Fi,21,28-02-2015 12:22:34.3212-02
3,Us,33,30-03-2015 12:23:35-01
4,Uk,34,31-03-2015 12:24:36.332211-02
现在是解决方案
with open('test.csv') as fi:
for line in fi:
line_split=line.split(',')
out_line = ', '.join(word if i<3 else parse_datetime(word) for i,word in enumerate(line_split))
#print(out_line)
#you can write this out_line to a file here.
打印 out_line
看起来像这样
1, Ni, 23, 27-02-2015 12:22:33.221
2, Fi, 21, 28-02-2015 12:22:34.321
3, Us, 33, 30-03-2015 12:23:35
4, Uk, 34, 31-03-2015 12:24:36.332