Flink 单流:Kafka 写入 MySQL
配置参考
安装kafka
进入容器
# Open an interactive shell inside the container named "kafka".
docker exec -it kafka /bin/bash
# Use an absolute path so the cd works regardless of the container's working directory.
cd /opt/kafka
创建一个主题
# Create the single-partition, unreplicated topic "flink_test".
# NOTE(review): the --zookeeper flag only exists in older Kafka releases;
# newer releases expect --bootstrap-server instead — confirm against the installed version.
bin/kafka-topics.sh --create \
  --zookeeper 192.168.43.50:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic flink_test
启动flink sql client
# Start the Flink SQL client in embedded mode.
# The relative bin/ path presumably expects the Flink install directory as the working directory.
bin/sql-client.sh embedded
MySQL sink 表创建语句(在 MySQL 中执行)
-- MySQL target table for the Flink job: one row per day_time holding the summed amount.
-- day_time is the primary key, matching the primary key declared on the Flink-side sink table.
-- The AUTO_INCREMENT table option was dropped because no column is AUTO_INCREMENT,
-- and the integer display width was dropped because it is deprecated in MySQL 8.0.
CREATE TABLE test_result (
  `day_time` varchar(64) NOT NULL,
  `total_amount` bigint DEFAULT NULL,
  PRIMARY KEY (`day_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Flink SQL 配置语句(在 sql client 中执行)
-- Flink source table reading JSON records from the Kafka topic "flink_test".
-- Reading starts from the earliest offset; records with missing fields or
-- unparseable JSON are tolerated rather than failing the job.
CREATE TABLE kafka_source (
  id BIGINT,
  -- STRING instead of a bare VARCHAR: without a length Flink treats VARCHAR as VARCHAR(1),
  -- and the sink table declares this column as STRING.
  day_time STRING,
  -- Spelled "amnount" on purpose: it must match the field name in the produced JSON messages.
  amnount BIGINT,
  -- Processing-time attribute computed by Flink, not read from the message.
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'flink_test',
  'properties.bootstrap.servers' = '192.168.43.50:9092',
  'properties.group.id' = 'flink_gp_test1',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
  -- 'properties.zookeeper.connect' was removed: the Kafka consumer connects through
  -- bootstrap.servers only and does not recognize a ZooKeeper setting.
);
-- Flink sink table mapped over JDBC onto the MySQL table "test_result" in database "test".
-- The primary key is declared NOT ENFORCED: Flink does not validate it itself.
-- NOTE(review): credentials are inlined here; presumably acceptable only for a local demo.
CREATE TABLE mysql_sink (
  day_time     STRING,
  total_amount BIGINT,
  PRIMARY KEY (day_time) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://192.168.43.50:3306/test?characterEncoding=UTF-8',
  'table-name' = 'test_result',
  'username'   = 'root',
  'password'   = '123456'
);
-- Sum the amounts per day_time from the Kafka stream and write the totals to the MySQL sink.
-- The sink declares day_time as its primary key, so an updated total for a day is
-- expected to replace that day's existing row (JDBC connector upsert mode — confirm in the connector docs).
INSERT INTO mysql_sink
SELECT
  day_time,
  SUM(amnount) AS total_amount
FROM kafka_source
GROUP BY
  day_time;
运行一个生产者
# Start an interactive console producer for topic "flink_test"; each line typed is sent as one message.
# Presumably run from the Kafka directory inside the container, where the broker is reachable at localhost:9092.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink_test
发送数据
{"day_time": "20211001","id": 7,"amnount":2}
{"day_time": "20211002","id": 7,"amnount":20}
{"day_time": "20211003","id": 7,"amnount":30}
{"day_time": "20211004","id": 7,"amnount":40}
{"day_time": "20211005","id": 7,"amnount":50}
{"day_time": "20211006","id": 7,"amnount":60}
{"day_time": "20211007","id": 7,"amnount":70}
{"day_time": "20211008","id": 7,"amnount":80}
{"day_time": "20211009","id": 7,"amnount":90}
{"day_time": "20211010","id": 7,"amnount":20}
{"day_time": "20211010","id": 7,"amnount":30}
{"day_time": "20211010","id": 7,"amnount":40}
mysql test_result 表数据
20211001    2
20211002    20
20211003    30
20211004    40
20211005    50
20211006    60
20211007    70
20211008    80
20211009    90
20211010    90
                        
                            0
                        
                        
                             赞
                        
                    
                    
                热门推荐
- 
                        2、 - 优质文章
- 
                        3、 gate.io
- 
                        8、 openharmony
- 
                        9、 golang