flink 单流 kafka 写入 mysqld

  • 2021-07-06
  • 浏览 (237)

配置参考

jdbc
kafka

安装kafka

docker 安装 kafka

进入容器

docker exec -it kafka /bin/bash
cd opt/kafka

创建一个主题

bin/kafka-topics.sh --create --zookeeper 192.168.43.50:2181 --replication-factor 1 --partitions 1 --topic flink_test
bin/sql-client.sh embedded

sink mysql 创建语句

CREATE TABLE test_result (
  `day_time` varchar(64) NOT NULL,
  `total_amount` bigint(11) DEFAULT NULL,
  PRIMARY KEY (`day_time`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

配置语句

create table kafka_source ( 
  id BIGINT,
  day_time VARCHAR,
  amnount BIGINT,
  proctime AS PROCTIME ()
)
 with ( 
  'connector' = 'kafka',
  'topic' = 'flink_test',
  'properties.bootstrap.servers' = '192.168.43.50:9092', 
  'properties.group.id' = 'flink_gp_test1',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'properties.zookeeper.connect' = '192.168.43.50:2181/kafka'
 );

CREATE TABLE mysql_sink (
                   day_time string,
                   total_amount bigint,
                   PRIMARY KEY (day_time) NOT ENFORCED
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://192.168.43.50:3306/test?characterEncoding=UTF-8',
   'table-name' = 'test_result',
   'username' = 'root',
   'password' = '123456'
 );

INSERT INTO mysql_sink
SELECT day_time,SUM(amnount) AS total_amount
FROM kafka_source
GROUP BY day_time;

运行一个生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink_test

发送数据

{"day_time": "20211001","id": 7,"amnount":2}
{"day_time": "20211002","id": 7,"amnount":20}
{"day_time": "20211003","id": 7,"amnount":30}
{"day_time": "20211004","id": 7,"amnount":40}
{"day_time": "20211005","id": 7,"amnount":50}
{"day_time": "20211006","id": 7,"amnount":60}
{"day_time": "20211007","id": 7,"amnount":70}
{"day_time": "20211008","id": 7,"amnount":80}
{"day_time": "20211009","id": 7,"amnount":90}
{"day_time": "20211010","id": 7,"amnount":20}
{"day_time": "20211010","id": 7,"amnount":30}
{"day_time": "20211010","id": 7,"amnount":40}

mysql test_result 表数据

20211001    2
20211002    20
20211003    30
20211004    40
20211005    50
20211006    60
20211007    70
20211008    80
20211009    90
20211010    50
正文到此结束