Contents

1. Loading data from a collection

2. Reading data from a file

3. Reading data from Kafka

4. Loading data from a socket (netcat)

5. Custom data sources

1. Loading data from a collection

Use the fromCollection() method to turn a Scala List into a Flink data source.
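All of the snippets below share the same setup. A minimal set of imports that covers every example in this article (the Kafka and custom-source imports are only needed for sections 3 and 5, and the Kafka connector must be on the classpath) might look like this:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._   // StreamExecutionEnvironment, DataStream and the implicit TypeInformation
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import scala.util.Random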

Approach 1:

object SourceTest {

  def main(args: Array[String]): Unit = {
    // create the Flink streaming environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)

    val dataList = List(1, 2, 3, 4, 5, 21, 12, 3445, 65)

    // source, approach 1: load the collection as a data source
    val stream1: DataStream[Int] = env.fromCollection(dataList)

    // sink: print to the console
    stream1.print()

    env.execute("source stu")
  }
}

Console output:
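The screenshot of the console is not reproduced here. With parallelism 2, print() prefixes every record with the index of the subtask that emitted it, so the output looks roughly like the following (the exact interleaving varies from run to run):

1> 1
2> 2
1> 3
2> 4
1> 5
2> 21
...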

Approach 2:

Define a case class and use fromCollection() to turn a sequence of case-class instances into a Flink data source.

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {

  def main(args: Array[String]): Unit = {
    // create the Flink streaming environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)

    val dataList2 = List(
      SensorReading("sensor_1", 1665103420, 38.1),
      SensorReading("sensor_4", 1665103420, 31.2),
      SensorReading("sensor_7", 1665103420, 15.4),
      SensorReading("sensor_9", 1665103420, 25.8)
    )

    // source: load the collection as a data source
    val stream1: DataStream[SensorReading] = env.fromCollection(dataList2)

    // sink: print to the console
    stream1.print()

    env.execute("source stu")
  }
}

Console output:

You can also pass the elements in directly by calling the fromElements() method:

object SourceTest {

  def main(args: Array[String]): Unit = {
    // create the Flink streaming environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)

    // pass the elements directly; mixed types produce a DataStream[Any]
    val stream1: DataStream[Any] = env.fromElements("hello", 1, 1.5, "hello gogo")

    // sink: print to the console
    stream1.print()

    env.execute("source stu")
  }
}

Console output:

2. Reading data from a file

readTextFile()

The readTextFile() method reads a file on the local filesystem or on HDFS into Flink as a data source.

"E:\\Study_Project\\flink\\resource\\sensor.txt" 路径下存在文件:

object SourceTest {

  def main(args: Array[String]): Unit = {
    // create the Flink streaming environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)

    // source 3: read the file line by line
    val path = "E:\\Study_Project\\flink\\resource\\sensor.txt"
    val stream1: DataStream[String] = env.readTextFile(path)

    // sink: print to the console
    stream1.print()

    env.execute("source stu")
  }
}

Console output:

3. Reading data from Kafka

Use the addSource() method with a FlinkKafkaConsumer to load data from Kafka.

object SourceTest {

  def main(args: Array[String]): Unit = {
    // create the Flink streaming environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.89.140:9092")
    properties.setProperty("group.id", "sensor_group")
    properties.setProperty("auto.offset.reset", "latest")

    // source: consume the "sensor" topic as a stream of strings
    val stream1: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))

    // sink: print to the console
    stream1.print()

    env.execute("source stu")
  }
}

Produce data to the sensor topic from a Kafka client:
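One way to do this is with the console producer that ships with Kafka (on newer Kafka versions replace --broker-list with --bootstrap-server), then type a few test lines at the prompt:

bin/kafka-console-producer.sh --broker-list 192.168.89.140:9092 --topic sensor
>sensor_1,1665103420,38.1
>sensor_4,1665103420,31.2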

Console output:

4. Loading data from a socket (netcat)

Open a listening socket on Linux:

nc -lk 7777    # start a listener on port 7777 on the server
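Once the job below is running, every line typed into this netcat session is delivered to Flink as one record, for example (hypothetical input):

sensor_1,1665103420,38.1
hello flink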

object SourceTest {

  def main(args: Array[String]): Unit = {
    // create the Flink streaming environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)

    // source: read text lines from the socket opened by netcat
    val stream1: DataStream[String] = env.socketTextStream("192.168.89.140", 7777)

    // sink: print to the console
    stream1.print()

    env.execute("source stu")
  }
}

Console output:

5. Custom data sources

Pass an instance of a class that implements SourceFunction to the addSource() method. The driver code is as follows:

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {

  def main(args: Array[String]): Unit = {
    // create the Flink streaming environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)

    // source: use the custom SourceFunction implementation
    val stream1: DataStream[SensorReading] = env.addSource(new MySensorSource())

    // sink: print to the console
    stream1.print()

    env.execute("source stu")
  }
}

The implementation of SourceFunction is as follows:

class MySensorSource extends SourceFunction[SensorReading] {

  // flag used to stop the emit loop when the source is cancelled
  @volatile private var running = true

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    while (running) {
      // emit a random temperature reading every 500 ms
      val sensor = SensorReading("sensor_1", System.currentTimeMillis(), rand.nextDouble() * 100)
      sourceContext.collect(sensor)
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}

Console output:
