Comment puis-je tester les programmes PySpark à l'unité?
ma méthode actuelle de test de L'Unité Java/Spark fonctionne (détaillé ici) en instanciantun SparkContext à l'aide de "local" et en exécutant des tests unitaires à L'aide de JUnit.
le code doit être organisé pour effectuer des entrées/sorties dans une fonction puis appeler une autre avec plusieurs RDDs.
Cela fonctionne très bien. J'ai une transformation de données très testée écrite en Java + Spark.
puis-je faire la même chose avec Python?
Comment pourrais-je faire des tests avec Python?
5 réponses
je recommande l'utilisation de py.test ainsi. py.test permet de créer facilement des fixtures de test SparkContext réutilisables et de les utiliser pour écrire des fonctions de test concises. Vous pouvez également vous spécialiser dans les fixtures (pour créer un StreamingContext par exemple) et en utiliser une ou plusieurs dans vos tests.
j'ai écrit un billet de blog sur Medium à ce sujet:
https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b
Ici est un extrait de l' post:
pytestmark = pytest.mark.usefixtures("spark_context")
def test_do_word_counts(spark_context):
""" test word couting
Args:
spark_context: test fixture SparkContext
"""
test_input = [
' hello spark ',
' hello again spark spark'
]
input_rdd = spark_context.parallelize(test_input, 1)
results = wordcount.do_word_counts(input_rdd)
expected_results = {'hello':2, 'spark':3, 'again':1}
assert results == expected_results
j'utilise pytest
, qui autorise les fixtures de test pour que vous puissiez instancier un contexte pyspark et l'injecter dans tous vos tests qui l'exigent. Quelque chose le long des lignes de
@pytest.fixture(scope="session",
params=[pytest.mark.spark_local('local'),
pytest.mark.spark_yarn('yarn')])
def spark_context(request):
if request.param == 'local':
conf = (SparkConf()
.setMaster("local[2]")
.setAppName("pytest-pyspark-local-testing")
)
elif request.param == 'yarn':
conf = (SparkConf()
.setMaster("yarn-client")
.setAppName("pytest-pyspark-yarn-testing")
.set("spark.executor.memory", "1g")
.set("spark.executor.instances", 2)
)
request.addfinalizer(lambda: sc.stop())
sc = SparkContext(conf=conf)
return sc
def my_test_that_requires_sc(spark_context):
assert spark_context.textFile('/path/to/a/file').count() == 10
alors vous pouvez exécuter les tests en mode local en appelant py.test -m spark_local
ou en LAINE avec py.test -m spark_yarn
. Cela a très bien fonctionné pour moi.
Voici une solution avec pytest si vous utilisez Spark 2.x et SparkSession
. J'importe aussi un paquet de tierce partie.
import logging
import pytest
from pyspark.sql import SparkSession
def quiet_py4j():
"""Suppress spark logging for the test context."""
logger = logging.getLogger('py4j')
logger.setLevel(logging.WARN)
@pytest.fixture(scope="session")
def spark_session(request):
"""Fixture for creating a spark context."""
spark = (SparkSession
.builder
.master('local[2]')
.config('spark.jars.packages', 'com.databricks:spark-avro_2.11:3.0.1')
.appName('pytest-pyspark-local-testing')
.enableHiveSupport()
.getOrCreate())
request.addfinalizer(lambda: spark.stop())
quiet_py4j()
return spark
def test_my_app(spark_session):
...
Note si J'utilise Python 3, je dois spécifier que comme une variable d'environnement PYSPARK_PYTHON:
import os
import sys
IS_PY2 = sys.version_info < (3,)
if not IS_PY2:
os.environ['PYSPARK_PYTHON'] = 'python3'
sinon vous obtenez l'erreur:
Exception: Python dans worker a une version 2.7 différente de celle dans pilote 3.5, PySpark ne peut pas fonctionner avec différentes versions mineures.Merci vérifier les variables d'environnement PYSPARK_PYTHON et PYSPARK_DRIVER_PYTHON sont correctement définies.
en Supposant que vous avez pyspark
installé, vous pouvez utiliser la classe ci-dessous pour unitTest unittest
:
import unittest
import pyspark
class PySparkTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
conf = pyspark.SparkConf().setMaster("local[2]").setAppName("testing")
cls.sc = pyspark.SparkContext(conf=conf)
@classmethod
def tearDownClass(cls):
cls.sc.stop()
Exemple:
class SimpleTestCase(PySparkTestCase):
def test_basic(self):
test_input = [
' hello spark ',
' hello again spark spark'
]
input_rdd = self.sc.parallelize(test_input, 1)
from operator import add
results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect()
self.assertEqual(results, [('hello', 2), ('spark', 3), ('again', 1)])
il y a quelque temps, j'ai également fait face au même problème et après avoir lu plusieurs articles, forums et quelques réponses StackOverflow, j'ai fini par écrire un petit plugin pour pytest: pytest-spark
Je l'utilise déjà depuis quelques mois et le workflow général semble bon sous Linux:
- Installer Apache Spark (configuration de la JVM + décompresser Étincelle de distribution dans un répertoire)
- installer "pytest" + plugin "pytest-étincelle"
- créer " pytest.ini" dans votre répertoire de projet et spécifier Étincelle emplacement.
- exécutez vos tests par pytest comme d'habitude.
- optionnellement, vous pouvez utiliser fixture "spark_context" dans vos tests qui est fourni par plugin - il essaie de minimiser les logs de Spark dans la sortie.