os.walk 设置起点和终点 - python
os.walk set start and end point - python
我正在寻找如何在 os.walk 遍历特定文件后停止它。
我有一个按日期组织的日志文件目录。我正在尝试替换 grep 搜索,允许用户查找存储在他们指定的日期范围内的 ip 地址。
该程序将采用以下参数:
-i 带子网的 ipv4 或 ipv6 地址
-s 开始日期即 2013/12/20 匹配文件结构
-e 结束日期
我假设因为他们的自上而下选项是一个允许我声明端点的逻辑,执行此操作的最佳方法是什么?我在想 while 循环。
如果我的问题有问题,我提前道歉。刚测了血糖,低56,gd type 1。
附加信息
文件结构将位于 flows/index_border 中
2013
--01
--02
----01
----...
----29
2014
___________Hope这就清楚了,年文件夹包含月文件夹,包含日文件夹,包含小时文件。日期增加 downwards.___________________
结束日期需要包括在内,(我没有太关注它,因为我可以添加代码以向上移动一天)
我一直在尝试制作一个日期范围函数,令我惊讶的是我在任何日期时间文档中都没有看到它,看起来它很有用。
import os, gzip, netaddr, datetime, argparse
startDir = '.'
def sdate_format(s):
try:
return (datetime.datetime.strptime(s, '%Y/%m/%d').date())
except ValueError:
msg = "Bad start date. Please use yyyy/mm/dd format."
raise argparse.ArgumentTypeError(msg)
def edate_format(e):
try:
return (datetime.datetime.strptime(e, '%Y/%m/%d').date())
except ValueError:
msg = "Bad end date. Please use yyyy/mm/dd format."
raise argparse.ArgumentTypeError(msg)
parser = argparse.ArgumentParser(description='Locate IP address in log files for a particular date or date range')
parser.add_argument('-s', '--start_date', action='store', type=sdate_format, dest='start_date', help='The first date in range of interest.')
parser.add_argument('-e', '--end_date', action='store', type=edate_format, dest='end_date', help='The last date in range of interest.')
parser.add_argument('-i', action='store', dest='net', help='IP address or address range, IPv4 or IPv6 with optional subnet accepted.', required=True)
results = parser.parse_args()
start = results.start_date
end = results.end_date
target_ip = results.net
startDir = '/flows/index_border/{0}/{1:02d}/{2:02d}'.format(start.year, start.month, start.day)
print('searching...')
for root, dirs, files in os.walk(startDir):
for contents in files:
if contents.endswith('.gz'):
f = gzip.open(os.path.join(root, contents), 'r')
else:
f = open(os.path.join(root, contents), 'r')
text = f.readlines()
f.close()
for line in text:
for address_item in netaddr.IPNetwork(target_IP):
if str(address_item) in line:
print line,
您需要描述什么有效或无效。您的代码的 argparse
看起来不错,但我没有进行任何测试。 type
的使用令人耳目一新。 :)(张贴者经常滥用该参数。)
但至于 stopping
,我猜你可以这样做:
endDir = '/flows/index_border/{0}/{1:02d}/{2:02d}'.format(end.year, end.month, end.day)
for root, dirs, files in os.walk(startDir):
for contents in files:
....
if endDir in <something based on dirs and files>:
break
我对你的文件结构了解不够,无法更具体。我和 os.walk
一起工作也有一段时间了。无论如何,我认为有条件的 break
是提前停止步行的方法。
#!/usr/bin/env python
import os, gzip, netaddr, datetime, argparse, sys
searchDir = '.'
searchItems = []
def sdate_format(s):
try:
return (datetime.datetime.strptime(s, '%Y/%m/%d').date())
except ValueError:
msg = "Bad start date. Please use yyyy/mm/dd format."
raise argparse.ArgumentTypeError(msg)
def edate_format(e):
try:
return (datetime.datetime.strptime(e, '%Y/%m/%d').date())
except ValueError:
msg = "Bad end date. Please use yyyy/mm/dd format."
raise argparse.ArgumentTypeError(msg)
parser = argparse.ArgumentParser(description='Locate IP address in log files for a particular date or date range')
parser.add_argument('-s', '--start_date', action='store', type=sdate_format, dest='start_date',
help='The first date in range of interest.', required=True)
parser.add_argument('-e', '--end_date', action='store', type=edate_format, dest='end_date',
help='The last date in range of interest.', required=True)
parser.add_argument('-i', action='store', dest='net',
help='IP address or address range, IPv4 or IPv6 with optional subnet accepted.', required=True)
results = parser.parse_args()
start = results.start_date
end = results.end_date + datetime.timedelta(days=1)
target_IP = results.net
dateRange = end - start
for addressOfInterest in(netaddr.IPNetwork(target_IP)):
searchItems.append(str(addressOfInterest))
print('searching...')
for eachDay in range(dateRange.days):
period = start+datetime.timedelta(days=eachDay)
searchDir = '/flows/index_border/{0}/{1:02d}/{2:02d}'.format(period.year, period.month, period.day)
for contents in os.listdir(searchDir):
if contents.endswith('.gz'):
f = gzip.open(os.path.join(searchDir, contents), 'rb')
text = f.readlines()
f.close()
else:
f = open(os.path.join(searchDir, contents), 'r')
text = f.readlines()
f.close()
#for line in text:
# break
for addressOfInterest in searchItems:
for line in text:
if addressOfInterest in line:
# if str(address_item) in line:
print contents
print line,
我在敲我的头,因为我以为我在打印副本。原来我要测试的文件有重复。由于文件系统的可预测性,我最终删除了 os.walk,但@hpaulj 确实提供了正确的解决方案。非常感谢!
我正在寻找如何在 os.walk 遍历特定文件后停止它。
我有一个按日期组织的日志文件目录。我正在尝试替换 grep 搜索,允许用户查找存储在他们指定的日期范围内的 ip 地址。
该程序将采用以下参数:
-i 带子网的 ipv4 或 ipv6 地址
-s 开始日期即 2013/12/20 匹配文件结构
-e 结束日期
我假设因为他们的自上而下选项是一个允许我声明端点的逻辑,执行此操作的最佳方法是什么?我在想 while 循环。
如果我的问题有问题,我提前道歉。刚测了血糖,低56,gd type 1。
附加信息
文件结构将位于 flows/index_border 中
2013
--01
--02
----01
----...
----29
2014
___________Hope这就清楚了,年文件夹包含月文件夹,包含日文件夹,包含小时文件。日期增加 downwards.___________________
结束日期需要包括在内,(我没有太关注它,因为我可以添加代码以向上移动一天)
我一直在尝试制作一个日期范围函数,令我惊讶的是我在任何日期时间文档中都没有看到它,看起来它很有用。
import os, gzip, netaddr, datetime, argparse
startDir = '.'
def sdate_format(s):
try:
return (datetime.datetime.strptime(s, '%Y/%m/%d').date())
except ValueError:
msg = "Bad start date. Please use yyyy/mm/dd format."
raise argparse.ArgumentTypeError(msg)
def edate_format(e):
try:
return (datetime.datetime.strptime(e, '%Y/%m/%d').date())
except ValueError:
msg = "Bad end date. Please use yyyy/mm/dd format."
raise argparse.ArgumentTypeError(msg)
parser = argparse.ArgumentParser(description='Locate IP address in log files for a particular date or date range')
parser.add_argument('-s', '--start_date', action='store', type=sdate_format, dest='start_date', help='The first date in range of interest.')
parser.add_argument('-e', '--end_date', action='store', type=edate_format, dest='end_date', help='The last date in range of interest.')
parser.add_argument('-i', action='store', dest='net', help='IP address or address range, IPv4 or IPv6 with optional subnet accepted.', required=True)
results = parser.parse_args()
start = results.start_date
end = results.end_date
target_ip = results.net
startDir = '/flows/index_border/{0}/{1:02d}/{2:02d}'.format(start.year, start.month, start.day)
print('searching...')
for root, dirs, files in os.walk(startDir):
for contents in files:
if contents.endswith('.gz'):
f = gzip.open(os.path.join(root, contents), 'r')
else:
f = open(os.path.join(root, contents), 'r')
text = f.readlines()
f.close()
for line in text:
for address_item in netaddr.IPNetwork(target_IP):
if str(address_item) in line:
print line,
您需要描述什么有效或无效。您的代码的 argparse
看起来不错,但我没有进行任何测试。 type
的使用令人耳目一新。 :)(张贴者经常滥用该参数。)
但至于 stopping
,我猜你可以这样做:
endDir = '/flows/index_border/{0}/{1:02d}/{2:02d}'.format(end.year, end.month, end.day)
for root, dirs, files in os.walk(startDir):
for contents in files:
....
if endDir in <something based on dirs and files>:
break
我对你的文件结构了解不够,无法更具体。我和 os.walk
一起工作也有一段时间了。无论如何,我认为有条件的 break
是提前停止步行的方法。
#!/usr/bin/env python
import os, gzip, netaddr, datetime, argparse, sys
searchDir = '.'
searchItems = []
def sdate_format(s):
try:
return (datetime.datetime.strptime(s, '%Y/%m/%d').date())
except ValueError:
msg = "Bad start date. Please use yyyy/mm/dd format."
raise argparse.ArgumentTypeError(msg)
def edate_format(e):
try:
return (datetime.datetime.strptime(e, '%Y/%m/%d').date())
except ValueError:
msg = "Bad end date. Please use yyyy/mm/dd format."
raise argparse.ArgumentTypeError(msg)
parser = argparse.ArgumentParser(description='Locate IP address in log files for a particular date or date range')
parser.add_argument('-s', '--start_date', action='store', type=sdate_format, dest='start_date',
help='The first date in range of interest.', required=True)
parser.add_argument('-e', '--end_date', action='store', type=edate_format, dest='end_date',
help='The last date in range of interest.', required=True)
parser.add_argument('-i', action='store', dest='net',
help='IP address or address range, IPv4 or IPv6 with optional subnet accepted.', required=True)
results = parser.parse_args()
start = results.start_date
end = results.end_date + datetime.timedelta(days=1)
target_IP = results.net
dateRange = end - start
for addressOfInterest in(netaddr.IPNetwork(target_IP)):
searchItems.append(str(addressOfInterest))
print('searching...')
for eachDay in range(dateRange.days):
period = start+datetime.timedelta(days=eachDay)
searchDir = '/flows/index_border/{0}/{1:02d}/{2:02d}'.format(period.year, period.month, period.day)
for contents in os.listdir(searchDir):
if contents.endswith('.gz'):
f = gzip.open(os.path.join(searchDir, contents), 'rb')
text = f.readlines()
f.close()
else:
f = open(os.path.join(searchDir, contents), 'r')
text = f.readlines()
f.close()
#for line in text:
# break
for addressOfInterest in searchItems:
for line in text:
if addressOfInterest in line:
# if str(address_item) in line:
print contents
print line,
我在敲我的头,因为我以为我在打印副本。原来我要测试的文件有重复。由于文件系统的可预测性,我最终删除了 os.walk,但@hpaulj 确实提供了正确的解决方案。非常感谢!