Comment puis-je tester les programmes PySpark à l'unité?

ma méthode actuelle de test de L'Unité Java/Spark fonctionne (détaillé ici) en instanciantun SparkContext à l'aide de "local" et en exécutant des tests unitaires à L'aide de JUnit.

le code doit être organisé pour effectuer des entrées/sorties dans une fonction puis appeler une autre avec plusieurs RDDs.

Cela fonctionne très bien. J'ai une transformation de données très testée écrite en Java + Spark.

puis-je faire la même chose avec Python?

Comment pourrais-je faire des tests avec Python?

23
demandé sur Community 2015-11-19 21:40:33

5 réponses

je recommande l'utilisation de py.test ainsi. py.test permet de créer facilement des fixtures de test SparkContext réutilisables et de les utiliser pour écrire des fonctions de test concises. Vous pouvez également vous spécialiser dans les fixtures (pour créer un StreamingContext par exemple) et en utiliser une ou plusieurs dans vos tests.

j'ai écrit un billet de blog sur Medium à ce sujet:

https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b

Ici est un extrait de l' post:

pytestmark = pytest.mark.usefixtures("spark_context")
def test_do_word_counts(spark_context):
    """ test word couting
    Args:
       spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello':2, 'spark':3, 'again':1}  
    assert results == expected_results
21
répondu Vikas Kawadia 2016-05-01 17:57:53

j'utilise pytest, qui autorise les fixtures de test pour que vous puissiez instancier un contexte pyspark et l'injecter dans tous vos tests qui l'exigent. Quelque chose le long des lignes de

@pytest.fixture(scope="session",
                params=[pytest.mark.spark_local('local'),
                        pytest.mark.spark_yarn('yarn')])
def spark_context(request):
    if request.param == 'local':
        conf = (SparkConf()
                .setMaster("local[2]")
                .setAppName("pytest-pyspark-local-testing")
                )
    elif request.param == 'yarn':
        conf = (SparkConf()
                .setMaster("yarn-client")
                .setAppName("pytest-pyspark-yarn-testing")
                .set("spark.executor.memory", "1g")
                .set("spark.executor.instances", 2)
                )
    request.addfinalizer(lambda: sc.stop())

    sc = SparkContext(conf=conf)
    return sc

def my_test_that_requires_sc(spark_context):
    assert spark_context.textFile('/path/to/a/file').count() == 10

alors vous pouvez exécuter les tests en mode local en appelant py.test -m spark_local ou en LAINE avec py.test -m spark_yarn. Cela a très bien fonctionné pour moi.

8
répondu santon 2015-11-19 23:44:11

Voici une solution avec pytest si vous utilisez Spark 2.x et SparkSession. J'importe aussi un paquet de tierce partie.

import logging

import pytest
from pyspark.sql import SparkSession

def quiet_py4j():
    """Suppress spark logging for the test context."""
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)


@pytest.fixture(scope="session")
def spark_session(request):
    """Fixture for creating a spark context."""

    spark = (SparkSession
             .builder
             .master('local[2]')
             .config('spark.jars.packages', 'com.databricks:spark-avro_2.11:3.0.1')
             .appName('pytest-pyspark-local-testing')
             .enableHiveSupport()
             .getOrCreate())
    request.addfinalizer(lambda: spark.stop())

    quiet_py4j()
    return spark


def test_my_app(spark_session):
   ...

Note si J'utilise Python 3, je dois spécifier que comme une variable d'environnement PYSPARK_PYTHON:

import os
import sys

IS_PY2 = sys.version_info < (3,)

if not IS_PY2:
    os.environ['PYSPARK_PYTHON'] = 'python3'

sinon vous obtenez l'erreur:

Exception: Python dans worker a une version 2.7 différente de celle dans pilote 3.5, PySpark ne peut pas fonctionner avec différentes versions mineures.Merci vérifier les variables d'environnement PYSPARK_PYTHON et PYSPARK_DRIVER_PYTHON sont correctement définies.

8
répondu ksindi 2017-01-12 17:08:49

en Supposant que vous avez pyspark installé, vous pouvez utiliser la classe ci-dessous pour unitTest unittest:

import unittest
import pyspark


class PySparkTestCase(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        conf = pyspark.SparkConf().setMaster("local[2]").setAppName("testing")
        cls.sc = pyspark.SparkContext(conf=conf)

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()

Exemple:

class SimpleTestCase(PySparkTestCase):

    def test_basic(self):
        test_input = [
            ' hello spark ',
            ' hello again spark spark'
        ]

        input_rdd = self.sc.parallelize(test_input, 1)

        from operator import add

        results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect()
        self.assertEqual(results, [('hello', 2), ('spark', 3), ('again', 1)])
4
répondu Jorge Leitão 2017-11-10 12:58:01

il y a quelque temps, j'ai également fait face au même problème et après avoir lu plusieurs articles, forums et quelques réponses StackOverflow, j'ai fini par écrire un petit plugin pour pytest: pytest-spark

Je l'utilise déjà depuis quelques mois et le workflow général semble bon sous Linux:

  1. Installer Apache Spark (configuration de la JVM + décompresser Étincelle de distribution dans un répertoire)
  2. installer "pytest" + plugin "pytest-étincelle"
  3. créer " pytest.ini" dans votre répertoire de projet et spécifier Étincelle emplacement.
  4. exécutez vos tests par pytest comme d'habitude.
  5. optionnellement, vous pouvez utiliser fixture "spark_context" dans vos tests qui est fourni par plugin - il essaie de minimiser les logs de Spark dans la sortie.
1
répondu Alex Markov 2017-09-01 13:05:44