Pyspark --PY-files ne fonctionne pas
j'ai utiliser ce document suggère http://spark.apache.org/docs/1.1.1/submitting-applications.html
spsark version 1.1.0
./spark/bin/spark-submit --py-files /home/hadoop/loganalysis/parser-src.zip
/home/hadoop/loganalysis/ship-test.py
et conf dans le code :
conf = (SparkConf()
.setMaster("yarn-client")
.setAppName("LogAnalysis")
.set("spark.executor.memory", "1g")
.set("spark.executor.cores", "4")
.set("spark.executor.num", "2")
.set("spark.driver.memory", "4g")
.set("spark.kryoserializer.buffer.mb", "128"))
et nœud esclave se plaindre ImportError
14/12/25 05:09:53 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-172-31-10-8.cn-north-1.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/hadoop/spark/python/pyspark/worker.py", line 75, in main
command = pickleSer._read_with_length(infile)
File "/home/hadoop/spark/python/pyspark/serializers.py", line 150, in _read_with_length
return self.loads(obj)
ImportError: No module named parser
et parser-src.zip est testé localement.
[hadoop@ip-172-31-10-231 ~]$ python
Python 2.7.8 (default, Nov 3 2014, 10:17:30)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import sys
>>> sys.path.insert(1, '/home/hadoop/loganalysis/parser-src.zip')
>>> from parser import parser
>>> parser.parse
<function parse at 0x7fa5ef4c9848>
>>>
j'essaie d'obtenir des informations sur le travailleur à distance. voir s'il a copié les fichiers.ce que le sys.chemin ressemble..et c'est délicat.
mise à jour: J'ai trouvé que le fichier zip a été envoyé. et sys.le chemin a été tracé. toujours à l'importation d'erreur.
data = list(range(4))
disdata = sc.parallelize(data)
result = disdata.map(lambda x: "sys.path: {0}nDIR: {1} n FILES: {2} n parser: {3}".format(sys.path, os.getcwd(), os.listdir('.'), str(parser)))
result.collect()
print(result.take(4))
il semble que je doive creuser dans cloudpickle.ce qui veut dire que j'ai besoin de comprendre comment cloudpickle fonctionne et échoue en premier.
: An error occurred while calling o40.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 0.0 failed 4 times, most recent failure: Lost task 4.3 in stage 0.0 (TID 23, ip-172-31-10-8.cn-north-1.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/hadoop/spark/python/pyspark/worker.py", line 75, in main
command = pickleSer._read_with_length(infile)
File "/home/hadoop/spark/python/pyspark/serializers.py", line 150, in _read_with_length
return self.loads(obj)
File "/home/hadoop/spark/python/pyspark/cloudpickle.py", line 811, in subimport
__import__(name)
ImportError: ('No module named parser', <function subimport at 0x7f219ffad7d0>, ('parser.parser',))
mise à jour:
quelqu'un rencontre le même problème dans spark 0.8 http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-Importing-other-py-files-in-PYTHONPATH-td2301.html
mais il a mis sa lib dans Python dist-packages et importer des travaux. que j'ai essayé et toujours obtenir de l'erreur d'importation.
mise à jour:
OH.jaillir.. Je pense que le problème est causé par le fait de ne pas comprendre le fichier zip et le comportement d'importation de python..Je passe parser.py --py-files, il travaille, se plaignent d'une autre dépendance. et zip seulement le .py fichiers [non compris .pyc] semble fonctionner aussi.
Mais je ne comprenais pas pourquoi.
7 réponses
essayez d'importer votre module personnalisé de l'intérieur de la méthode elle-même plutôt qu'en haut du script du pilote, par exemple:
def parse_record(record):
import parser
p = parser.parse(record)
return p
plutôt que
import parser
def parse_record(record):
p = parser.parse(record)
return p
Cloud Pickle ne semble pas reconnaître quand un module personnalisé a été importé, il semble donc essayer de sélectionner les modules de haut niveau avec les autres données qui sont nécessaires pour exécuter la méthode. D'après mon expérience, cela signifie que des modules de haut niveau semblent exister, mais ils manquent de membres utilisables et de modules imbriqués. ne peut pas être utilisé comme prévu. Une fois, soit importer avec from A import *
ou de l'intérieur de la méthode (import A.B
), les modules ont fonctionné comme prévu.
Essayez cette fonction de SparkContext
sc.addPyFile(path)
Selon pyspark
documentation ici
Ajouter un .py ou .dépendance zip Pour toutes les tâches à exécuter sur ce SparkContext dans le futur. Le chemin passé peut être soit un fichier local, soit un fichier dans HDFS (ou d'autres systèmes de fichiers pris en charge par Hadoop), soit un URI HTTP, HTTPS ou FTP.
essayez de télécharger le fichier de votre module python dans un espace de stockage cloud public (par exemple AWS S3) et passez L'URL à cette méthode.
voici un matériel de lecture plus complet: http://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_python.html
on dirait qu'un ou plusieurs noeuds ne sont pas configurés correctement. Est-ce que tous les noeuds du cluster ont la même version/configuration de Python (c.-à-d. qu'ils ont tous le module parser installé)?
Si vous n'avez pas envie de vérifier un par un, vous pouvez écrire un script pour vérifier si il est installé/l'installer pour vous. fil de discussion montre quelques façons de le faire.
vous devez empaqueter votre code Python en utilisant des outils comme setuptools. Cela vous permettra de créer un .fichier egg qui est similaire au fichier java jar. Vous pouvez alors spécifier le chemin de ce fichier egg en utilisant --py-files
spark-soumettre --py-fichiers path_to_egg_file path_to_spark_driver_file
j'étais confronté à un problème similaire, mes noeuds de travail ne pouvaient pas détecter les modules même si j'utilisais le --py-files
switch.
il y avait quelques choses que j'ai faites - D'abord j'ai essayé de mettre la déclaration d'importation après que j'ai créé la variable SparkContext (sc) espérant que l'importation devrait avoir lieu après que le module ait expédié à tous les noeuds mais toujours il n'a pas fonctionné. J'ai ensuite essayé sc.addFile
pour ajouter le module dans le script lui-même (au lieu de l'envoyer comme argument en ligne de commande)) et ensuite importé les fonctions du module. Cela a fait l'affaire au moins dans mon cas.
PySpark sur EMR est configuré pour Python 2.6 par défaut, donc assurez-vous qu'ils ne sont pas installés pour l'interpréteur Python 2.7
créer des fichiers zip (exemple - abc.zip) contenant toutes vos dépendances.
lors de la création du contexte spark, mentionnez le nom du fichier zip comme suit:
sc = SparkContext(conf=conf, pyFiles=["abc.zip"])