How to work with MySQL and Apache Spark?
I want to run my existing application with Apache Spark and MySQL.
10 answers
From PySpark, this works for me:
dataframe_mysql = mySqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/my_db_name",
    driver="com.mysql.jdbc.Driver",
    dbtable="my_tablename",
    user="root",
    password="root").load()
Using Scala, this worked for me. Use the following commands:
sudo -u root spark-shell --jars /mnt/resource/lokeshtest/guava-12.0.1.jar,/mnt/resource/lokeshtest/hadoop-aws-2.6.0.jar,/mnt/resource/lokeshtest/aws-java-sdk-1.7.3.jar,/mnt/resource/lokeshtest/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar --packages com.databricks:spark-csv_2.10:1.2.0
import org.apache.spark.sql.SQLContext
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
val dataframe_mysql = sqlcontext.read.format("jdbc")
  .option("url", "jdbc:mysql://Public_IP:3306/DB_NAME")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "tblage")
  .option("user", "sqluser")
  .option("password", "sqluser")
  .load()
dataframe_mysql.show()
For Scala, if you use sbt, this will also work. In your build.sbt file:
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.6.2",
"org.apache.spark" %% "spark-sql" % "1.6.2",
"org.apache.spark" %% "spark-mllib" % "1.6.2",
"mysql" % "mysql-connector-java" % "5.1.12"
)
Then you just need to declare your use of the driver:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

Class.forName("com.mysql.jdbc.Driver").newInstance
val conf = new SparkConf().setAppName("MY_APP_NAME").setMaster("MASTER")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val data = sqlContext.read
.format("jdbc")
.option("url", "jdbc:mysql://<HOST>:3306/<database>")
.option("user", <USERNAME>)
.option("password", <PASSWORD>)
.option("dbtable", "MYSQL_QUERY")
.load()
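Note that the dbtable option takes either a plain table name or a parenthesized subquery with an alias, so if MYSQL_QUERY above stands for an actual query it has to be wrapped accordingly. A minimal sketch under that assumption (the person table and its columns are hypothetical):
val data = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:mysql://<HOST>:3306/<database>")
  .option("user", <USERNAME>)
  .option("password", <PASSWORD>)
  // a subquery used as dbtable needs parentheses and an alias
  .option("dbtable", "(select id, name from person where id > 10) as t")
  .load()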
With Spark 2.0.x, you can use the DataFrameReader and DataFrameWriter. Use SparkSession.read to access the DataFrameReader and Dataset.write to access the DataFrameWriter.
Suppose we are using the spark-shell.
Read example:
val prop=new java.util.Properties()
prop.put("user","username")
prop.put("password","yourpassword")
val url="jdbc:mysql://host:port/db_name"
val df=spark.read.jdbc(url,"table_name",prop)
df.show()
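If the table is large, spark.read.jdbc also has an overload that reads in parallel by range-partitioning on a numeric column. A minimal sketch, reusing url and prop from above and assuming a hypothetical numeric column id:
// parallel read: split "table_name" into 4 partitions on the numeric
// column "id" (hypothetical), covering values from 0 to 1000
val dfPartitioned = spark.read.jdbc(
  url,
  "table_name",
  columnName = "id",
  lowerBound = 0L,
  upperBound = 1000L,
  numPartitions = 4,
  connectionProperties = prop)
dfPartitioned.show()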
Read example 2:
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbserver:3306/db_name")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()
Write example:
import org.apache.spark.sql.SaveMode
val prop=new java.util.Properties()
prop.put("user","username")
prop.put("password","yourpassword")
val url="jdbc:mysql://host:port/db_name"
// df is a DataFrame containing the data you want to write
df.write.mode(SaveMode.Append).jdbc(url,"table_name",prop)
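One caveat: with SaveMode.Overwrite the JDBC writer drops and recreates the target table by default; since Spark 2.1 the truncate option keeps the existing table and its schema instead. A minimal sketch, reusing url and prop from above:
// truncate the table on overwrite instead of dropping and recreating it
df.write.mode(SaveMode.Overwrite)
  .option("truncate", "true")
  .jdbc(url, "table_name", prop)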
public static void main(String[] args) {
    Map<String, String> options = new HashMap<String, String>();
    options.put("url", "jdbc:mysql://<DBURL>:<PORT>/<Database>?user=<UserName>&password=<Password>");
    options.put("dbtable", "<TableName>");

    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("DBConnection").setMaster("local[*]"));
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

    // DataFrame jdbcDF = sqlContext.load("jdbc", options).cache();
    DataFrame jdbcDF = sqlContext.jdbc(options.get("url"), options.get("dbtable"));
    System.out.println("Data------------------->" + jdbcDF.toJSON().first());

    Row[] rows = jdbcDF.collect();
    System.out.println("Without Filter \n ------------------------------------------------- ");
    for (Row row2 : rows) {
        System.out.println(row2.toString());
    }

    System.out.println("Filter Data\n ------------------------------------------------- ");
    jdbcDF = jdbcDF.select("agency_id", "route_id").where(jdbcDF.col("route_id").$less$eq(3));
    rows = jdbcDF.collect();
    for (Row row2 : rows) {
        System.out.println(row2.toString());
    }
}
Based on this infoobjects article, try the following (assuming Java or Scala; not sure how this would work with Python):
- add mysql-connector-java to your Spark classpath
- initialize the driver:
Class.forName("com.mysql.jdbc.Driver")
- create a JdbcRDD as your data source:
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val myRDD = new JdbcRDD(sc,
  () => DriverManager.getConnection(url, username, password),
  "select first_name, last_name, gender from person limit ?, ?",
  1, // lower bound
  5, // upper bound
  2, // number of partitions
  r => r.getString("last_name") + ", " + r.getString("first_name"))
For Spark 2.1.0 and Scala (on Windows 7), the code below works fine for me:
import org.apache.spark.sql.SparkSession

object MySQL {
  def main(args: Array[String]) {
    // At first create a Spark Session as the entry point of your app
    val spark: SparkSession = SparkSession
      .builder()
      .appName("JDBC")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "C:/Exp/")
      .getOrCreate();

    val dataframe_mysql = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost/feedback")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "person")    // replace with your own
      .option("user", "root")         // replace with your own
      .option("password", "vertrigo") // replace with your own
      .load()

    dataframe_mysql.show()
  }
}
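As a follow-up, the loaded DataFrame can also be queried with plain Spark SQL by registering it as a temporary view. A short sketch, assuming the person table loaded above has a hypothetical id column:
// register the DataFrame and query it with SQL through the same SparkSession
dataframe_mysql.createOrReplaceTempView("person")
spark.sql("SELECT * FROM person WHERE id > 10").show()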
For Java, this worked for me:
@Bean
public SparkConf sparkConf() {
    SparkConf sparkConf = new SparkConf()
            .setAppName(appName)
            .setSparkHome(sparkHome)
            .setMaster(masterUri);
    return sparkConf;
}

@Bean
public JavaSparkContext javaSparkContext() {
    return new JavaSparkContext(sparkConf());
}

@Bean
public SparkSession sparkSession() {
    return SparkSession
            .builder()
            .sparkContext(javaSparkContext().sc())
            .appName("Java Spark SQL basic example")
            .getOrCreate();
}

Properties properties = new Properties();
properties.put("user", "root");
properties.put("password", "root");
properties.put("driver", "com.mysql.cj.jdbc.Driver");

sparkSession.read()
    .jdbc("jdbc:mysql://localhost:3306/books?useSSL=false",
        "(SELECT books.BOOK_ID as BOOK_ID, books.BOOK_TITLE as BOOK_TITLE, books.BOOK_AUTHOR as BOOK_AUTHOR, borrowers.BORR_NAME as BORR_NAME FROM books LEFT OUTER JOIN borrowers ON books.BOOK_ID = borrowers.BOOK_ID) as t",
        properties) // join example
    .show();
Of course, for MySQL I needed the connector:
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.6</version>
</dependency>
And I get:
+-------+------------------+--------------+---------------+
|BOOK_ID| BOOK_TITLE| BOOK_AUTHOR| BORR_NAME|
+-------+------------------+--------------+---------------+
| 1| Gyűrű kúra|J.R.K. Tolkien| Sára Sarolta|
| 2| Kecske-eledel| Mekk Elek|Maláta Melchior|
| 3| Répás tészta| Vegán Eleazár| null|
| 4|Krumpli és pityóka| Farmer Emília| null|
+-------+------------------+--------------+---------------+
For Java (using Maven), add the Spark dependencies and the SQL driver dependency to your pom.xml file:
<properties>
<java.version>1.8</java.version>
<spark.version>1.6.3</spark.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.6</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
Sample code: suppose your MySQL instance runs locally, the database name is test, the username is root, the password is password, and there are two tables in the test db, table1 and table2:
SparkConf sparkConf = new SparkConf();
SparkContext sc = new SparkContext("local", "spark-mysql-test", sparkConf);
SQLContext sqlContext = new SQLContext(sc);

// here you can run a sql query
String sql = "(select * from table1 join table2 on table1.id=table2.table1_id) as test_table";
// or use an existing table directly
// String sql = "table1";

DataFrame dataFrame = sqlContext
    .read()
    .format("jdbc")
    .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true")
    .option("user", "root")
    .option("password", "password")
    .option("dbtable", sql)
    .load();

// continue your logical code
......
val query: String =
  "select col1, col2 from schema.table_name where condition"
val url = "jdbc:mysql://<ip>:3306/<schema>"
val username = ""
val password = ""

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.load("jdbc", Map(
  "url" -> (url + "/?user=" + username + "&password=" + password),
  "dbtable" -> s"($query) as tbl",
  "driver" -> "com.mysql.jdbc.Driver"))
df.show()
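Note that SQLContext.load is the old Spark 1.x API and has since been deprecated; a sketch of the Spark 2.x equivalent, assuming a SparkSession named spark:
// Spark 2.x: the DataFrameReader replaces sqlContext.load
val df2 = spark.read.format("jdbc")
  .option("url", url)
  .option("user", username)
  .option("password", password)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", s"($query) as tbl")
  .load()
df2.show()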