Logging all Presto queries
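How can I store all queries submitted to a Presto cluster in a file (an ORC file, for example) or in some other database? The goal is to keep a record of every query executed on the Presto workers.
I know that I need to override the queryCompleted method. I also tried to follow this and the other link mentioned there, but I could not build a proper jar with Maven. After I dropped the Maven-generated jar into Presto, my Presto stopped working.
I am new to both Presto and Maven. It would be great if someone could help me.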
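Here is my approach, which works on EMR 5.9 (Presto 0.184).
First, as you know, you can use an event listener.
In my case, I use https://github.com/wyukawa/presto-fluentd to collect query logs, because fluentd is convenient (easy to retry, easy to send to multiple data stores).
If you want to write a new event listener plugin, you can also use it as a reference, because it is very simple. (https://github.com/zz22394/presto-audit would work as well.)
Next, you have to install the event listener plugin.
If you use EMR, you can install presto-fluentd in a bootstrap action with this script: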
#!/bin/bash
# cf. https://github.com/mozilla/emr-bootstrap-presto/blob/master/files/bootstrap/presto-plugins.sh
set -exo pipefail

# re-exec with sudo into background
if [ "$(whoami)" != root ]; then
  sudo "$0" "$@" &
  exit 0
fi

# set variables
s3uri=
fluentd_endpoint=

# wait until presto is installed and running
until test -s /var/run/presto/presto-server.pid; do sleep 1; done

# make symbolic link
sudo mkdir -p /usr/lib/presto/etc 2>/dev/null
sudo ln -s /usr/lib/presto/etc /mnt/var/lib/presto/data

# download presto plugins
aws s3 sync "$s3uri/jar/" /usr/lib/presto/plugin/
aws s3 sync "$s3uri/properties" /usr/lib/presto/etc/

# make sure all plugins are owned by presto user
chown -R presto:presto /usr/lib/presto/plugin
chown -R presto:presto /usr/lib/presto/etc

# set the endpoint parameter in event-listener.properties
echo "event-listener.fluentd-host=$fluentd_endpoint" >> /usr/lib/presto/etc/event-listener.properties

# restart presto
stop presto-server
start presto-server
event-listener.properties:
event-listener.name=presto-fluentd
event-listener.fluentd-port=24224
event-listener.fluentd-tag=presto.query
Inside the S3 directory:
$ aws s3 ls s3://<s3 bucket>/emr/bootstrap_actions/plugins/jar/presto-fluentd/
2017-10-30 19:12:59 90318 fluency-1.3.0.jar
2017-10-30 19:12:59 2521113 guava-21.0.jar
2017-10-30 19:12:59 55783 jackson-annotations-2.8.1.jar
2017-10-30 19:12:59 252303 jackson-core-2.7.1.jar
2017-10-30 19:12:59 1199160 jackson-databind-2.7.1.jar
2017-10-30 19:12:59 30488 jackson-dataformat-msgpack-0.8.12.jar
2017-10-30 19:12:59 3907 log-0.148.jar
2017-10-30 19:12:59 116125 msgpack-core-0.8.12.jar
2017-10-30 19:12:59 5509 phi-accural-failure-detector-0.0.4.jar
2017-10-30 19:12:59 6130 presto-fluentd-0.0.1.jar
2017-10-30 19:12:59 41077 slf4j-api-1.7.22.jar
$ aws s3 ls s3://<s3 bucket>/emr/bootstrap_actions/plugins/properties/
2017-10-30 19:12:59 109 event-listener.properties
The query logs are then received by fluentd running on another host, like this:
<match presto.query>
  @type copy
  <store>
    # another data store
  </store>
  <store>
    @type relabel
    @label @presto-query-storage
  </store>
</match>

# In my case, I use bigquery for storing query log
<label @presto-query-storage>
  <match **>
    @label @presto-bigquery-out
    @type record_reformer
    renew_record true
    tag presto.query_storage.big_query
    <record>
      query_id ${record["queryId"]}
      user_name ${record["user"]}
      elapsed_time ${(record["endTime"] - record["createTime"]) / 1000.0}
      start_at ${Time.at(record["executionStartTime"]/1000).utc.strftime("%Y-%m-%d %H:%M:%S.%3N")}
      end_at ${Time.at(record["endTime"]/1000).utc.strftime("%Y-%m-%d %H:%M:%S")}
      query ${record["query"]}
      status ${record["state"]}
    </record>
  </match>
</label>
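To make the <record> expressions above concrete, here is a minimal Ruby sketch of the same arithmetic run outside fluentd. The field names come from the config; the sample values and the names BASE_MS, SAMPLE_RECORD, reform and format_ms are made up for illustration, and the sketch assumes the timestamps arrive as integer epoch milliseconds.

```ruby
# Sample record: field names follow the config above, values are invented.
# Assumption: all timestamps are integer epoch milliseconds.
BASE_MS = Time.utc(2017, 10, 30, 10, 12, 59).to_i * 1000

SAMPLE_RECORD = {
  "queryId"            => "sample_query_id",
  "user"               => "alice",
  "createTime"         => BASE_MS,
  "executionStartTime" => BASE_MS + 250,
  "endTime"            => BASE_MS + 2500,
  "query"              => "SELECT 1",
  "state"              => "FINISHED"
}.freeze

# Applies the same expressions as the <record> block of the config.
def reform(record)
  {
    "query_id"     => record["queryId"],
    "user_name"    => record["user"],
    "elapsed_time" => (record["endTime"] - record["createTime"]) / 1000.0,
    "start_at"     => Time.at(record["executionStartTime"] / 1000).utc.strftime("%Y-%m-%d %H:%M:%S.%3N"),
    "end_at"       => Time.at(record["endTime"] / 1000).utc.strftime("%Y-%m-%d %H:%M:%S"),
    "query"        => record["query"],
    "status"       => record["state"]
  }
end

# Hypothetical variant (not in the config) that keeps the millisecond part.
def format_ms(epoch_ms)
  Time.at(epoch_ms / 1000, epoch_ms % 1000, :millisecond).utc.strftime("%Y-%m-%d %H:%M:%S.%3N")
end

row = reform(SAMPLE_RECORD)
puts row["elapsed_time"]                              # seconds as a float
puts row["start_at"]                                  # millisecond part is lost
puts format_ms(SAMPLE_RECORD["executionStartTime"])   # millisecond part is kept
```

Under that integer assumption, `/1000` is an integer division, so the `%3N` in start_at always prints 000. If the millisecond part matters, an expression along the lines of format_ms would keep it.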
Tips
I use this script to collect the dependencies of presto-fluentd.
require 'fileutils'
require 'open3'
include FileUtils

TMP_PATH = File.expand_path('../../tmp', __FILE__)
JAR_PATH = File.expand_path('../bootstrap_actions/plugins/jar', __FILE__)
CLONE_URI = 'https://github.com/wyukawa/presto-fluentd'
NEEDED_JAR = %w(
  fluency-1.3.0.jar
  guava-21.0.jar
  jackson-annotations-2.8.1.jar
  jackson-core-2.7.1.jar
  jackson-databind-2.7.1.jar
  jackson-dataformat-msgpack-0.8.12.jar
  log-0.148.jar
  msgpack-core-0.8.12.jar
  phi-accural-failure-detector-0.0.4.jar
  presto-fluentd-0.0.1.jar
  slf4j-api-1.7.22.jar
)

# Remove the previous clone and any previously collected jars.
def cleanup_dir
  puts "Clean up #{TMP_PATH}/presto-fluentd ..."
  mkdir_p(TMP_PATH) # make sure the later cd(TMP_PATH) cannot fail
  rm_r(Dir.glob("#{TMP_PATH}/presto-fluentd"))
  mkdir_p("#{JAR_PATH}/presto-fluentd")
  puts "Clean up #{JAR_PATH}/presto-fluentd ..."
  rm(Dir.glob("#{JAR_PATH}/presto-fluentd/*.jar"))
end

# Clone the plugin repository into TMP_PATH.
def clone
  cd(TMP_PATH)
  puts "Download presto-fluentd repo ..."
  # Open3.capture2 returns [stdout, status]; there is no stderr element
  out, _status = Open3.capture2("git clone #{CLONE_URI} #{TMP_PATH}/presto-fluentd")
  puts out
end

# Build the plugin and copy its runtime dependencies into target/.
def mvn
  cd("#{TMP_PATH}/presto-fluentd")
  puts "Build presto-fluentd ..."
  out, _status = Open3.capture2("mvn clean package")
  puts out
  out, _status = Open3.capture2("mvn dependency:copy-dependencies -DoutputDirectory=target -DincludeScope=runtime")
  puts out
end

# Move only the jars listed in NEEDED_JAR to the upload directory.
def copy_dependencies
  cd("#{TMP_PATH}/presto-fluentd/target")
  puts "Copy jar files to #{JAR_PATH} ..."
  # FIXME: it's better to fix actual pom.xml for assign scope
  mv(Dir.glob("*.jar").select { |file| NEEDED_JAR.include?(file) }, "#{JAR_PATH}/presto-fluentd")
  puts "done !!"
end

cleanup_dir
clone
mvn
copy_dependencies
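One thing to watch with the script above: it moves only the jars whose names match NEEDED_JAR exactly, so if a dependency version changes upstream, that jar is left behind without any error. A small check like the following could be run after copy_dependencies. This is only a sketch; missing_jars is a hypothetical helper, not part of the original script.

```ruby
require 'tmpdir'
require 'fileutils'

# Hypothetical helper: returns the names in `needed` that have no
# matching file in `dir`.
def missing_jars(needed, dir)
  present = Dir.children(dir)
  needed.reject { |name| present.include?(name) }
end

# Usage with a throwaway directory that holds only one of the two jars.
Dir.mktmpdir do |dir|
  FileUtils.touch(File.join(dir, "guava-21.0.jar"))
  p missing_jars(%w(guava-21.0.jar slf4j-api-1.7.22.jar), dir)
end
```

In the real script the call would be something like missing_jars(NEEDED_JAR, "#{JAR_PATH}/presto-fluentd"), aborting if the result is not empty.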