1. Reading from ES with Spark

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark

object esReadToHdfs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("es_read").getOrCreate()
    val sc = spark.sparkContext

    val options = Map(
      "es.index.auto.create" -> "true",
      "es.nodes.wan.only" -> "true",
      "es.nodes" -> "29.29.29.29:9200,29.29.29.29:9200",
      "es.port" -> "9200",
      "es.mapping.id" -> "id",
      "es.resource" -> "index_name/docs" // index/type to read; required, the name here is an example
    )

    // Returns RDD[(String, String)]
    // Tuple: first element is the document _id, second is the JSON string
    val resultRDD = EsSpark.esJsonRDD(sc, options).map(x => x._2)

    // Alternative: returns RDD[(String, Map[String, AnyRef])]
    // val resultRDD = EsSpark.esRDD(sc, options)

    // Persist the JSON strings to HDFS (output path is an example)
    resultRDD.saveAsTextFile("hdfs://hadoop1:9000/es_read_out")
  }
}
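The connector also exposes a Spark SQL data source, so the same index can be read as a DataFrame when the documents share a schema. A minimal sketch, reusing the spark session from above; the node address and the index_name/docs resource are examples:

    // The "org.elasticsearch.spark.sql" source maps each document to a Row
    val df = spark.read
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "29.29.29.29:9200")
      .option("es.nodes.wan.only", "true")
      .load("index_name/docs") // hypothetical index/type

    df.printSchema()
    df.show()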

Reading HDFS files

[hadoop@hadoop1 apps]$ hadoop fs -cat hdfs://hadoop1:9000/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

Parsing is done with fastjson:

1、pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
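Before wiring fastjson into the Spark job, a minimal round-trip shows the calls the main file below relies on (parse, typed getter, adding a field, serializing back):

import com.alibaba.fastjson.JSON

object FastjsonDemo {
  def main(args: Array[String]): Unit = {
    val raw = """{"name":"Andy", "age":30}"""

    val obj = JSON.parseObject(raw)  // String -> JSONObject
    val name = obj.getString("name") // typed getter
    obj.put("tag", "demo")           // add a new key/value pair

    println(name)             // Andy
    println(obj.toJSONString) // e.g. {"name":"Andy","age":30,"tag":"demo"}; key order may vary
  }
}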

2、Main file

package top.midworld.spark1031.create_df

import org.apache.spark.sql.SparkSession
import java.security.MessageDigest
import com.alibaba.fastjson.JSON

object SaveToEs {
  def main(args: Array[String]): Unit = {
    // When submitting to a cluster, remove .master(...); local[2] is for local runs only
    val spark = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    val rdd = sc.textFile("hdfs://hadoop1:9000/people.json").map {
      // simpler variant: x => JSON.parseObject(x).get("name")
      x =>
        val data = JSON.parseObject(x)
        val name = data.getString("name")
        val md5 = hashMD5(name) // MD5 of the name field
        data.put("@id", md5)    // add a new key/value pair
        data
    }

    rdd.collect().foreach(println)

    sc.stop()
    spark.stop()
  }

  // Hex-encoded MD5 digest of the input string
  def hashMD5(url: String): String = {
    val md5 = MessageDigest.getInstance("MD5")
    val encoded = md5.digest(url.getBytes())
    encoded.map("%02x".format(_)).mkString
  }
}

Output:

{"name":"aaa","@id":"3e06fa3927cbdf4e9d93ba4541acce86"}

{"name":"aaa","@id":"0d2366f384b6c702db8e9dd8b74534db","age":30}

{"name":"aaa","@id":"06475174d922e7dcbb3ed34c0236dbdf","age":19}

2. Writing to ES with Spark

1、pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.0.0</version>
</dependency>

2、Main file

package top.midworld.spark1031.create_df

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark

case class People(name: String, age: Int)

object SaveToEs {
  def main(args: Array[String]): Unit = {
    // The ES settings must be on the SparkConf before the session is created,
    // otherwise the connector never sees them
    val conf = new SparkConf()
      .setAppName("save_to_es")
      .setMaster("local[2]")
      .set("es.nodes", "hadoop1:9200,hadoop2:9200,hadoop3:9200")
      .set("es.port", "9200")
      .set("es.index.auto.create", "true")
    val spark = SparkSession.builder.config(conf).getOrCreate()
    val sc = spark.sparkContext

    // Write Map objects to ES; in "index_name/docs", docs is the doc_type
    val aa = Map("one" -> 1, "two" -> 2, "three" -> 3, "id" -> 11111)
    val bb = Map("OTP" -> "Otopeni", "SFO" -> "San Fran", "id" -> 2222)
    sc.makeRDD(Seq(aa, bb)).saveToEs("index_name/docs", Map("es.mapping.id" -> "id"))

    // Write case class objects to Elasticsearch
    val p1 = People("rose", 18)
    val p2 = People("lila", 19)
    sc.makeRDD(Seq(p1, p2)).saveToEs("index_name/docs")

    // saveToEs above comes from the implicit conversions in org.elasticsearch.spark._;
    // the explicit API is equivalent (use saveToEs here: saveJsonToEs expects JSON strings)
    val rdd_case_class = sc.makeRDD(Seq(p1, p2))
    EsSpark.saveToEs(rdd_case_class, "index_name/docs")

    // Write JSON strings to Elasticsearch
    val json1 = """{"id" : 1, "name" : "rose", "age" : "18"}"""
    val json2 = """{"id" : 2, "name" : "lila", "age" : "19"}"""
    sc.makeRDD(Seq(json1, json2)).saveJsonToEs("index_name/docs")

    // Custom es.mapping.id: if unset, ES generates its own unique 20-character id.
    // The cfg map points es.mapping.id at the id field in the data
    sc.makeRDD(Seq(json1, json2)).saveJsonToEs("index_name/docs", Map("es.mapping.id" -> "id"))

    sc.stop()
    spark.stop()
  }
}
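To verify the writes landed, the documents can be read straight back with the RDD read API from section 1 (EsSpark is already imported above); a sketch using an example URI query:

    // Read back only the documents whose name is "rose"; each element is (_id, json)
    val readBack = EsSpark.esJsonRDD(sc, "index_name/docs", "?q=name:rose")
    readBack.collect().foreach { case (id, json) => println(s"$id -> $json") }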

