Comment lire les entrées de S3 dans une application cluster Spark Streaming EC2
j'essaie de faire lire à mon application Spark Streaming ses entrées d'un répertoire S3, mais je continue de recevoir cette exception après l'avoir lancée avec spark-submit script:
Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at org.apache.hadoop.fs.s3native.$Proxy6.initialize(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:66)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:195)
at MainClass$.main(MainClass.scala:1190)
at MainClass.main(MainClass.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
je place ces variables à travers ce bloc de code comme suggéré ici http://spark.apache.org/docs/latest/ec2-scripts.html (bas de la page):
val ssc = new org.apache.spark.streaming.StreamingContext(
conf,
Seconds(60))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId",args(2))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey",args(3))
args(2) et args(3) sont mes AWS ID de Clé d'Accès et de Sécréter La Clé d'accès de cours.
pourquoi il continue de dire qu'ils ne sont pas fixés?
EDIT: j'ai essayé aussi de cette façon mais j'obtiens la même exception:
val lines = ssc.textFileStream("s3n://"+ args(2) +":"+ args(3) + "@<mybucket>/path/")
9 réponses
Impair. Essayez aussi de faire un .set
sur le sparkContext
. Essayez aussi d'exporter des variables env avant de commencer l'application:
export AWS_ACCESS_KEY_ID=<your access>
export AWS_SECRET_ACCESS_KEY=<your secret>
^^c'est ainsi que nous le faisons.
mise à jour: selon @tribbloid ce qui précède a cassé en 1.3.0, maintenant vous devez faff autour des âges et des âges avec HDFS-site.xml, ou vous pouvez le faire (et cela fonctionne dans une étincelle-shell):
val hadoopConf = sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)
la configuration suivante fonctionne pour moi, assurez-vous que vous mettez aussi" fs.s3.impl":
val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
val sc = new SparkContext(conf)
val hadoopConf=sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId",myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey",mySecretKey)
Sur AWS DME les suggestions ci-dessus ne fonctionne pas. Au lieu de cela, j'ai mis à jour les propriétés suivantes dans le site de conf/core.xml:
fs.s3n.awsAccessKeyId et fs.s3n.awsSecretAccessKey avec vos justificatifs D'identité S3.
pour ceux qui utilisent EMR, utilisez la construction à étincelles comme décrit à https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark et il suffit de faire référence à S3 avec L'URI S3://. Il n'est pas nécessaire de définir la mise en œuvre de S3 ou une configuration supplémentaire car les justificatifs d'identité sont définis par IAM ou le rôle.
je voulais mettre les informations d'identification plus en sécurité dans un fichier de configuration sur une de mes partitions cryptées. Donc j'ai fait export HADOOP_CONF_DIR=~/Private/.aws/hadoop_conf
avant d'exécuter mon application spark, et j'ai mis un fichier dans ce répertoire (crypté via ecryptfs
) appelé core-site.xml
contenant les justificatifs comme ceci:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>my_aws_access_key_id_here</value>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>my_aws_secret_access_key_here</value>
</property>
</configuration>
HADOOP_CONF_DIR
peut aussi être défini dans conf/spark-env.sh
.
les dernières versions EMR (testées le 4.6.0) nécessitent la configuration suivante:
val sc = new SparkContext(conf)
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)
bien que dans la plupart des cas hors de la configuration devrait fonctionner - c'est si vous avez des références S3 différentes de celles que vous avez lancé le cluster avec.
cela fonctionne pour moi dans 1.4.1 shell:
val conf = sc.getConf
conf.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
conf.set("spark.hadoop.fs.s3.awsAccessKeyId", <your access key>)
conf.set("spark.hadoop.fs.s3.awsSecretAccessKey", <your secret key>)
SparkHadoopUtil.get.conf.addResource(SparkHadoopUtil.get.newConfiguration(conf))
...
sqlContext.read.parquet("s3://...")
pour augmenter la réponse de @nealmcb, la façon la plus simple de le faire est de définir
HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
dans conf/spark-env.sh
ou exporter cette variable env dans ~/.bashrc
ou ~/.bash_profile
.
qui fonctionne aussi longtemps que vous pouvez accéder à s3 par hadoop. Par exemple, si vous pouvez exécuter
hadoop fs -ls s3n://path/
alors hadoop peut voir le chemin s3.
si hadoop ne voit pas le chemin, suivez les conseils de contenu dans Comment puis-je accéder à S3/S3n à partir d'une installation locale Hadoop 2.6?
En java, les éléments suivants sont des lignes de code. Vous devez ajouter AWS creds en SparkContext seulement et pas SparkSession.
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
sc.hadoopConfiguration().set("fs.s3a.access.key", AWS_KEY);
sc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_KEY);