Comment exécuter Spark code dans Airflow?

Bonjour peuple de la Terre! J'utilise Airflow pour programmer et exécuter des tâches D'étincelles. Tout ce que j'ai trouvé à ce moment-là, c'est des DAGs en python que Airflow peut gérer.

DAG exemple:

spark_count_lines.py
import logging

from airflow import DAG
from airflow.operators import PythonOperator

from datetime import datetime

args = {
  'owner': 'airflow'
  , 'start_date': datetime(2016, 4, 17)
  , 'provide_context': True
}

dag = DAG(
  'spark_count_lines'
  , start_date = datetime(2016, 4, 17)
  , schedule_interval = '@hourly'
  , default_args = args
)

def run_spark(**kwargs):
  import pyspark
  sc = pyspark.SparkContext()
  df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
  logging.info('Number of lines in people.txt = {0}'.format(df.count()))
  sc.stop()

t_main = PythonOperator(
  task_id = 'call_spark'
  , dag = dag
  , python_callable = run_spark
)

le problème est que je ne suis pas bon en code Python et que j'ai des tâches écrites en Java. Ma question Est comment exécuter Spark Java jar en DAG python? Ou peut-être qu'il y a une autre façon de le faire? J'ai trouvé l'étincelle soumettre: http://spark.apache.org/docs/latest/submitting-applications.html

mais je ne sais pas comment tout relier. Peut-être que quelqu'un l'a déjà utilisé et a un exemple qui marche. Je vous remercie pour votre temps!

17
demandé sur user6910411 2016-10-03 11:56:09

3 réponses

vous devriez pouvoir utiliser BashOperator. Conserver le reste de votre code tel quel, importer les paquets de classe et de système requis:

from airflow.operators.bash_operator import BashOperator

import os
import sys

requis chemins:

os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

et ajouter de l'opérateur:

spark_task = BashOperator(
    task_id='spark_java',
    bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
    params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
    dag=dag
)

vous pouvez facilement étendre cela pour fournir des arguments supplémentaires en utilisant Jinja templates.

vous pouvez bien sûr ajuster cela pour le scénario sans étincelle en remplaçant bash_command avec un gabarit adapté à votre cas, pour exemple:

bash_command = 'java -jar {{ params.jar }}'

et en ajustant params.

12
répondu user6910411 2016-12-28 01:32:59

débit d'Air à partir de la version 1.8 (sorti aujourd'hui), a

code SparkSQLHook - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py

code SparkSubmitHook - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py

notez que ces deux nouveaux opérateurs/crochets D'étincelles sont dans la branche "contrib" à partir de la version 1.8 donc pas (bien) documentés.

vous pouvez donc utiliser SparkSubmitOperator pour soumettre votre code java pour l'Étincelle de l'exécution.

12
répondu Tagar 2017-03-20 17:06:01

Il y a un exemple de SparkSubmitOperator utilisation de Spark 2.3.1 kubernetes (minikube exemple):

Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta

default_args = {
    'owner': 'user@mail.com',
    'depends_on_past': False,
    'start_date': datetime(2018, 7, 27),
    'email': ['user@mail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    'end_date': datetime(2018, 7, 29),
}

dag = DAG(
    'tutorial_spark_operator', default_args=default_args, schedule_interval=timedelta(1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

print_path_env_task = BashOperator(
    task_id='print_path_env',
    bash_command='echo $PATH',
    dag=dag)

spark_submit_task = SparkSubmitOperator(
    task_id='spark_submit_job',
    conn_id='spark_default',
    java_class='com.ibm.cdopoc.DataLoaderDB2COS',
    application='local:///opt/spark/examples/jars/cppmpoc-dl-0.1.jar',
    total_executor_cores='1',
    executor_cores='1',
    executor_memory='2g',
    num_executors='2',
    name='airflowspark-DataLoaderDB2COS',
    verbose=True,
    driver_memory='1g',
    conf={
        'spark.DB_URL': 'jdbc:db2://dashdb-dal13.services.dal.bluemix.net:50001/BLUDB:sslConnection=true;',
        'spark.DB_USER': Variable.get("CEDP_DB2_WoC_User"),
        'spark.DB_PASSWORD': Variable.get("CEDP_DB2_WoC_Password"),
        'spark.DB_DRIVER': 'com.ibm.db2.jcc.DB2Driver',
        'spark.DB_TABLE': 'MKT_ATBTN.MERGE_STREAM_2000_REST_API',
        'spark.COS_API_KEY': Variable.get("COS_API_KEY"),
        'spark.COS_SERVICE_ID': Variable.get("COS_SERVICE_ID"),
        'spark.COS_ENDPOINT': 's3-api.us-geo.objectstorage.softlayer.net',
        'spark.COS_BUCKET': 'data-ingestion-poc',
        'spark.COS_OUTPUT_FILENAME': 'cedp-dummy-table-cos2',
        'spark.kubernetes.container.image': 'ctipka/spark:spark-docker',
        'spark.kubernetes.authenticate.driver.serviceAccountName': 'spark'
        },
    dag=dag,
)

t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)

le code utilisant les variables stockées dans les variables de flux D'Air: enter image description here

en outre, vous devez créer une nouvelle connexion spark ou éditer 'spark_default' existant avec dictionnaire supplémentaire {"queue":"root.default", "deploy-mode":"cluster", "spark-home":"", "spark-binary":"spark-submit", "namespace":"default"}: enter image description here

1
répondu CTiPKA 2018-07-27 12:24:14