Comment puis-je me connecter à une base de données postgreSQL dans Apache Spark en utilisant scala?

Je veux savoir comment puis-je faire les choses suivantes dans scala?

  1. Se connecter à une base de données postgreSQL en utilisant Spark scala.
  2. écrire des requêtes SQL comme SELECT, UPDATE etc. pour modifier une table dans cette base de données.

Je sais le faire en utilisant scala mais comment importer le jar de connecteur de psql scala dans sbt tout en l'empaquetant?

36
demandé sur febinsathar 2014-07-23 21:15:35

1 réponses

Notre objectif est d'exécuter des requêtes SQL parallèles à partir des travailleurs Spark.

Configuration de construction

Ajoutez le connecteur et JDBC au libraryDependencies dans build.sbt. J'ai seulement essayé cela avec MySQL, donc je vais l'utiliser dans mes exemples, mais Postgres devrait être à peu près le même.

libraryDependencies ++= Seq(
  jdbc,
  "mysql" % "mysql-connector-java" % "5.1.29",
  "org.apache.spark" %% "spark-core" % "1.0.1",
  // etc
)

Code

Lorsque vous créez le {[7] } vous lui dites quels pots copier aux exécuteurs. Inclure le connecteur pot. Une bonne façon de le faire:

val classes = Seq(
  getClass,                   // To get the jar with our own code.
  classOf[mysql.jdbc.Driver]  // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)

Maintenant Spark est prêt à se connecter au la base de données. Chaque exécuteur exécutera une partie de la requête, de sorte que les résultats soient prêts pour le calcul distribué.

Il y a deux options pour cela. L'ancienne approche est d'utiliser org.apache.spark.rdd.JdbcRDD:

val rdd = new org.apache.spark.rdd.JdbcRDD(
  sc,
  () => {
    sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  },
  "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
  0, 1000, 10,
  row => row.getString("BOOK_TITLE")
)

Consultez la documentation pour les paramètres. En bref:

  • Vous avez le SparkContext.
  • puis une fonction qui crée la connexion. Cela sera appelé sur chaque travailleur pour se connecter à la base de données.
  • puis la requête SQL. Cela doit être similaire à l'exemple, et contiennent des espaces réservés pour les clés de début et de fin.
  • ensuite, vous spécifiez la plage de clés (0 à 1000 dans mon exemple) et le nombre de partitions. La plage sera divisée entre les partitions. Ainsi, un thread d'exécuteur finira par Exécuter SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100 dans l'exemple.
  • et enfin nous avons une fonction qui convertit le ResultSet en quelque chose. Dans l'exemple, nous le convertissons en String, donc vous vous retrouvez avec un RDD[String].

Depuis la version Apache Spark 1.3.0 une autre méthode est disponible via L'API DataFrame. Au lieu de la JdbcRDD vous devez créer un org.apache.spark.sql.DataFrame:

val df = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
  "dbtable" -> "BOOKS"))

Voir https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases pour la liste complète des options (la plage de touches et le nombre de partitions peuvent être définis comme avec JdbcRDD).

Mises à jour

JdbcRDD ne prend pas en charge les mises à jour. Mais vous pouvez simplement les faire dans un foreachPartition.

rdd.foreachPartition { it =>
  val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
  for (bookTitle <- it) {
    del.setString(1, bookTitle)
    del.executeUpdate
  }
}

(cela crée un connexion par partition. Si c'est un problème, utilisez un pool de connexion!)

DataFrameS prend en charge les mises à jour via les méthodes createJDBCTable et insertIntoJDBC.

42
répondu Daniel Darabos 2015-10-06 10:11:57