Table of Contents
1. Loading data from a collection
2. Reading data from a file
3. Reading data from Kafka
4. Reading data from a socket
5. Custom data sources
1. Loading data from a collection
Use the fromCollection() method to turn a List into a Flink data source.
Approach 1:
import org.apache.flink.streaming.api.scala._

object SourceTest {
  def main(args: Array[String]): Unit = {
    // create the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)
    val dataList = List(
      1, 2, 3, 4, 5, 21, 12, 3445, 65
    )
    // load data source 1 (source, approach 1)
    val stream1: DataStream[Int] = env.fromCollection(dataList)
    // print sink to the console
    stream1.print()
    env.execute("source stu")
  }
}
Console output:
Approach 2:
Define a case class, then use the fromCollection() method with a sequence of case-class instances as the Flink data source.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    // create the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)
    val dataList2 = List(
      SensorReading("sensor_1", 1665103420, 38.1),
      SensorReading("sensor_4", 1665103420, 31.2),
      SensorReading("sensor_7", 1665103420, 15.4),
      SensorReading("sensor_9", 1665103420, 25.8)
    )
    // load the data source
    val stream1: DataStream[SensorReading] = env.fromCollection(dataList2)
    // print sink to the console
    stream1.print()
    env.execute("source stu")
  }
}
Console output:
You can also pass the data in directly by calling the fromElements() method:
object SourceTest {
  def main(args: Array[String]): Unit = {
    // create the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)
    // the element types are mixed, so the stream is typed DataStream[Any]
    val stream1: DataStream[Any] = env.fromElements("hello", 1, 1.5, "hello gogo")
    // print sink to the console
    stream1.print()
    env.execute("source stu")
  }
}
Console output:
2. Reading data from a file
readTextFile()
The readTextFile() method reads a local or HDFS file into Flink as a data source.
A file exists at the path "E:\\Study_Project\\flink\\resource\\sensor.txt":
object SourceTest {
  def main(args: Array[String]): Unit = {
    // create the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)
    // load data source 3: read the file line by line
    val path = "E:\\Study_Project\\flink\\resource\\sensor.txt"
    val stream1: DataStream[String] = env.readTextFile(path)
    // print sink to the console
    stream1.print()
    env.execute("source stu")
  }
}
Console output:
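readTextFile() produces plain strings, so in practice each line is usually parsed into the SensorReading case class with a map. A minimal sketch of that parsing step, assuming each line is comma-separated like "sensor_1,1665103420,38.1" (the actual layout of sensor.txt is not shown above, so this format is an assumption):

```scala
// SensorReading is redefined here so the sketch is self-contained.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Parse one comma-separated line into a SensorReading.
def parseLine(line: String): SensorReading = {
  val fields = line.split(",").map(_.trim)
  SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
}
```

In the job above you would apply it as `stream1.map(parseLine)` to get a `DataStream[SensorReading]`.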
3. Reading data from Kafka
Load data from Kafka with the addSource() method.
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object SourceTest {
  def main(args: Array[String]): Unit = {
    // create the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)
    // Kafka consumer configuration (note: the property key is "group.id", not "group_id")
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.89.140:9092")
    properties.setProperty("group.id", "sensor_group")
    properties.setProperty("auto.offset.reset", "latest")
    // consume the "sensor" topic, deserializing each record as a String
    val stream1: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))
    // print sink to the console
    stream1.print()
    env.execute("source stu")
  }
}
Produce data on the sensor topic from the Kafka console client:
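One way to produce the test data is Kafka's bundled console producer; the exact script path depends on where Kafka is installed on the server, so the path below is an assumption:

```shell
# Start an interactive console producer on the "sensor" topic.
# The broker address matches the bootstrap.servers set in the Flink job above.
bin/kafka-console-producer.sh --broker-list 192.168.89.140:9092 --topic sensor
```

Each line typed into this producer arrives in the Flink job as one String record.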
Console output:
4. Reading data from a socket
Open a listening socket on Linux:
nc -lk 7777 (starts a listening port on the server)
object SourceTest {
  def main(args: Array[String]): Unit = {
    // create the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)
    // read text from the socket opened by nc
    val stream1: DataStream[String] = env.socketTextStream("192.168.89.140", 7777)
    stream1.print()
    env.execute("source stu")
  }
}
Console output:
5. Custom data sources
Pass an instance of a class that implements SourceFunction to the addSource() method. The code is as follows:
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    // create the Flink environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the parallelism
    env.setParallelism(2)
    // load the custom source
    val stream1: DataStream[SensorReading] = env.addSource(new MySensorSource())
    // print sink to the console
    stream1.print()
    env.execute("source stu")
  }
}
The SourceFunction implementation is as follows. Note that cancel() must actually stop the loop in run(), so a running flag is used instead of while (true):
import scala.util.Random
import org.apache.flink.streaming.api.functions.source.SourceFunction

class MySensorSource extends SourceFunction[SensorReading] {
  // flag checked by run() so that cancel() can stop the loop
  @volatile var running = true

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    while (running) {
      // emit a random reading every 500 ms
      val sensor: SensorReading = SensorReading("sensor_1", System.currentTimeMillis(), rand.nextDouble() * 100)
      sourceContext.collect(sensor)
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
Console output:
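The generator logic inside MySensorSource can be sanity-checked outside Flink. A small self-contained sketch (SensorReading is redefined here) pulls the reading construction into a plain function and confirms that rand.nextDouble() * 100 yields temperatures in [0, 100):

```scala
import scala.util.Random

case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Same reading construction as in MySensorSource.run(), as a standalone function.
def randomReading(rand: Random): SensorReading =
  SensorReading("sensor_1", System.currentTimeMillis(), rand.nextDouble() * 100)

val rand = new Random()
val samples = (1 to 1000).map(_ => randomReading(rand))
```

This kind of check is useful before wiring the generator into a long-running source.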