所需依赖

以下是我添加的依赖,实际环境按需添加,已经引入的也不必重复引入。

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.3</version>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.11</artifactId>
    <version>2.9.2</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.1.2</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.1.2</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.1.2</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.12</version>
</dependency>

Spark读取Hbase并将结果转换成DataFrame

package test

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import org.apache.hadoop.hbase.HBaseConfiguration

import org.apache.hadoop.hbase.client.Result

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import java.util

import scala.collection.mutable

import scala.reflect.ClassTag.AnyVal

/**

* @author Alex

*/

/**
 * Reads rows from an HBase table with `newAPIHadoopRDD` and converts the
 * result into a Spark DataFrame with an explicit schema.
 *
 * Fixes over the original:
 *  - `resultToRow` no longer NPEs when an int-typed cell is absent
 *    (`Bytes.toInt(null)` throws; the fixed code yields null, which the
 *    nullable schema fields accept).
 *  - `cache()` is called before the first action (`count()`), so the scan
 *    result is reused by the later `map` instead of re-reading HBase.
 *  - Family/qualifier names are encoded with `Bytes.toBytes` (UTF-8)
 *    instead of platform-charset `String.getBytes`.
 *
 * @author Alex
 */
object HbaseReadTest {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .appName("test")
      .master("local")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    // HBase scan configuration: table name, projected columns, row range.
    val hbaseConfig: Configuration = HBaseConfiguration.create()

    // Table to read.
    hbaseConfig.set(TableInputFormat.INPUT_TABLE, "student")

    // Space-separated "family:qualifier" list; columns from several
    // column families may be mixed.
    hbaseConfig.set(TableInputFormat.SCAN_COLUMNS, "info:name info:age info:gender sign:aaa sign:bbb")

    // Row range is half-open [start, stop): with the settings below only
    // rows "2" and "3" are scanned.
    hbaseConfig.set(TableInputFormat.SCAN_ROW_START, "2")
    hbaseConfig.set(TableInputFormat.SCAN_ROW_STOP, "4")

    // Pull the data as an RDD of (row key, Result) pairs.
    val hBaseRDD: RDD[(ImmutableBytesWritable, Result)] = spark.sparkContext.newAPIHadoopRDD(
      hbaseConfig,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // Cache BEFORE the first action so that count() itself populates the
    // cache and the map() below reuses it instead of rescanning HBase.
    hBaseRDD.cache()
    println("数据量" + hBaseRDD.count())

    // family -> qualifier pairs, in schema order. In real use this list
    // arrives as a parameter; here it is simulated with a fixed value.
    // (If the columns are known and fixed, this indirection is unnecessary.)
    val rowkeyColMapList: Seq[Map[String, String]] = Seq(
      Map("info" -> "name"),
      Map("info" -> "age"),
      Map("info" -> "gender"),
      Map("sign" -> "aaa"),
      Map("sign" -> "bbb"))

    // Convert each HBase Result into a Spark SQL Row.
    val rdd: RDD[Row] = hBaseRDD.map { case (_, result) =>
      resultToRow(result, rowkeyColMapList)
    }

    // DataFrame schema; all fields nullable so missing cells map to null.
    val schema: StructType = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("gender", StringType, nullable = true),
      StructField("aaa", IntegerType, nullable = true),
      StructField("bbb", StringType, nullable = true)))

    val df: DataFrame = spark.sqlContext.createDataFrame(rdd, schema)
    df.show()
  }

  /**
   * Converts one HBase [[Result]] into a Spark SQL [[Row]], decoding each
   * cell's bytes according to its column: "age" and "aaa" are stored as
   * 4-byte ints, everything else as strings.
   *
   * @param result           one scanned HBase row
   * @param rowkeyColMapList ordered (family -> qualifier) pairs to extract
   * @return a Row whose values follow the order of `rowkeyColMapList`
   */
  def resultToRow(result: Result, rowkeyColMapList: Seq[Map[String, String]]): Row = {
    val values: Seq[Any] = for {
      item <- rowkeyColMapList
      (family, qualifier) <- item.toSeq
    } yield {
      val raw = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier))
      if (qualifier.equalsIgnoreCase("age") || qualifier.equalsIgnoreCase("aaa")) {
        // Missing cell guard: Bytes.toInt(null) throws NullPointerException.
        if (raw == null) null else Bytes.toInt(raw)
      } else {
        // Bytes.toString returns null for a null (missing) cell.
        Bytes.toString(raw)
      }
    }
    Row.fromSeq(values)
  }

}

Spark RDD写入Hbase

package test

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.hbase.HBaseConfiguration

import org.apache.hadoop.hbase.client.{Put, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/**

* @author Alex

*/

/**
 * Writes an RDD of CSV-style strings into an HBase table via
 * `saveAsNewAPIHadoopDataset` and [[TableOutputFormat]].
 *
 * Fix over the original: the job's output VALUE class was declared as
 * `classOf[Result]`, but the pairs actually emitted are
 * (ImmutableBytesWritable, Put). TableOutputFormat happens not to validate
 * the declared class, so the bug was silent — it is corrected to
 * `classOf[Put]` here.
 *
 * @author Alex
 */
object HbaseWriteTest {

  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val tableName = "student"

    // Target table for TableOutputFormat.
    val hbaseConfig: Configuration = HBaseConfiguration.create()
    hbaseConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    val job: Job = Job.getInstance(hbaseConfig)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    // The emitted values are Put mutations (not Result — that was a
    // copy-paste error in the original).
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // Sample records: rowkey,name,gender,age,sign:aaa,sign:bbb
    val dataRDD: RDD[String] = sc.makeRDD(Array("1,Rongcheng,M,26,5,111", "2,Guanhua,M,27,6,222", "3,HuaWer,F,35,8,333"))

    // Turn each CSV line into a (writable key, Put) pair.
    val rdd: RDD[(ImmutableBytesWritable, Put)] = dataRDD.map(_.split(',')).map { arr: Array[String] =>
      // Row key.
      val put = new Put(Bytes.toBytes(arr(0)))
      // info:name
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      // info:gender
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes(arr(2)))
      // info:age — stored as a 4-byte int, matching the reader's Bytes.toInt.
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(3).toInt))
      // sign:aaa — also stored as a 4-byte int.
      put.addColumn(Bytes.toBytes("sign"), Bytes.toBytes("aaa"), Bytes.toBytes(arr(4).toInt))
      // sign:bbb — stored as a string.
      put.addColumn(Bytes.toBytes("sign"), Bytes.toBytes("bbb"), Bytes.toBytes(arr(5)))
      (new ImmutableBytesWritable, put)
    }

    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
    println("写入完成")
  }

}

参考文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: