[Spark] Reading HBase from Spark

Date: 2018-07-12   |   Category: BigData   |   510 words, ~3 min read

There are several ways to read HBase from Spark:

  • Spark's own JavaSparkContext.newAPIHadoopRDD / SparkContext.newAPIHadoopRDD methods
  • HBase's hbase-spark module
  • Hortonworks' Spark HBase Connector
  • Cloudera Labs' SparkOnHBase

This article demonstrates Spark's built-in method.

The HBase database

(Screenshot: contents of the student table in HBase)
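The original screenshot is not reproduced here, but the table layout can be read off the code and the test output below: the row key is the student name, and there are two column families, addr (qualifiers state and city) and score (qualifiers date and numb). As a sketch only, not the author's original setup (the class name PopulateStudentTable is hypothetical, and it assumes the student table already exists with those two column families), one row might be written with the HBase client API like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PopulateStudentTable {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Assumes the student table already exists with column
    // families "addr" and "score"; values below are taken from
    // the test output at the end of this article.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("student"))) {
      Put put = new Put(Bytes.toBytes("jason")); // row key = student name
      put.addColumn(Bytes.toBytes("addr"), Bytes.toBytes("state"), Bytes.toBytes("SiChuan"));
      put.addColumn(Bytes.toBytes("addr"), Bytes.toBytes("city"), Bytes.toBytes("chengdu"));
      put.addColumn(Bytes.toBytes("score"), Bytes.toBytes("date"), Bytes.toBytes("2013-10-01"));
      put.addColumn(Bytes.toBytes("score"), Bytes.toBytes("numb"), Bytes.toBytes("1236"));
      table.put(put);
    }
  }
}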

Spark example

HelloSparkHBase.java

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import scala.Tuple2;

public class HelloSparkHBase {

  public static void main(String[] args) {
    try {
      Configuration conf = HBaseConfiguration.create();
      conf.set(TableInputFormat.INPUT_TABLE, "student");
      SparkSession spark = SparkSession
          .builder()
          .appName("
          .config("spark.some.config.option", "some-value")
          .getOrCreate();

      try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin()) {
        if (admin.tableExists(TableName.valueOf("student"))) {
          System.out.println("Table student exist at HBase!");

          SparkContext sc = spark.sparkContext();          
          JavaRDD<Tuple2<ImmutableBytesWritable, Result>> studentPairRDD = 
               sc.newAPIHadoopRDD(conf,
                                  TableInputFormat.class,
                                  ImmutableBytesWritable.class,
                                  Result.class).to;

          JavaRDD<StudentRecord> studentRDD = studentPairRDD.map(
              new Function<Tuple2<ImmutableBytesWritable,Result>, 
              StudentRecord>() {
            @Override
            public StudentRecord call(
                Tuple2<ImmutableBytesWritable, Result> tuple)
                throws Exception {
              Result result = tuple._2;
              String rowKey = Bytes.toString(result.getRow()); // row key
              String state = Bytes.toString(result.getValue(
                  Bytes.toBytes("addr"),
                  Bytes.toBytes("state")));
              String city = Bytes.toString(result.getValue(
                  Bytes.toBytes("addr"),
                  Bytes.toBytes("city")));
              String dt = Bytes.toString(result.getValue(
                  Bytes.toBytes("score"),
                  Bytes.toBytes("date")));
              String numb = Bytes.toString(result.getValue(
                  Bytes.toBytes("score"),
                  Bytes.toBytes("numb")));
              StudentRecord record = new StudentRecord(
                  rowKey, state, city, dt, numb);
              return record;
            }
          });

          Dataset<Row> studentDF = spark.createDataFrame(
                   studentRDD, 
                   StudentRecord.class);
          studentDF.show();
        }
      }
      spark.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
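Besides the SparkContext.newAPIHadoopRDD call used above, the list at the beginning also mentions JavaSparkContext.newAPIHadoopRDD. A minimal sketch, assuming the same spark session and conf as in HelloSparkHBase.java: this overload returns a JavaPairRDD directly, so no toJavaRDD() conversion is needed.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Wrap the existing SparkContext in a JavaSparkContext;
// its newAPIHadoopRDD overload returns a JavaPairRDD directly.
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaPairRDD<ImmutableBytesWritable, Result> studentPairRDD =
    jsc.newAPIHadoopRDD(conf,
                        TableInputFormat.class,
                        ImmutableBytesWritable.class,
                        Result.class);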

StudentRecord.java

import java.io.Serializable;

public class StudentRecord implements Serializable {
  private String name;
  private String addrState;
  private String addrCity;
  private String scoreDate;
  private String scoreNumb;

  public StudentRecord() { }

  public StudentRecord(String name, 
           String addrState, 
           String addrCity, 
           String scoreDate, 
           String scoreNumb) {
    this.name = name;
    this.addrState = addrState;
    this.addrCity = addrCity;
    this.scoreDate = scoreDate;
    this.scoreNumb = scoreNumb;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public String getAddrState() {
    return addrState;
  }

  public void setAddrState(String addrState) {
    this.addrState = addrState;
  }

  public String getAddrCity() {
    return addrCity;
  }

  public void setAddrCity(String addrCity) {
    this.addrCity = addrCity;
  }

  public String getScoreDate() {
    return scoreDate;
  }

  public void setScoreDate(String scoreDate) {
    this.scoreDate = scoreDate;
  }

  public String getScoreNumb() {
    return scoreNumb;
  }

  public void setScoreNumb(String scoreNumb) {
    this.scoreNumb = scoreNumb;
  }

  @Override
  public String toString() {
    return "StudentRecord{" +
        "name='" + name + '\'' +
        ", addrState='" + addrState + '\'' +
        ", addrCity='" + addrCity + '\'' +
        ", scoreDate='" + scoreDate + '\'' +
        ", scoreNumb='" + scoreNumb + '\'' +
        '}';
  }
}
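A note on how StudentRecord becomes a DataFrame: createDataFrame infers the schema from the bean's getters via reflection, which is why the columns in the test output appear in alphabetical order (addrCity, addrState, name, scoreDate, scoreNumb) rather than constructor order. A minimal standalone sketch (the class name StudentRecordDemo is hypothetical; the sample values are taken from the test output):

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StudentRecordDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("StudentRecordDemo").master("local[2]").getOrCreate();

    // Schema is inferred from StudentRecord's getters via reflection,
    // so the columns come out in alphabetical order.
    Dataset<Row> df = spark.createDataFrame(
        Arrays.asList(new StudentRecord(
            "jason", "SiChuan", "chengdu", "2013-10-01", "1236")),
        StudentRecord.class);
    df.show();
    spark.close();
  }
}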

run.sh

#!/bin/bash

HADOOP_HOME=/usr/local/hadoop
HBASE_HOME=/usr/local/hbase
SPARK_HOME=/usr/local/spark

CLASSPATH=.:$HBASE_HOME/conf:$(hbase classpath):$SPARK_HOME/jars/*

rm -rf *.jar
rm -rf *.class

javac -cp "$CLASSPATH" HelloSparkHBase.java
jar cvfe hello-spark-hbase.jar HelloSparkHBase *.class
spark-submit --class HelloSparkHBase --deploy-mode client --master local[2] --conf spark.executor.extraClassPath=$(hbase classpath) --jars $(echo $HBASE_HOME/lib/*.jar | tr ' ' ',') ./hello-spark-hbase.jar

Note that the --jars option of spark-submit accepts only a comma-separated list of jar files; it does not expand directories.

Test

hadoop@node50064:~/sparkhbasetest$./run.sh
......
Table student exists in HBase!
......
+--------+---------+-----+----------+---------+
|addrCity|addrState| name| scoreDate|scoreNumb|
+--------+---------+-----+----------+---------+
|    null|    HeNan| alex|      null|     null|
| chengdu|  SiChuan|jason|2013-10-01|     1236|
|    null|   ShanXi|nancy|      null|     6666|
| tianjin|  TianJin| tina|      null|     null|
+--------+---------+-----+----------+---------+
......

References


Spark SQL, DataFrames and Datasets Guide
StackOverflow: How to read from hbase using spark
Reading Data from HBase table using Spark
StackOverflow: Spark spark-submit --jars arguments wants comma list, how to declare a directory of jars?
Use Spark to read and write HBase data
Spark On Hbase Read
Setting Up a Sample Application in HBase, Spark, and HDFS
Integrating Spark with HBase (in Chinese)
Comparing two ways of reading and writing HBase with Spark (in Chinese)
Reading and writing HBase with Spark in practice (in Chinese)

Title: [Spark] Reading HBase from Spark
Author: mryqu
Notice: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 3.0 CN. Please credit the source when reposting!

#spark# #hbase# #newapihadooprdd# #javasparkcontext# #sparkcontext# #Java#