Comment exécuter Spark code dans Airflow?
Bonjour peuple de la Terre!
J'utilise Airflow pour programmer et exécuter des tâches D'étincelles.
Tout ce que j'ai trouvé à ce moment-là, c'est des DAGs en python que Airflow peut gérer.
DAG exemple:
spark_count_lines.py
import logging
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
args = {
'owner': 'airflow'
, 'start_date': datetime(2016, 4, 17)
, 'provide_context': True
}
dag = DAG(
'spark_count_lines'
, start_date = datetime(2016, 4, 17)
, schedule_interval = '@hourly'
, default_args = args
)
def run_spark(**kwargs):
import pyspark
sc = pyspark.SparkContext()
df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
logging.info('Number of lines in people.txt = {0}'.format(df.count()))
sc.stop()
t_main = PythonOperator(
task_id = 'call_spark'
, dag = dag
, python_callable = run_spark
)
le problème est que je ne suis pas bon en code Python et que j'ai des tâches écrites en Java. Ma question Est comment exécuter Spark Java jar en DAG python? Ou peut-être qu'il y a une autre façon de le faire? J'ai trouvé l'étincelle soumettre: http://spark.apache.org/docs/latest/submitting-applications.html
mais je ne sais pas comment tout relier. Peut-être que quelqu'un l'a déjà utilisé et a un exemple qui marche. Je vous remercie pour votre temps!
3 réponses
vous devriez pouvoir utiliser BashOperator
. Conserver le reste de votre code tel quel, importer les paquets de classe et de système requis:
from airflow.operators.bash_operator import BashOperator
import os
import sys
requis chemins:
os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
et ajouter de l'opérateur:
spark_task = BashOperator(
task_id='spark_java',
bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
dag=dag
)
vous pouvez facilement étendre cela pour fournir des arguments supplémentaires en utilisant Jinja templates.
vous pouvez bien sûr ajuster cela pour le scénario sans étincelle en remplaçant bash_command
avec un gabarit adapté à votre cas, pour exemple:
bash_command = 'java -jar {{ params.jar }}'
et en ajustant params
.
débit d'Air à partir de la version 1.8 (sorti aujourd'hui), a
- Operator Sparksql - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_sql_operator.py ;
code SparkSQLHook - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py
- SparkSubmitOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_submit_operator.py
code SparkSubmitHook - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py
notez que ces deux nouveaux opérateurs/crochets D'étincelles sont dans la branche "contrib" à partir de la version 1.8 donc pas (bien) documentés.
vous pouvez donc utiliser SparkSubmitOperator pour soumettre votre code java pour l'Étincelle de l'exécution.
Il y a un exemple de SparkSubmitOperator
utilisation de Spark 2.3.1 kubernetes (minikube exemple):
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta
default_args = {
'owner': 'user@mail.com',
'depends_on_past': False,
'start_date': datetime(2018, 7, 27),
'email': ['user@mail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
'end_date': datetime(2018, 7, 29),
}
dag = DAG(
'tutorial_spark_operator', default_args=default_args, schedule_interval=timedelta(1))
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
print_path_env_task = BashOperator(
task_id='print_path_env',
bash_command='echo $PATH',
dag=dag)
spark_submit_task = SparkSubmitOperator(
task_id='spark_submit_job',
conn_id='spark_default',
java_class='com.ibm.cdopoc.DataLoaderDB2COS',
application='local:///opt/spark/examples/jars/cppmpoc-dl-0.1.jar',
total_executor_cores='1',
executor_cores='1',
executor_memory='2g',
num_executors='2',
name='airflowspark-DataLoaderDB2COS',
verbose=True,
driver_memory='1g',
conf={
'spark.DB_URL': 'jdbc:db2://dashdb-dal13.services.dal.bluemix.net:50001/BLUDB:sslConnection=true;',
'spark.DB_USER': Variable.get("CEDP_DB2_WoC_User"),
'spark.DB_PASSWORD': Variable.get("CEDP_DB2_WoC_Password"),
'spark.DB_DRIVER': 'com.ibm.db2.jcc.DB2Driver',
'spark.DB_TABLE': 'MKT_ATBTN.MERGE_STREAM_2000_REST_API',
'spark.COS_API_KEY': Variable.get("COS_API_KEY"),
'spark.COS_SERVICE_ID': Variable.get("COS_SERVICE_ID"),
'spark.COS_ENDPOINT': 's3-api.us-geo.objectstorage.softlayer.net',
'spark.COS_BUCKET': 'data-ingestion-poc',
'spark.COS_OUTPUT_FILENAME': 'cedp-dummy-table-cos2',
'spark.kubernetes.container.image': 'ctipka/spark:spark-docker',
'spark.kubernetes.authenticate.driver.serviceAccountName': 'spark'
},
dag=dag,
)
t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)
le code utilisant les variables stockées dans les variables de flux D'Air:
en outre, vous devez créer une nouvelle connexion spark ou éditer 'spark_default' existant avec
dictionnaire supplémentaire {"queue":"root.default", "deploy-mode":"cluster", "spark-home":"", "spark-binary":"spark-submit", "namespace":"default"}
: