Apache Airflow SparkSQLOperator keeps printing empty logs
I wrote a very simple Airflow DAG, as follows:
import airflow
from airflow import DAG
from airflow.contrib.operators.spark_sql_operator import SparkSqlOperator
from datetime import timedelta
from datetime import datetime as dt

default_args = {
    'owner': 'zxy',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'my_first_dag',
    default_args=default_args,
    #start_date=dt.strptime('2018-05-16', '%Y-%m-%d'),
    start_date=airflow.utils.dates.days_ago(2),
    description='My First Airflow DAG',
    schedule_interval=timedelta(minutes=5))

sql = r'''select count(u) from some_table where time=20180513 and platform='iOS' '''

t1 = SparkSqlOperator(task_id='Count_Ads_U', conn_id='spark_default', sql=sql, dag=dag)
Then I ran airflow scheduler to schedule the job.
The job succeeded and produced the correct count, but it kept printing empty logs like the following and therefore could not stop:
[2018-05-16 06:33:07,505] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,505] {spark_sql_hook.py:142} INFO - b'18/05/16 06:33:07 INFO spark.SparkContext: Successfully stopped SparkContext\n'
[2018-05-16 06:33:07,506] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,506] {spark_sql_hook.py:142} INFO - b'18/05/16 06:33:07 INFO util.ShutdownHookManager: Shutdown hook called\n'
[2018-05-16 06:33:07,506] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,506] {spark_sql_hook.py:142} INFO - b'18/05/16 06:33:07 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-fbb4089c-338b-4b0e-a394-975f45b307a8\n'
[2018-05-16 06:33:07,509] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,509] {spark_sql_hook.py:142} INFO - b'18/05/16 06:33:07 INFO util.ShutdownHookManager: Deleting directory /apps/data/spark/temp/spark-f6b6695f-24e4-4db0-ae2b-29b6836ab9c3\n'
[2018-05-16 06:33:07,902] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,902] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,902] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,902] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,902] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,904] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,904] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,904] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
The empty logs went on endlessly until I stopped the scheduler with Ctrl+C.
The Airflow version is v1.9.0.
Problem solved.

This was caused by a byte literal vs. string literal issue if you use Python 3.x (line 146 of https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py):

for line in iter(self._sp.stdout.readline, ''):
    self.log.info(line)

The sentinel used in iter is '', i.e. an empty string literal. But what stdout actually yields are byte literals rather than string literals (see this post: What does the 'b' character do in front of a string literal?), as the b prefix on every log line shows, so the for loop never ends.
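To see concretely why the loop spins forever, here is a minimal standalone sketch (plain subprocess, not the Airflow hook itself; echo is just a stand-in for the spark-sql process) reproducing the bug and the fix:

import subprocess

# On Python 3, a subprocess's stdout is a binary stream, so readline()
# returns bytes: b'some line\n' for each line and b'' at EOF.
sp = subprocess.Popen(['echo', 'hello'], stdout=subprocess.PIPE)

# Buggy form (what spark_sql_hook.py did): the str sentinel '' never
# equals the bytes b'' returned at EOF, so iter() never stops and the
# loop keeps yielding b'' forever:
#
#     for line in iter(sp.stdout.readline, ''):
#         print(line)

# Fixed form: compare against the bytes sentinel b'' instead.
for line in iter(sp.stdout.readline, b''):
    print(line)  # prints b'hello\n', then the loop exits at EOF

sp.wait()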
I fixed the problem by replacing '' with b''.
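With that one-character change, the loop in spark_sql_hook.py becomes:

for line in iter(self._sp.stdout.readline, b''):
    self.log.info(line)

and it now terminates as soon as readline() returns b'' at EOF.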