Comment lire les entrées de S3 dans une application cluster Spark Streaming EC2

j'essaie de faire lire à mon application Spark Streaming ses entrées d'un répertoire S3, mais je continue de recevoir cette exception après l'avoir lancée avec spark-submit script:

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy6.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:195)
    at MainClass$.main(MainClass.scala:1190)
    at MainClass.main(MainClass.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

je place ces variables à travers ce bloc de code comme suggéré ici http://spark.apache.org/docs/latest/ec2-scripts.html (bas de la page):

val ssc = new org.apache.spark.streaming.StreamingContext(
  conf,
  Seconds(60))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId",args(2))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey",args(3))

args(2) et args(3) sont mes AWS ID de Clé d'Accès et de Sécréter La Clé d'accès de cours.

pourquoi il continue de dire qu'ils ne sont pas fixés?

EDIT: j'ai essayé aussi de cette façon mais j'obtiens la même exception:

val lines = ssc.textFileStream("s3n://"+ args(2) +":"+ args(3) + "@<mybucket>/path/")
26
demandé sur gprivitera 2014-06-05 02:35:21

9 réponses

Impair. Essayez aussi de faire un .set sur le sparkContext . Essayez aussi d'exporter des variables env avant de commencer l'application:

export AWS_ACCESS_KEY_ID=<your access>
export AWS_SECRET_ACCESS_KEY=<your secret>

^^c'est ainsi que nous le faisons.

mise à jour: selon @tribbloid ce qui précède a cassé en 1.3.0, maintenant vous devez faff autour des âges et des âges avec HDFS-site.xml, ou vous pouvez le faire (et cela fonctionne dans une étincelle-shell):

val hadoopConf = sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)
23
répondu samthebest 2015-12-21 13:30:10

la configuration suivante fonctionne pour moi, assurez-vous que vous mettez aussi" fs.s3.impl":

val conf = new SparkConf().setAppName("Simple Application").setMaster("local")      
val sc = new SparkContext(conf)
val hadoopConf=sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId",myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey",mySecretKey)
22
répondu harel 2014-10-02 07:50:46

Sur AWS DME les suggestions ci-dessus ne fonctionne pas. Au lieu de cela, j'ai mis à jour les propriétés suivantes dans le site de conf/core.xml:

fs.s3n.awsAccessKeyId et fs.s3n.awsSecretAccessKey avec vos justificatifs D'identité S3.

3
répondu Ishika Paul 2014-12-08 22:01:53

pour ceux qui utilisent EMR, utilisez la construction à étincelles comme décrit à https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark et il suffit de faire référence à S3 avec L'URI S3://. Il n'est pas nécessaire de définir la mise en œuvre de S3 ou une configuration supplémentaire car les justificatifs d'identité sont définis par IAM ou le rôle.

3
répondu ChristopherB 2014-12-29 04:20:14

je voulais mettre les informations d'identification plus en sécurité dans un fichier de configuration sur une de mes partitions cryptées. Donc j'ai fait export HADOOP_CONF_DIR=~/Private/.aws/hadoop_conf avant d'exécuter mon application spark, et j'ai mis un fichier dans ce répertoire (crypté via ecryptfs ) appelé core-site.xml contenant les justificatifs comme ceci:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>my_aws_access_key_id_here</value>
  </property>
  <property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>my_aws_secret_access_key_here</value>
  </property>
</configuration>

HADOOP_CONF_DIR peut aussi être défini dans conf/spark-env.sh .

3
répondu nealmcb 2015-05-02 17:10:34

les dernières versions EMR (testées le 4.6.0) nécessitent la configuration suivante:

val sc = new SparkContext(conf)
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)

bien que dans la plupart des cas hors de la configuration devrait fonctionner - c'est si vous avez des références S3 différentes de celles que vous avez lancé le cluster avec.

2
répondu Dan Osipov 2016-06-22 20:57:45

cela fonctionne pour moi dans 1.4.1 shell:

val conf = sc.getConf
conf.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
conf.set("spark.hadoop.fs.s3.awsAccessKeyId", <your access key>)
conf.set("spark.hadoop.fs.s3.awsSecretAccessKey", <your secret key>)
SparkHadoopUtil.get.conf.addResource(SparkHadoopUtil.get.newConfiguration(conf))
...
sqlContext.read.parquet("s3://...")
0
répondu ru2nuts 2016-01-19 20:15:03

pour augmenter la réponse de @nealmcb, la façon la plus simple de le faire est de définir

HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop 

dans conf/spark-env.sh ou exporter cette variable env dans ~/.bashrc ou ~/.bash_profile .

qui fonctionne aussi longtemps que vous pouvez accéder à s3 par hadoop. Par exemple, si vous pouvez exécuter

hadoop fs -ls s3n://path/

alors hadoop peut voir le chemin s3.

si hadoop ne voit pas le chemin, suivez les conseils de contenu dans Comment puis-je accéder à S3/S3n à partir d'une installation locale Hadoop 2.6?

0
répondu Bob Baxley 2017-05-23 11:47:19

En java, les éléments suivants sont des lignes de code. Vous devez ajouter AWS creds en SparkContext seulement et pas SparkSession.

JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
sc.hadoopConfiguration().set("fs.s3a.access.key", AWS_KEY);
sc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_KEY);
0
répondu Atihska 2018-09-28 20:44:25