Neo4j - 实时导入日志文件的写入速度太慢?
Neo4j - too slow writing speed for importing log files in real-time?
我正在尝试了解是否可以使用 Neo4j 分析网络流量日志文件。
因此,我使用 "sh"-library 从 Bro IDS 实时 "tailing" 3 个不同的日志文件,并将日志记录导入到 neo4j 中,这使用 py2neo 似乎非常慢。 CSV 导入在这里不起作用,因为它是实时的。
举个例子:我正在使用 tcpreplay 分析一个一小时的数据包捕获文件,该文件有近 4.000.000 个连接。我什至用一半的速度演奏。所以 2 小时后我现在有大约 4.000.000 log entries.Right,在分析开始后 3.5 小时,我刚刚导入了 289691 个由 5 个节点和 4 个关系组成的图表。总而言之,大约 15% 的数据用了几乎两倍的时间。
我正在使用 py2neo,代码如下所示(这是其中一张图):
def create_conn_graph(connlog):
[...]
## Start Session
graph = Graph(bolt=True, password="neo4j")
tx = graph.begin()
############
## Nodes ##
############
## Connection Node
conn = Node("Connection", uid=connlog['uid'],
ts=connlog['ts'],
date=evt_date,
time=evt_time,
[...])
conn_properties = dict(conn)
for key in conn_properties.keys():
if conn[key] == "-" or conn[key] == "(empty)":
conn[key] = "0"
conn.update()
tx.merge(conn, "Connection", "uid")
## IP Nodes
orig = Node("IP", ip=connlog['orig_h'])
tx.merge(orig)
resp = Node("IP", ip=connlog['resp_h'])
tx.merge(resp)
## History Node
if connlog['history']:
hist_flow = history_flow(connlog['history'])
history_node = Node("History", history=connlog['history'], flow=hist_flow)
tx.merge(history_node, "History", "history")
## (Connection)-[HAS_HISTORY]->(History)
conn_hist = Relationship(conn, "HAS_HISTORY", history_node)
tx.merge(conn_hist)
## Conn_State
conn_state = Node("Conn_State", state=connlog['conn_state'], meaning=CONN_STATE[connlog['conn_state']])
tx.merge(conn_state, "Conn_State", "conn_state")
tx.commit()
tx = graph.begin()
#####################
## Relationships ##
#####################
## (IP)-[STARTS_CONNECTION]->(Connection)
orig_conn = Relationship(orig, "STARTS_CONNECTION", conn, port=connlog['orig_p'])
tx.merge(orig_conn)
## (Connection)-[CONNECTS_TO]->(IP)
conn_resp = Relationship(conn, "CONNECTS_TO", resp, port=connlog['resp_p'])
tx.merge(conn_resp)
## (Connection)-[HAS_CONN_STATE]->(Conn_State)
conn_connstate = Relationship(conn, "HAS_CONN_STATE", conn_state)
tx.merge(conn_connstate)
tx.commit()
## (Connection)-[PRODUCED]-> (DNS|HTTP)
if connlog['service'] == "dns":
graph.run("MATCH (c:Connection {uid:{uid}}), (d:DNS {uid:{uid}}) \
MERGE (c)-[:PRODUCED]->(d)",
{"uid": connlog['uid']})
if connlog['service'] == "http":
graph.run("MATCH (c:Connection {uid:{uid}}), (d:HTTP {uid:{uid}}) \
MERGE (c)-[:PRODUCED]->(d)",
{"uid": connlog['uid']})
return True
## End of create_conn_graph ########################################
if __name__ == "__main__":
logentry = {}
logfield = CONNLOG
logline = []
for line in tail("-F", LOG_DIR, _iter=True, _bg=True):
entry = line.strip().split("\t")
if line.startswith('#'):
continue
for i in range(len(logfield)):
logentry[logfield[i]] = entry[i]
create_conn_graph(logentry)
我有以下约束和索引:
graph.run("CREATE CONSTRAINT ON (c:Connection) ASSERT c.uid IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (i:IP) ASSERT i.ip IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (c:Conn_State) ASSERT c.conn_state IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (h:History) ASSERT h.history IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (host:Host) ASSERT host.host is UNIQUE")
graph.run("CREATE CONSTRAINT ON (q:QueryType) ASSERT q.type is UNIQUE")
graph.run("CREATE CONSTRAINT ON (qc:QueryClass) ASSERT qc.class is UNIQUE")
graph.run("CREATE CONSTRAINT ON (rc:ResponseCode) ASSERT rc.code is UNIQUE")
graph.run("CREATE CONSTRAINT ON (ic:InfoCode) ASSERT ic.code is UNIQUE")
graph.run("CREATE CONSTRAINT ON (ua:UserAgent) ASSERT ua.useragent is UNIQUE")
graph.run("CREATE CONSTRAINT ON (m:Method) ASSERT m.method is UNIQUE")
graph.run("CREATE CONSTRAINT ON (r:Referrer) ASSERT r.referrer is UNIQUE")
graph.run("CREATE INDEX ON :DNS(uid)")
graph.run("CREATE INDEX ON :Uri(uri)")
graph.run("CREATE INDEX ON :HTTP(uid)")
也许有人可以给我提示我做错了什么或者我在代码中哪里出错了?
提交量是由于尝试写入 neo4j 时的瞬态错误造成的。随着交易数量的增加,我没有更多的错误。
在此先感谢您的帮助
我不确定 py2neo 在做什么,根据我的经验,python 驱动程序不是最快的。
我可能会使用简单的 Cypher 语句,您可以完全控制所发生的事情。
您还有一些 wrong/missing 索引,请检查您的所有查询/操作是否都使用索引。否则将导致全面扫描。
- (d:DNS {uid:{uid}})
- (d:HTTP {uid:{uid}})
我还建议您在每笔交易中发送更多数据(例如 10k 条记录)
对每个日志批处理进行一些预处理也可能有意义,例如每个日志段而不是每个日志行预先创建不同的 ip 节点。
我正在尝试了解是否可以使用 Neo4j 分析网络流量日志文件。 因此,我使用 "sh"-library 从 Bro IDS 实时 "tailing" 3 个不同的日志文件,并将日志记录导入到 neo4j 中,这使用 py2neo 似乎非常慢。 CSV 导入在这里不起作用,因为它是实时的。
举个例子:我正在使用 tcpreplay 分析一个一小时的数据包捕获文件,该文件有近 4.000.000 个连接。我什至用一半的速度演奏。所以 2 小时后我现在有大约 4.000.000 log entries.Right,在分析开始后 3.5 小时,我刚刚导入了 289691 个由 5 个节点和 4 个关系组成的图表。总而言之,大约 15% 的数据用了几乎两倍的时间。
我正在使用 py2neo,代码如下所示(这是其中一张图):
def create_conn_graph(connlog):
[...]
## Start Session
graph = Graph(bolt=True, password="neo4j")
tx = graph.begin()
############
## Nodes ##
############
## Connection Node
conn = Node("Connection", uid=connlog['uid'],
ts=connlog['ts'],
date=evt_date,
time=evt_time,
[...])
conn_properties = dict(conn)
for key in conn_properties.keys():
if conn[key] == "-" or conn[key] == "(empty)":
conn[key] = "0"
conn.update()
tx.merge(conn, "Connection", "uid")
## IP Nodes
orig = Node("IP", ip=connlog['orig_h'])
tx.merge(orig)
resp = Node("IP", ip=connlog['resp_h'])
tx.merge(resp)
## History Node
if connlog['history']:
hist_flow = history_flow(connlog['history'])
history_node = Node("History", history=connlog['history'], flow=hist_flow)
tx.merge(history_node, "History", "history")
## (Connection)-[HAS_HISTORY]->(History)
conn_hist = Relationship(conn, "HAS_HISTORY", history_node)
tx.merge(conn_hist)
## Conn_State
conn_state = Node("Conn_State", state=connlog['conn_state'], meaning=CONN_STATE[connlog['conn_state']])
tx.merge(conn_state, "Conn_State", "conn_state")
tx.commit()
tx = graph.begin()
#####################
## Relationships ##
#####################
## (IP)-[STARTS_CONNECTION]->(Connection)
orig_conn = Relationship(orig, "STARTS_CONNECTION", conn, port=connlog['orig_p'])
tx.merge(orig_conn)
## (Connection)-[CONNECTS_TO]->(IP)
conn_resp = Relationship(conn, "CONNECTS_TO", resp, port=connlog['resp_p'])
tx.merge(conn_resp)
## (Connection)-[HAS_CONN_STATE]->(Conn_State)
conn_connstate = Relationship(conn, "HAS_CONN_STATE", conn_state)
tx.merge(conn_connstate)
tx.commit()
## (Connection)-[PRODUCED]-> (DNS|HTTP)
if connlog['service'] == "dns":
graph.run("MATCH (c:Connection {uid:{uid}}), (d:DNS {uid:{uid}}) \
MERGE (c)-[:PRODUCED]->(d)",
{"uid": connlog['uid']})
if connlog['service'] == "http":
graph.run("MATCH (c:Connection {uid:{uid}}), (d:HTTP {uid:{uid}}) \
MERGE (c)-[:PRODUCED]->(d)",
{"uid": connlog['uid']})
return True
## End of create_conn_graph ########################################
if __name__ == "__main__":
logentry = {}
logfield = CONNLOG
logline = []
for line in tail("-F", LOG_DIR, _iter=True, _bg=True):
entry = line.strip().split("\t")
if line.startswith('#'):
continue
for i in range(len(logfield)):
logentry[logfield[i]] = entry[i]
create_conn_graph(logentry)
我有以下约束和索引:
graph.run("CREATE CONSTRAINT ON (c:Connection) ASSERT c.uid IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (i:IP) ASSERT i.ip IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (c:Conn_State) ASSERT c.conn_state IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (h:History) ASSERT h.history IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (host:Host) ASSERT host.host is UNIQUE")
graph.run("CREATE CONSTRAINT ON (q:QueryType) ASSERT q.type is UNIQUE")
graph.run("CREATE CONSTRAINT ON (qc:QueryClass) ASSERT qc.class is UNIQUE")
graph.run("CREATE CONSTRAINT ON (rc:ResponseCode) ASSERT rc.code is UNIQUE")
graph.run("CREATE CONSTRAINT ON (ic:InfoCode) ASSERT ic.code is UNIQUE")
graph.run("CREATE CONSTRAINT ON (ua:UserAgent) ASSERT ua.useragent is UNIQUE")
graph.run("CREATE CONSTRAINT ON (m:Method) ASSERT m.method is UNIQUE")
graph.run("CREATE CONSTRAINT ON (r:Referrer) ASSERT r.referrer is UNIQUE")
graph.run("CREATE INDEX ON :DNS(uid)")
graph.run("CREATE INDEX ON :Uri(uri)")
graph.run("CREATE INDEX ON :HTTP(uid)")
也许有人可以给我提示我做错了什么或者我在代码中哪里出错了? 提交量是由于尝试写入 neo4j 时的瞬态错误造成的。随着交易数量的增加,我没有更多的错误。
在此先感谢您的帮助
我不确定 py2neo 在做什么,根据我的经验,python 驱动程序不是最快的。
我可能会使用简单的 Cypher 语句,您可以完全控制所发生的事情。
您还有一些 wrong/missing 索引,请检查您的所有查询/操作是否都使用索引。否则将导致全面扫描。
- (d:DNS {uid:{uid}})
- (d:HTTP {uid:{uid}})
我还建议您在每笔交易中发送更多数据(例如 10k 条记录)
对每个日志批处理进行一些预处理也可能有意义,例如每个日志段而不是每个日志行预先创建不同的 ip 节点。