Comment puis-je me connecter à une base de données postgreSQL dans Apache Spark en utilisant scala?
Je veux savoir comment puis-je faire les choses suivantes dans scala?
- Se connecter à une base de données postgreSQL en utilisant Spark scala.
- écrire des requêtes SQL comme SELECT, UPDATE etc. pour modifier une table dans cette base de données.
Je sais le faire en utilisant scala mais comment importer le jar de connecteur de psql scala dans sbt tout en l'empaquetant?
1 réponses
Notre objectif est d'exécuter des requêtes SQL parallèles à partir des travailleurs Spark.
Configuration de construction
Ajoutez le connecteur et JDBC au libraryDependencies
dans build.sbt
. J'ai seulement essayé cela avec MySQL, donc je vais l'utiliser dans mes exemples, mais Postgres devrait être à peu près le même.
libraryDependencies ++= Seq(
jdbc,
"mysql" % "mysql-connector-java" % "5.1.29",
"org.apache.spark" %% "spark-core" % "1.0.1",
// etc
)
Code
Lorsque vous créez le {[7] } vous lui dites quels pots copier aux exécuteurs. Inclure le connecteur pot. Une bonne façon de le faire:
val classes = Seq(
getClass, // To get the jar with our own code.
classOf[mysql.jdbc.Driver] // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)
Maintenant Spark est prêt à se connecter au la base de données. Chaque exécuteur exécutera une partie de la requête, de sorte que les résultats soient prêts pour le calcul distribué.
Il y a deux options pour cela. L'ancienne approche est d'utiliser org.apache.spark.rdd.JdbcRDD
:
val rdd = new org.apache.spark.rdd.JdbcRDD(
sc,
() => {
sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
},
"SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
0, 1000, 10,
row => row.getString("BOOK_TITLE")
)
Consultez la documentation pour les paramètres. En bref:
- Vous avez le
SparkContext
. - puis une fonction qui crée la connexion. Cela sera appelé sur chaque travailleur pour se connecter à la base de données.
- puis la requête SQL. Cela doit être similaire à l'exemple, et contiennent des espaces réservés pour les clés de début et de fin.
- ensuite, vous spécifiez la plage de clés (0 à 1000 dans mon exemple) et le nombre de partitions. La plage sera divisée entre les partitions. Ainsi, un thread d'exécuteur finira par Exécuter
SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100
dans l'exemple. - et enfin nous avons une fonction qui convertit le
ResultSet
en quelque chose. Dans l'exemple, nous le convertissons enString
, donc vous vous retrouvez avec unRDD[String]
.
Depuis la version Apache Spark 1.3.0 une autre méthode est disponible via L'API DataFrame. Au lieu de la JdbcRDD
vous devez créer un org.apache.spark.sql.DataFrame
:
val df = sqlContext.load("jdbc", Map(
"url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
"dbtable" -> "BOOKS"))
Voir https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases pour la liste complète des options (la plage de touches et le nombre de partitions peuvent être définis comme avec JdbcRDD
).
Mises à jour
JdbcRDD
ne prend pas en charge les mises à jour. Mais vous pouvez simplement les faire dans un foreachPartition
.
rdd.foreachPartition { it =>
val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
for (bookTitle <- it) {
del.setString(1, bookTitle)
del.executeUpdate
}
}
(cela crée un connexion par partition. Si c'est un problème, utilisez un pool de connexion!)
DataFrame
S prend en charge les mises à jour via les méthodes createJDBCTable
et insertIntoJDBC
.