Flink SQL 官网用客户(customer)和订单(order)举例,但都每分钟统计流表每个客户订单的数量。我的需求是每分钟统计维表全量每个客户订单的数量,也就是就算这一分钟某个客户没有下单,也需要统计一个 0 出来,用于做监控报警。
为了不暴露业务需求,调整为客户和订单的场景,如果有不恰当的地方还请指出,我再补充,SQL 如下:
CREATE TEMPORARY TABLE customers (
id INT,
name STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://....'
);
CREATE TEMPORARY TABLE orders (
order_id STRING,
customer_id INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '...'
);
CREATE TEMPORARY VIEW order_per_minute AS
SELECT
customer_id,
count(*) as cnt,
TUMBLE_END(order_time, INTERVAL '1' MINUTE) AS window_end
FROM orders
GROUP BY customer_id, TUMBLE(tstamp, INTERVAL '1' MINUTE);
INSERT INTO destination
SELECT
COALESCE(window_end, CURRENT_TIMESTAMP),
customer_id,
COALESCE(cnt, 0),
FROM
customers LEFT JOIN order_per_minute
ON customers.id = order_per_minute.customer_id;
实际执行上面的代码有问题,比如说有 3 个客户 c1/c2/c3 ,但只有 2 个客户 c1/c2 每分钟都下单,
第一次执行结果是对的:
10:01, c1, 19
10:01, c2, 32
10:01, c3, 0
随后每分钟的数据,就会少掉 c3 的结果:
10:02, c1, 18
10:02, c2, 22 // c3 没有输出
10:03, c1, 18
10:03, c2, 22 // c3 没有输出
我也不清楚 Flink SQL 能否这么用吗,还是得用 DataStream API 解决?请论坛的 Flink 大佬帮忙看一下,感谢!
为了防止 customer 、order 场景误导,这里说一下实际场景:
实际是要监控 IoT 设备的数据推送,IoT 设备全量表是预先初始化好的,保存在 MySQL 数据表; IoT 数据推送是一个 kafka 流消息,理论上每个 IoT 设备每秒都有数据。需求是监控哪些 IoT 设备数据是中断或缺失的。