从 10-k 中提取产品描述
Extract the product description from 10-k
为了完成硕士论文(我是编程新手,金融背景),我想从 10-K 报告中提取产品描述。产品描述位于报告的 "ITEM 1" 和 "ITEM 2" 之间。到目前为止,我已经把所有 10-K 下载为 .txt 文件,删除了 HTML 标签,并将所有文本转换为大写。我现在的问题出在选取所需文本并将其保存到另一个目录这一步。我尝试过自己编写选取逻辑,但结果并不令人满意。目前,我使用的是 GitHub 用户 "iammrhelo" 编写的代码。他的代码原本用于选取 "ITEM 7" 到 "ITEM 8" 之间的内容,我做了一些调整,让它搜索我需要的部分。他的代码链接:https://github.com/iammrhelo/edgar-10k-mda
我现在的问题是,他的解析并不适用于所有 10-K。它能够选取下面第一份 10-K 中的产品描述,但无法处理第二份:
picture: 10k that the code is able to parse
picture: 10k that the code is NOT able to parse
为了提供一点上下文,我需要找到代码必须寻找的正确语法。正在查找的语法在列表 item1_begins 中。
我用于 select 文本的代码如下:
import argparse
import codecs
import os
import time
import re
from pathos.pools import ProcessPool
from pathos.helpers import cpu_count
class MDAParser(object):
    """Extract the "ITEM 1" section from plain-text 10-K filings.

    The class name and the "MDA" wording in messages are kept from the
    script this was adapted from; the markers searched for here are the
    "ITEM 1" / "ITEM 1A" / "ITEM 2" headings.
    """

    def __init__(self):
        pass

    def extract(self, txt_dir, mda_dir, parsing_log):
        """Parse every ``.txt`` file in ``txt_dir`` and save the extracted section.

        Args:
            txt_dir: Directory containing the input ``.txt`` files. It is
                created if it does not exist.
            mda_dir: Directory the extracted sections are written to, one
                file per input file with the same base name. It is created
                if it does not exist.
            parsing_log: Path of the file that receives one
                ``<file name>,<status>`` line per input file.
        """
        self.txt_dir = txt_dir
        if not os.path.exists(txt_dir):
            os.makedirs(txt_dir)

        self.mda_dir = mda_dir
        if not os.path.exists(mda_dir):
            os.makedirs(mda_dir)

        def text_gen(txt_dir):
            # Yield the name of every .txt file in txt_dir.
            for fname in os.listdir(txt_dir):
                if not fname.endswith('.txt'):
                    continue
                yield fname

        def parsing_job(fname):
            print("Parsing: {}".format(fname))

            # Read text
            filepath = os.path.join(self.txt_dir, fname)
            with codecs.open(filepath, 'rb', encoding='utf-8') as fin:
                text = fin.read()

            name, ext = os.path.splitext(fname)

            # Parse the section
            mda, end = self.parse_mda(text)

            # A very short first hit is assumed to be the table-of-contents
            # entry, so parse again starting from where that hit ended.
            if mda and len(mda.encode('utf-8')) < 1000:
                mda, _ = self.parse_mda(text, start=end)

            if mda:  # Has value
                msg = "SUCCESS"
                mda_path = os.path.join(self.mda_dir, name + '.txt')
                with codecs.open(mda_path, 'w', encoding='utf-8') as fout:
                    fout.write(mda)
            else:
                msg = "MDA NOT FOUND"

            return name + '.txt', msg

        # Size the pool to the CPU count, capped at 8.
        ncpus = cpu_count() if cpu_count() <= 8 else 8
        pool = ProcessPool(ncpus)

        _start = time.time()
        parsing_failed = pool.map(parsing_job, text_gen(self.txt_dir))
        _end = time.time()

        print("MDA parsing time taken: {} seconds.".format(_end - _start))

        # Write the result of every file and count the ones that failed
        count = 0
        with open(parsing_log, 'w') as fout:
            print("Writing parsing results to {}".format(parsing_log))
            for name, msg in parsing_failed:
                fout.write('{},{}\n'.format(name, msg))
                if msg != "SUCCESS":
                    count = count + 1

        print("Number of failed text:{}".format(count))

    def parse_mda(self, text, start=0):
        """Return the text between the "ITEM 1" heading and the next section.

        The begin markers are tried in list order and the first one found
        anywhere in the text wins. The section ends at the first
        "ITEM 1A" heading after the begin position, or at "ITEM 2" when
        no "ITEM 1A" heading follows.

        Args:
            text: Full text of the filing.
            start: Offset at which to start searching. Everything before
                it is ignored.

        Returns:
            A ``(mda, end)`` tuple. ``mda`` is the stripped section text,
            or ``""`` when no section was found. ``end`` is the offset of
            the end marker measured from ``start`` (i.e. within
            ``text[start:]``), or ``0`` when no section was found.
        """
        debug = False

        # Return values
        mda = ""
        end = 0

        # Start & end signals for parsing. Every begin marker must be its
        # own list element: two adjacent string literals without a comma
        # are joined into a single string and neither would ever match.
        item1_begins = ['\nITEM 1.', 'ITEM 1.', '\nITEM 1 –', '\nITEM 1:',
                        '\nITEM 1 ', '\nITEM 1.\n']
        item1_ends = ['\nITEM 1A']
        if start != 0:
            item1_ends.append('\nITEM 1')  # Case: ITEM 1A does not exist
        item2_begins = ['\nITEM 2']

        # Parsing code section
        text = text[start:]

        # Get begin
        for item1 in item1_begins:
            begin = text.find(item1)
            if debug:
                print(item1, begin)
            if begin != -1:
                break

        if begin != -1:  # Begin found
            for item1A in item1_ends:
                end = text.find(item1A, begin + 1)
                if debug:
                    print(item1A, end)
                if end != -1:
                    break

            if end == -1:  # ITEM 1A does not exist
                for item2 in item2_begins:
                    end = text.find(item2, begin + 1)
                    if debug:
                        print(item2, end)
                    if end != -1:
                        break

            # Get the section text
            if end > begin:
                mda = text[begin:end].strip()
            else:
                end = 0

        return mda, end
if __name__ == "__main__":
    # Command-line entry point: read the three path options, then run the
    # extraction over every text file in the input directory (the index
    # file is not consulted).
    arg_parser = argparse.ArgumentParser("Parse MDA section of Edgar Form 10k")
    option_defaults = (
        ('--txt_dir', 'C:/Users/Adrian PC/Desktop/Thesis stuff/10k abbot/python/10ktxt/'),
        ('--mda_dir', './data/mda'),
        ('--log_file', './parsing.log'),
    )
    for flag, default_value in option_defaults:
        arg_parser.add_argument(flag, type=str, default=default_value)
    args = arg_parser.parse_args()

    mda_parser = MDAParser()
    mda_parser.extract(
        txt_dir=args.txt_dir,
        mda_dir=args.mda_dir,
        parsing_log=args.log_file,
    )
如果我的理解正确,您需要获取 ITEM 之间的数据并将其放入列表中。
你可以使用正则表达式:https://docs.python.org/3.4/library/re.html。它在解析文本方面非常强大;我注意到那个脚本导入了 re 模块,但并没有使用它。
如果你想在项目之间创建一个数据列表,你可以这样做:
import re
# Pattern: the literal 'ITEM', then any two characters (for example a space
# and a single digit), then a literal '.'. Written as a raw string so that
# '\.' reaches the regex engine unchanged instead of being an invalid
# string escape sequence.
item_symbol = re.compile(r'ITEM..\.')
# NOTE(review): in the question's script `parsing_log` is the log-file path;
# the value split here should presumably be the filing text - confirm.
item_data = item_symbol.split(parsing_log)  # splits the string into list items on the regular expression
示例:example
为了完成硕士论文(我是编程新手,金融背景),我想从 10-K 报告中提取产品描述。产品描述位于报告的 "ITEM 1" 和 "ITEM 2" 之间。到目前为止,我已经把所有 10-K 下载为 .txt 文件,删除了 HTML 标签,并将所有文本转换为大写。我现在的问题出在选取所需文本并将其保存到另一个目录这一步。我尝试过自己编写选取逻辑,但结果并不令人满意。目前,我使用的是 GitHub 用户 "iammrhelo" 编写的代码。他的代码原本用于选取 "ITEM 7" 到 "ITEM 8" 之间的内容,我做了一些调整,让它搜索我需要的部分。他的代码链接:https://github.com/iammrhelo/edgar-10k-mda
我现在的问题是,他的解析并不适用于所有 10-K。它能够选取下面第一份 10-K 中的产品描述,但无法处理第二份:
picture: 10k that the code is able to parse
picture: 10k that the code is NOT able to parse
为了提供一点上下文,我需要找到代码必须寻找的正确语法。正在查找的语法在列表 item1_begins 中。 我用于 select 文本的代码如下:
import argparse
import codecs
import os
import time
import re
from pathos.pools import ProcessPool
from pathos.helpers import cpu_count
class MDAParser(object):
    """Extract the "ITEM 1" section from plain-text 10-K filings.

    The class name and the "MDA" wording in messages are kept from the
    script this was adapted from; the markers searched for here are the
    "ITEM 1" / "ITEM 1A" / "ITEM 2" headings.
    """

    def __init__(self):
        pass

    def extract(self, txt_dir, mda_dir, parsing_log):
        """Parse every ``.txt`` file in ``txt_dir`` and save the extracted section.

        Args:
            txt_dir: Directory containing the input ``.txt`` files. It is
                created if it does not exist.
            mda_dir: Directory the extracted sections are written to, one
                file per input file with the same base name. It is created
                if it does not exist.
            parsing_log: Path of the file that receives one
                ``<file name>,<status>`` line per input file.
        """
        self.txt_dir = txt_dir
        if not os.path.exists(txt_dir):
            os.makedirs(txt_dir)

        self.mda_dir = mda_dir
        if not os.path.exists(mda_dir):
            os.makedirs(mda_dir)

        def text_gen(txt_dir):
            # Yield the name of every .txt file in txt_dir.
            for fname in os.listdir(txt_dir):
                if not fname.endswith('.txt'):
                    continue
                yield fname

        def parsing_job(fname):
            print("Parsing: {}".format(fname))

            # Read text
            filepath = os.path.join(self.txt_dir, fname)
            with codecs.open(filepath, 'rb', encoding='utf-8') as fin:
                text = fin.read()

            name, ext = os.path.splitext(fname)

            # Parse the section
            mda, end = self.parse_mda(text)

            # A very short first hit is assumed to be the table-of-contents
            # entry, so parse again starting from where that hit ended.
            if mda and len(mda.encode('utf-8')) < 1000:
                mda, _ = self.parse_mda(text, start=end)

            if mda:  # Has value
                msg = "SUCCESS"
                mda_path = os.path.join(self.mda_dir, name + '.txt')
                with codecs.open(mda_path, 'w', encoding='utf-8') as fout:
                    fout.write(mda)
            else:
                msg = "MDA NOT FOUND"

            return name + '.txt', msg

        # Size the pool to the CPU count, capped at 8.
        ncpus = cpu_count() if cpu_count() <= 8 else 8
        pool = ProcessPool(ncpus)

        _start = time.time()
        parsing_failed = pool.map(parsing_job, text_gen(self.txt_dir))
        _end = time.time()

        print("MDA parsing time taken: {} seconds.".format(_end - _start))

        # Write the result of every file and count the ones that failed
        count = 0
        with open(parsing_log, 'w') as fout:
            print("Writing parsing results to {}".format(parsing_log))
            for name, msg in parsing_failed:
                fout.write('{},{}\n'.format(name, msg))
                if msg != "SUCCESS":
                    count = count + 1

        print("Number of failed text:{}".format(count))

    def parse_mda(self, text, start=0):
        """Return the text between the "ITEM 1" heading and the next section.

        The begin markers are tried in list order and the first one found
        anywhere in the text wins. The section ends at the first
        "ITEM 1A" heading after the begin position, or at "ITEM 2" when
        no "ITEM 1A" heading follows.

        Args:
            text: Full text of the filing.
            start: Offset at which to start searching. Everything before
                it is ignored.

        Returns:
            A ``(mda, end)`` tuple. ``mda`` is the stripped section text,
            or ``""`` when no section was found. ``end`` is the offset of
            the end marker measured from ``start`` (i.e. within
            ``text[start:]``), or ``0`` when no section was found.
        """
        debug = False

        # Return values
        mda = ""
        end = 0

        # Start & end signals for parsing. Every begin marker must be its
        # own list element: two adjacent string literals without a comma
        # are joined into a single string and neither would ever match.
        item1_begins = ['\nITEM 1.', 'ITEM 1.', '\nITEM 1 –', '\nITEM 1:',
                        '\nITEM 1 ', '\nITEM 1.\n']
        item1_ends = ['\nITEM 1A']
        if start != 0:
            item1_ends.append('\nITEM 1')  # Case: ITEM 1A does not exist
        item2_begins = ['\nITEM 2']

        # Parsing code section
        text = text[start:]

        # Get begin
        for item1 in item1_begins:
            begin = text.find(item1)
            if debug:
                print(item1, begin)
            if begin != -1:
                break

        if begin != -1:  # Begin found
            for item1A in item1_ends:
                end = text.find(item1A, begin + 1)
                if debug:
                    print(item1A, end)
                if end != -1:
                    break

            if end == -1:  # ITEM 1A does not exist
                for item2 in item2_begins:
                    end = text.find(item2, begin + 1)
                    if debug:
                        print(item2, end)
                    if end != -1:
                        break

            # Get the section text
            if end > begin:
                mda = text[begin:end].strip()
            else:
                end = 0

        return mda, end
if __name__ == "__main__":
    # Command-line entry point: read the three path options, then run the
    # extraction over every text file in the input directory (the index
    # file is not consulted).
    arg_parser = argparse.ArgumentParser("Parse MDA section of Edgar Form 10k")
    option_defaults = (
        ('--txt_dir', 'C:/Users/Adrian PC/Desktop/Thesis stuff/10k abbot/python/10ktxt/'),
        ('--mda_dir', './data/mda'),
        ('--log_file', './parsing.log'),
    )
    for flag, default_value in option_defaults:
        arg_parser.add_argument(flag, type=str, default=default_value)
    args = arg_parser.parse_args()

    mda_parser = MDAParser()
    mda_parser.extract(
        txt_dir=args.txt_dir,
        mda_dir=args.mda_dir,
        parsing_log=args.log_file,
    )
如果我的理解正确,您需要获取 ITEM 之间的数据并将其放入列表中。
你可以使用正则表达式:https://docs.python.org/3.4/library/re.html。它在解析文本方面非常强大;我注意到那个脚本导入了 re 模块,但并没有使用它。
如果你想在项目之间创建一个数据列表,你可以这样做:
import re
# Pattern: the literal 'ITEM', then any two characters (for example a space
# and a single digit), then a literal '.'. Written as a raw string so that
# '\.' reaches the regex engine unchanged instead of being an invalid
# string escape sequence.
item_symbol = re.compile(r'ITEM..\.')
# NOTE(review): in the question's script `parsing_log` is the log-file path;
# the value split here should presumably be the filing text - confirm.
item_data = item_symbol.split(parsing_log)  # splits the string into list items on the regular expression
示例:example