Comment lire à partir de hbase en utilisant spark
le code ci-dessous Lira à partir de l'hbase, puis le convertira en structure json et le convertira en schemaRDD , mais le problème est que je suis using List
pour stocker la chaîne json puis passer à javaRDD, pour des données d'environ 100 Go le master sera chargé avec des données en mémoire. Quelle est la bonne façon de charger les données de hbase puis effectuer la manipulation,puis convertir en JavaRDD.
package hbase_reader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import com.google.common.collect.Lists;
public class hbase_reader {
public static void main(String[] args) throws IOException, ParseException {
List<String> jars = Lists.newArrayList("");
SparkConf spconf = new SparkConf();
spconf.setMaster("local[2]");
spconf.setAppName("HBase");
//spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
spconf.setJars(jars.toArray(new String[jars.size()]));
JavaSparkContext sc = new JavaSparkContext(spconf);
//spconf.set("spark.executor.memory", "1g");
JavaSQLContext jsql = new JavaSQLContext(sc);
HBaseConfiguration conf = new HBaseConfiguration();
String tableName = "HBase.CounData1_Raw_Min1";
HTable table = new HTable(conf,tableName);
try {
ResultScanner scanner = table.getScanner(new Scan());
List<String> jsonList = new ArrayList<String>();
String json = null;
for(Result rowResult:scanner) {
json = "";
String rowKey = Bytes.toString(rowResult.getRow());
for(byte[] s1:rowResult.getMap().keySet()) {
String s1_str = Bytes.toString(s1);
String jsonSame = "";
for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
String s2_str = Bytes.toString(s2);
for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
jsonSame += """+s2_str+"":"+s3_str+",";
}
}
jsonSame = jsonSame.substring(0,jsonSame.length()-1);
json += """+s1_str+"""+":{"+jsonSame+"}"+",";
}
json = json.substring(0,json.length()-1);
json = "{"RowKey":""+rowKey+"","+json+"}";
jsonList.add(json);
}
JavaRDD<String> jsonRDD = sc.parallelize(jsonList);
JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);
System.out.println(schemaRDD.take(2));
} finally {
table.close();
}
}
}
4 réponses
un exemple de base pour lire les données HBase en utilisant Spark( Scala), vous pouvez aussi écrire ceci en Java:
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark._
object HBaseRead {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
val tableName = "table1"
System.setProperty("user.name", "hdfs")
System.setProperty("HADOOP_USER_NAME", "hdfs")
conf.set("hbase.master", "localhost:60000")
conf.setInt("timeout", 120000)
conf.set("hbase.zookeeper.quorum", "localhost")
conf.set("zookeeper.znode.parent", "/hbase-unsecure")
conf.set(TableInputFormat.INPUT_TABLE, tableName)
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
val tableDesc = new HTableDescriptor(tableName)
admin.createTable(tableDesc)
}
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
println("Number of Records found : " + hBaseRDD.count())
sc.stop()
}
}
UPDATED -2016
à partir de Spark 1.0.x+, Maintenant vous pouvez utiliser le connecteur Spark-HBase aussi:
Maven Dépendance à Comprendre :
<dependency>
<groupId>it.nerdammer.bigdata</groupId>
<artifactId>spark-hbase-connector_2.10</artifactId>
<version>1.0.3</version> // Version can be changed as per your Spark version, I am using Spark 1.6.x
</dependency>
Et de trouver ci-dessous un exemple de code pour la même chose :
import org.apache.spark._
import it.nerdammer.spark.hbase._
object HBaseRead extends App {
val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]")
sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") //e.g. 192.168.1.1 or localhost or your hostanme
val sc = new SparkContext(sparkConf)
// For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then:
val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document")
.select("DocID", "Title").inColumnFamily("SMPL")
println("Number of Records found : " + docRdd .count())
}
UPDATED-2017
à partir de L'étincelle 1.6.x+, Maintenant vous pouvez utiliser le connecteur SHC aussi (utilisateurs de Hortonworks ou HDP) :
Maven Dépendance à Comprendre :
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc</artifactId>
<version>1.0.0-2.0-s_2.11</version> // Version depends on the Spark version and is supported upto Spark 2.x
</dependency>
le principal avantage de l'utilisation de ce connecteur est qu'il a la flexibilité dans la définition du schéma et n'a pas besoin de params codés comme dans nerdammer/connecteur spark-hbase. Rappelez-vous aussi qu'il supporte Spark 2.x donc ce connecteur est assez flexible et fournit un soutien de bout en bout dans les problèmes et PRs.
trouver le chemin de dépôt ci-dessous pour les derniers readme et samples :
Connecteur Hortonworks Spark HBase
vous pouvez également convertir ce RDD en DataFrames et exécuter SQL dessus ou vous pouvez mapper ces données ou DataFrames à des classes Java Pojo ou Case définies par l'utilisateur. Il fonctionne brillant.
veuillez commenter ci-dessous si vous avez besoin d'autre chose.
je préfère lire à partir de hbase et faire la manipulation json tout en spark.
Spark fournit JavaSparkContext.newAPIHadoopRDD fonction pour lire les données du stockage hadoop, y compris HBase. Vous devrez fournir la configuration de HBase, le nom de la table, et Scanner dans le paramètre de configuration et le format d'entrée de la table et c'est la valeur de la clé
Vous pouvez utiliser format d'entrée de tableau class et son paramètre de travail pour fournir le nom de la table et le scan configuration
exemple:
conf.set(TableInputFormat.INPUT_TABLE, "tablename");
JavaPairRDD<ImmutableBytesWritable, Result> data =
jsc.newAPIHadoopRDD(conf, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);
alors vous pouvez faire la manipulation json dans spark. Puisque spark peut faire le recalcul lorsque la mémoire est pleine, il ne chargera que les données nécessaires pour la partie recalcul (cmiiw) de sorte que vous n'avez pas à vous soucier de la taille des données
juste pour ajouter un commentaire sur la façon d'ajouter scan:
TableInputFormat possède les attributs suivants:
- SCAN_ROW_START
- SCAN_ROW_STOP
conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey");
conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");
- hbase-spark un module qui est disponible directement dans le HBase repo
- Spark-sur-HBase par hortonworks a
Je ne sais pas grand chose sur le premier projet, mais il semble qu'il ne supporte pas Spark 2.x. Cependant, il a un soutien riche au niveau de la RDD pour Spark 1.6.x.
ici qu'il est capable de diviser l'élagage, l'élagage de la colonne, de prédire le rabattement et d'atteindre la localisation des données.Un exemple simple, qui utilise le com.hortonworks:shc:1.0.0-2.0-s_2.11
artefact de ce repo Allumage et de 2.0.2, est présenté suivant:
case class Record(col0: Int, col1: Int, col2: Boolean)
val spark = SparkSession
.builder()
.appName("Spark HBase Example")
.master("local[4]")
.getOrCreate()
def catalog =
s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"int"},
|"col1":{"cf":"cf1", "col":"col1", "type":"int"},
|"col2":{"cf":"cf2", "col":"col2", "type":"boolean"}
|}
|}""".stripMargin
val artificialData = (0 to 100).map(number => Record(number, number, number % 2 == 0))
// write
spark
.createDataFrame(artificialData)
.write
.option(HBaseTableCatalog.tableCatalog, catalog)
.option(HBaseTableCatalog.newTable, "5")
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
// read
val df = spark
.read
.option(HBaseTableCatalog.tableCatalog, catalog)
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
df.count()