Neo4j LOAD CSV 应用线程阻塞

Neo4j LOAD CSV Application Threads Blocked

我正在尝试使用 neo4j 中的 LOAD CSV 命令从 CSV 文件导入大约 500,000 行数据。

csv 中的数据是这样组织的:

Artists    |    Feature1    |    Feature2    |    Feature3

每一列都填满了音乐艺术家的名字,其中大多数人在一个列中出现不止一次。如果艺术家姓名出现在任何列中,我希望该艺术家有一个节点。如果该艺术家的名字在不止一列的一列 an/or 中出现不止一次,我希望该艺术家只有一个节点。

对于每位艺术家,我想跟踪他们的特色以及他们的特色。 csv 的每一行代表一首歌。对于艺术家制作的每首歌曲(csv 的每一行),我想添加从 Artist 列中的艺术家到 Features1/2/3 列中的艺术家的 FEATURES 关系。

这是我使用的代码:

CREATE CONSTRAINT ON (a:Artist) ASSERT a.artistName IS UNIQUE;

USING PERIODIC COMMIT 50
LOAD CSV WITH HEADERS from 'https://aws.bigfile.csv' as line
MERGE (artist:Artist {artistName: line.Artist})
MERGE (feature1:Artist {artistName: line.Feature1})
MERGE (feature2:Artist {artistName: line.Feature2})
MERGE (feature3:Artist {artistName: line.Feature3})

MERGE (artist)-[f1:FEATURES]->(feature1) 
ON CREATE SET f1.strength = 1
ON MATCH SET f1.strength = f1.strength + 1

MERGE (artist)-[f2:FEATURES]->(feature2) 
ON CREATE SET f2.strength = 1
ON MATCH SET f2.strength = f2.strength + 1

MERGE (artist)-[f3:FEATURES]->(feature3) 
ON CREATE SET f3.strength = 1
ON MATCH SET f3.strength = f3.strength + 1

期望的行为:首次出现某人与另一位艺术家合作会创建 FEATURES 关系,并且应将 FEATURES 关系的 strength 属性 设置为 1。对于随后的每一次出现,强度 属性 都会增加 1。因此,经常出现艺术家 B 的艺术家 A 应该具有 (a)-[:FEATURES {strength: AHighNumber]->(b)

这样的关系

在这种情况下,关系是方向性的,方向很重要(A 与 B 的关系不同于 B 与 A 的关系)。

应该有超过 10,000 个不同的艺术家和节点,但是大约 2,000 个节点我开始遇到系统超时问题。

我在日志中收到以下消息:

2017-12-30 10:54:04.268+0000 WARN [o.n.k.i.c.MonitorGc] GC Monitor: Application threads blocked for 467ms.

是否有任何其他信息可能有助于确定问题?知道如何重组我的代码以避免这个问题吗?非常感谢所有帮助。谢谢!

我找到了一个(中间)解决方案,它不能直接解决上述内存问题,但仍然有效。

之前,我试图从原始文件创建艺术家节点,合并它们,以便每个艺术家都有一个且只有一个节点。我怀疑节点的 creating/merging 变得昂贵,因为对于文件的每一行,密码基本上必须查看所有现有节点以查看是否为该行创建一个新节点。

相反,我编写了一些 Python 代码,为每一列获取唯一值(艺术家姓名),将它们组合成一个长列表,然后再次从该列表中获取唯一值以过滤掉重复的艺术家出现在不止一栏中。然后我把它写到一个只有一列艺术家的文件中。我使用这个短得多的文件来创建所有艺术家节点。代码:

data = pd.read_csv('artists-full.csv', encoding = 'latin1')
columns = ['Artist', 'Feature1', 'Feature2', 'Feature3']
df = pd.DataFrame(data, columns=columns)

# Get an array of strings for each column. Clean out repeats
artists_set1 = df.loc[:, 'Artist'].unique().tolist()
artists_set2 = df.loc[:, 'Feature1'].unique().tolist()
artists_set3 = df.loc[:, 'Feature2'].unique().tolist()
artists_set4 = df.loc[:, 'Feature3'].unique().tolist()

# Combine the artists from different columns
artists_list = [artists_set1, artists_set2, artists_set3, artists_set4]
flat_artists_list = [item for sublist in artists_list for item in sublist]
# Get rid of duplicates by creating a set
artists_set = set(flat_artists_list)
# Turn the set back into a list
artists = list(artists_set)
print(len(artists))

# Write the list to csv
with open ('artists.csv', 'w', encoding='utf-8') as f:
    for artist in artists:
        f.write(str(artist) + '\n')

从那里,我使用原始文件和以下 Cypher 代码创建了艺术家之间的关系:

CREATE CONSTRAINT ON (a:Artist) ASSERT a.artistName IS UNIQUE;
MERGE (artist)-[f1:FEATURES]->(feature1) 
ON CREATE SET f1.strength = 1
ON MATCH SET f1.strength = f1.strength + 1

MERGE (artist)-[f2:FEATURES]->(feature2) 
ON CREATE SET f2.strength = 1
ON MATCH SET f2.strength = f2.strength + 1

MERGE (artist)-[f3:FEATURES]->(feature3) 
ON CREATE SET f3.strength = 1
ON MATCH SET f3.strength = f3.strength + 1

如果您将查询粘贴(不要 运行,直接粘贴)到浏览器中,您会在旁边收到一条警告,扩展后会显示:

The execution plan for this query contains the Eager operator, which forces all dependent data to be materialized in main memory before proceeding. Using LOAD CSV with a large data set in a query where the execution plan contains the Eager operator could potentially consume a lot of memory and is likely to not perform well. See the Neo4j Manual entry on the Eager operator for more information and hints on how problems could be avoided.

这就解释了为什么您会看到性能问题...它有效地取消了对定期提交的任何使用。

当您在同一节点标签上有多个 MERGE 时,您可以在查询计划中获得急切操作。解决此类问题的一般方法是通过合并 CSV 中单个变量的节点:

USING PERIODIC COMMIT 50
LOAD CSV WITH HEADERS from 'https://aws.bigfile.csv' as line
MERGE (artist:Artist {artistName: line.Artist})

如果您的 CSV 中的 line.Artist 包含所有可能的艺术家,那么单次传递应该有效。但是,如果 line.Feature1 中有其他艺术家(以及其他艺术家)不在 line.Artist 中,那么您将需要依次为每个艺术家执行另一遍,直到加载所有节点(您也可以将 PERIODIC COMMIT 推到 5000 到 10000 左右)。

请注意,您对多个节点的相同关系类型的多个 MERGE 也有助于 EAGER 操作。您真的应该看看是否可以获取不同格式的 CSV。你真的不需要所有那些额外的列,line.Artistline.Feature 应该足够了,然后你会有多行具有相同的艺术家和不同的特征,你不会被抓住进行急切的操作。

为避免急切操作并允许使用定期提交,您的替代方法是分 3 次执行查询(在导入所有节点之后):

USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS from 'https://aws.bigfile.csv' as line
match (artist:Artist {artistName: line.Artist})
match (feature:Artist {artistName: line.Feature1})

MERGE (artist)-[f:FEATURES]->(feature1) 
ON CREATE SET f.strength = 1
ON MATCH SET f.strength = f.strength + 1

然后重复 line.Feature2line.Feature3