Table of Contents

Overview
Practice
    paimon
Conclusion

Overview

Pipeline: OGG -> Kafka -> Paimon. Change data captured by Oracle GoldenGate (OGG) is delivered to Kafka and then written into Paimon with Flink SQL.

Practice

For the prerequisite setup, see the following articles:

Hadoop one-master-three-worker installation (link)
Spark on YARN (link)
Flink yarn-session environment (link)

paimon

Goals:

1. Sync the table.
2. Write into Paimon via Kafka.

SET parallelism.default =2;

set table.exec.sink.not-null-enforcer = drop;

SET jobmanager.memory.process.size = 2048m;

SET taskmanager.memory.process.size = 10240m;

SET taskmanager.memory.managed.size = 128m;
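
One point worth noting: the Paimon streaming sink only commits data on Flink checkpoints, so checkpointing has to be enabled either in the yarn-session's flink-conf.yaml or in the SQL session itself. A minimal sketch, assuming it is not already configured; the 60 s interval is an illustrative value, not from the original setup:

-- Assumption: checkpointing is not already set in flink-conf.yaml; 60 s is illustrative.
-- Without a checkpoint interval the Paimon sink never commits snapshots.
SET 'execution.checkpointing.interval' = '60 s';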

CREATE CATALOG paimon WITH (

'type' = 'paimon',

'warehouse' = 'hdfs:///data/paimon',

'default-database'='ods'

);

USE CATALOG paimon;
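
Optionally, a quick sanity check confirms the catalog is usable before creating tables (a sketch; the output depends on what already exists under hdfs:///data/paimon):

-- List databases and tables visible through the Paimon catalog.
SHOW DATABASES;
SHOW TABLES;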

DROP TABLE IF EXISTS xx_REFDES_KAFKA;

CREATE TEMPORARY TABLE xx_REFDES_KAFKA

( `PCBID` STRING

,`RID` STRING

,`REFDES` STRING

,`BM_CIRCUIT_NO` DECIMAL(38,0)

,`TIMESTAMP` TIMESTAMP

,`PICKUPSTATUS` STRING

,`SERIAL_NUMBER` STRING

,`FLAG` DECIMAL(2,0)

,`KITID` STRING

,`ID` STRING

,`CREATEDATE` TIMESTAMP

,`ETL` STRING COMMENT 'etl标识'

,`OPT1` STRING COMMENT '备用1'

,`OPT2` STRING COMMENT '备用2'

,`OPT3` STRING COMMENT '备用3'

,`OPT4` STRING COMMENT '备用4'

,`OPT5` STRING COMMENT '备用5'

,`NOZZLEID` STRING COMMENT 'nxt的NOZZLEID'

,`LANENO` STRING COMMENT 'nxt的LANENO'

,`COMPONENTBARCODE` STRING COMMENT 'asm的componentBarcode nxt的Part2DCode'

,`PN` STRING

,`LOTCODE` STRING

,`DATECODE` STRING

,`VERDOR` STRING

,`WORKORDER` STRING

,primary key(ID) not enforced)

WITH(

'connector' = 'kafka',

'topic' = 'TRACE_LOG_REFDES',

'properties.bootstrap.servers' = '10.xx.xx.30:9092',

'properties.group.id' = 'xx_REFDES_GROUP',

'scan.startup.mode' = 'earliest-offset',

'format' = 'ogg-json'

);
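
Before creating the sink, the source can be previewed directly in the SQL client. A small sketch, assuming the TRACE_LOG_REFDES topic already holds OGG change records (stop the preview manually once rows appear):

-- Streaming preview of the Kafka/ogg-json source; shows +I/-U/+U/-D changelog rows.
SELECT PCBID, ID, `TIMESTAMP`, CREATEDATE FROM xx_REFDES_KAFKA;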

create table if not exists yy_refdes_hive_ro

( `pcbid` string

,`rid` string

,`refdes` string

,`bm_circuit_no` decimal(38,0)

,`timestamp` string COMMENT '{"type":"DATE"}'

,`pickupstatus` string

,`serial_number` string

,`flag` decimal(2,0)

,`kitid` string

,`id` string

,`createdate` string COMMENT '{"type":"DATE"}'

,`etl` string comment 'etl标识'

,`opt1` string comment '备用1'

,`opt2` string comment '备用2'

,`opt3` string comment '备用3'

,`opt4` string comment '备用4'

,`opt5` string comment '备用5'

,`nozzleid` string comment 'nxt的nozzleid'

,`laneno` string comment 'nxt的laneno'

,`componentbarcode` string comment 'asm的componentbarcode nxt的part2dcode'

,`pn` string

,`lotcode` string

,`datecode` string

,`verdor` string

,`workorder` string

,`dt` string

,primary key (id,dt) not enforced)

partitioned by (dt) with (

'connector' = 'paimon',

'file.format' = 'parquet',

'metastore.partitioned-table' = 'true',

'bucket' = '-1',

'partition.expiration-time' = '730 d',

'partition.expiration-check-interval' = '1 d',

'partition.timestamp-formatter' = 'yyyy-MM-dd',

'partition.timestamp-pattern' = '$dt'

);
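
It can also be worth confirming that the sink table was registered with the expected schema and options before starting the streaming insert (plain Flink SQL, nothing Paimon-specific):

-- Inspect the schema and the resolved WITH options of the Paimon sink table.
DESCRIBE yy_refdes_hive_ro;
SHOW CREATE TABLE yy_refdes_hive_ro;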

INSERT INTO yy_refdes_hive_ro SELECT

 PCBID, RID, REFDES, BM_CIRCUIT_NO,
 DATE_FORMAT(`TIMESTAMP`, 'yyyy-MM-dd HH:mm:ss'),
 PICKUPSTATUS, SERIAL_NUMBER, FLAG, KITID, ID,
 DATE_FORMAT(CREATEDATE, 'yyyy-MM-dd HH:mm:ss'),
 ETL, OPT1, OPT2, OPT3, OPT4, OPT5,
 NOZZLEID, LANENO, COMPONENTBARCODE,
 PN, LOTCODE, DATECODE, VERDOR, WORKORDER,
 DATE_FORMAT(`TIMESTAMP`, 'yyyy-MM-dd')

FROM xx_REFDES_KAFKA;
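
Once the INSERT job is running on the yarn-session, the landed data can be checked from another SQL client session. A verification sketch; the batch-mode switch and the snapshots system table are standard Flink/Paimon features, the aggregation is only an example:

-- Read the Paimon table as a bounded batch query instead of a stream.
SET 'execution.runtime-mode' = 'batch';

-- Row counts per partition written so far.
SELECT dt, COUNT(*) AS cnt FROM yy_refdes_hive_ro GROUP BY dt;

-- Paimon system tables expose commit history, e.g. the snapshots table.
SELECT * FROM `yy_refdes_hive_ro$snapshots`;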

Conclusion

This concludes writing OGG data into Paimon through Flink.
