搞死 MPP 的时空碰撞问题

查看 13|回复 0
作者:jingwei8340885   
问题描述
时空碰撞定义
某时间区间(例如 7 天)被分成多个固定时长(如 15 分钟)的时间切片,对象 a 和对象 b 在同一时间切片内的相同位置出现过,称为一次碰撞。
规则 1:相同时间切片内,多次碰撞只记一次。
规则 2:相同时间切片内,最后出现位置不同的称为不匹配,不匹配的时间切片数量不超过 20 时,(包括其它时间切片的)碰撞才被认为有效。
要求:已知对象 a ,查找出指定时间区间内,满足两条规则下,与 a 发生有效碰撞次数最多的前 20 个对象 b 。
数据结构与规模
单一数据表,每天的数据量约 80 亿条记录,每个对象平均 1000 条记录,每条记录存储对象的时空信息(对象标识、时间戳、空间标记),当时间区间为 7 天时,总数据量有 560 亿行,数据结构如下:
字段名称
字段类型
字段注释
示例数据
no
String
唯一对象标识
100000000009
ct
Int
时间戳,精确到秒
1690819200
lac
String
空间标记 1
40000
ci
String
空间标记 2
66000000
no 由全数字构成。lac 、ci 总是一起出现,为了描述方便起见,我们可以把 lac 和 ci 并称为一个字段 loc ,已知 loc 去重计数的范围不超过 27 万。
环境和期望
在 5 台 64C256G 服务器构成的集群下,期望 1 小时内计算出结果,使用某世界知名 MPP 数据库无法达到预期。
问题分析
这个问题用关系数据库确实不容易快速计算,我们尝试用 SQL 写出不考虑规则 2 的运算:
WITH DT AS ( SELECT DISTINCT no, loc, int(ct/15 分钟) as ct FROM T )
SELECT TOP 20 * FROM
   ( SELECT B.no, COUNT(DISTINCT B.loc) cnt
   FROM DT AS A JOIN DT AS B ON A.loc=B.loc AND A.ct=B.ct
   WHERE A.no=a AND B.noa
   GROUP BY B.no)
ORDER BY cnt DESC
SQL 中的 DISTINCT 和 JOIN 计算会涉及 HASH 和比对,数据量很大时计算量也会很大,都会严重拖累性能。而且这些运算都涉及随机访问,通常要在内存进行,数据量太大还要使用缓存,性能更会急剧下降甚至可能溢出。仅是规则 1 用 SQL 计算已经很慢了,再加上规则 2 ,MPP 算不出来也不奇怪了。
如果把对象 a 、b 在时间区间内的相关记录都取出成内存中的集合,然后来统计 a 和 b 发生有效碰撞的次数,并不会很困难。每个对象涉及的记录数并不多,即使 7 天区间也不到 1 万条,内存放下毫无压力。
设 a 的记录集合是 A ,b 的是 B ,将 A 按时间切片分组为 A1,…,An ,B 分为 B1,…Bn 。所有 Ai,Bi 内成员都按 ct 从小到大排序。
时间切片 i 内,a,b 发生(不考虑两条规则时的)碰撞的次数,可用
Ci=Bi.count(Ai.(loc).contain(loc))
计算出来,即统计 Bi 中有多少 loc 在 Ai 中出现过。
不过,这种两层循环计算会较慢,而我们知道 a 以及 Ai 相对于 b 是确定的,这样可以事先对 Ai 中的 loc 去重后建索引,改为
Ai’=Ai.id(loc).key@i(loc)
Ci=Bi.switch@i(loc,Ai’).len()
用 switch@i 过滤掉在 Ai 中找不到 loc 的 Bi 成员,同样可以得到碰撞次数。
我们只要统计 Ci>0 的时间切片个数即可得到满足规则 1 的碰撞次数。
类似地,可用
Di=Ai.m(-1).loc!=Bi.m(-1).loc //m(-1)表示取集合的最后成员
判断出在时间切片 i 中 a,b 是否发生过不匹配。
有了 Ci,Di ,a,b 的有效碰撞次数就很容易计算了
if(count(Di)0))
剩下就是针对该值计算 TopN 的常规任务了。
如果数据对 no,ct 有序,也很容易实现这个思路。A 可以用二分法一把取出,然后从头遍历对象 b ,因为数据有序,每次取出对应的 B 很容易。A 和 B 都对 ct 有序时,可以用有序分组计算出 Ai,Bi ,且保证上述 m(-1)的正确性。
可惜关系数据库无法保证数据有序存储,也没有相关的有序计算方法,只能写出非常绕的嵌套 SQL 。
SPL 有这种有序存储和相关的计算机制,容易实现。
基于这个思路,还有一些工程上的优化手段。
数据转换
将 no 变成数,两个位置 lac 、ci 合并成一个 loc ,并且序号化(原来是字符串,数字化时就顺便处理为序号了)。
转换后的数据结构如下
字段名称
数据类型
字段含义
示例数据
no
Long
唯一对象标识
100000000009
ct
Int
时间戳,精确到秒
1690819200
loc
Int
空间标记
10282
相比原数据结构,转存时做了以下两点变动:
1 、将 lac 、ci 两个字段合并为 loc 字段,并转换成 Int 型序号。原 lac 、ci 作为维表单独存储。
2 、将数字串 no 的数据类型变为 Long 型整数。
关联与序号化
前面分析中提到的每个时间切片的 Ai 建索引,但 Ai 太小了(平均长度在 10 左右),对于过小的集合使用索引的效果不明显。所以,我们在工程上改造成对整个 A (长度约有 1000 )建索引,这样要把时间切片序号 i 也加到主键上,大致代码:
A’=A.derive((ct-st)\900:i).groups(i,loc).index()
其中 st 是时间区间的起点,即每 900 秒分出一个时间切片。
这时 Ci 的计算要变成先关联(过滤)再分组了:
B.derive((ct-st)\900:i).join@i(i:loc,A).groups(i;count(1):C)
这样就可以计算出以 i 和 Ci 为字段的序表,未碰撞的情况被 join@i 过滤掉了。
join@i 使用索引实现关联过滤时,还是要计算 HASH 并比对,仍然有一定的计算量。其实我们知道,全部 i,loc 组合最多有 7 天96 (每天 96 个 15 分钟)27 万种可能,这并不是很大。如果用一个布尔值数组(序列)表示 A 在各个时间切片中是否在某个 loc 出现过,其长度最多也就是 79627 万,内存完全可以装得下。这样,我们就可以用对位序列技术来实现关联过滤,避免 HASH 计算和比对时间,能更快速地计算 Ci 。
用 aloc 表示 A 的对位序列:
aloc=A.align@a(672,(ct-st)\900+1).(x=270000.(false),~.run(x(loc)=true),x)
因为有时间切片和位置两个维度,这里也使用了二层的对位序列。将 A 按时间切片分成 672 ( 7*96 )个 组,每组是个 27 万个布尔值成员的序列,对于时间切片 i 中在位置 loc 出现过的对象,可以简单地用 aloc(i)(loc)迅速判断出是否与 a 发生过碰撞(即 a 是否也在时间切片 i 中在位置 loc 出现过)。
a 在每个时间切片的最后位置,也可以用一个序列表示为:
alast=A.align@a(672,(ct-st)\900+1).(~.m(-1).loc)
alast(i)就是 a 在时间切片 i 的最后位置,同样可以简单地用时间切片序号访问,以便快速计算 Di 。
按天分表
以上讲的算法要求数据对 no,ct 有序。但数据每天会新增,新增数据通常只会对 ct 有序甚至彻底无序。如果每次都要所有数据大排序就非常慢,即使只把新增数据排序再归并也要重写 560 亿行的数据,过于耗时。
SPL 复组表可以将多个有序的组表逻辑上合并成一个更大的有序组表,这样每天一个分组表存储,计算时用复组表归并分表数据,归并后的数据也可以支持并行计算。避免了全量数据每天重写,复组表读取时会损失少量归并时间,但获得数据维护的灵活性还是值得的。
当历史数据过期时,直接将相应日期的分表文件删除就可以了,非常简单。
实践过程
准备实验数据
将数据按天存储,每天内数据 no 、ct 有序,保存为列存组表,例如将 7 天数据,分别存为:1.day.ctx,…,7.day.ctx ,由这 7 个分表可构成复组表,造数据脚本可以这样写:
A
B
C
1
=rand@s(1)
2
for n
=file("day"/A2/".btx")
3
=movefile(B2)
4
=elapse@s(sd,(A2-1)*86400)
5
=long(B4)\1000
6
for nm
=1000000.new(100000000000+rand(8000000):no,int(B5+rand(86400)):ct,int(rand(270000)+1):loc)
7
=B2.export@ab(C6)
8
=file(A2/".day.ctx").create@py(#no,#ct,loc)
9
=B2.cursor@b().sortx(#1,#2)
10
>B8.append@i(B9)
11
=movefile(B2)
参数值有 3 个:
1 、n ,几天,举例:1 ,代表 1 天
2 、nm ,每天几百万,举例:1000 ,代表 10 亿
3 、sd ,起始日期,举例:2023-08-01
B8 建立组表时用了 @p 选项,表示按第一个字段 no 作为分段键。并行计算时需要对组表分段,不能把相同 no 的记录分到两段,使用 @p 选项可以在组表分段时保证这一点。
计算脚本
A
1
=now()
2
270000
3
=n*24*3600\pt
4
=file("day.ctx":to(n)).open()
5
=A4.cursor@m(ct,loc;no==src_no).fetch().align@a(A3,(ct-st)\pt+1)
6
=alast=A5.(~.m(-1).loc)
7
=aloc=A5.(x=A2.(false),~.run(x(loc)=true),x)
8
=A4.cursor@m(;no!=src_no).derive((ct-st)\pt+1:tn,aloc(tn)(loc):loca,alast(tn):lasta)
9
=A8.group@s(no,tn;lasta,count(loca):cnt,top@1(-1,0,loc):lastb)
10
=A9.group@s(no;count(cnt>0):cnt,count(lasta && lastb && lastb!=lasta):dcnt)
11
=A10.select(cnt>0 && dcnt参数值有 4 个:
1 、src_no 为对象 a 的特征号,举例:100000000009
2 、st 为起始时间戳(秒),举例:1690819200 ,对应 2023-08-01 00:00:00
3 、n 为统计天数,举例 7
4 、pt 为切片时间的秒数,举例 900
A3:为统计时间区间内的总时间切片数
A5:读出对象 a 的数据,产生时间切片序号并按该序号分组。组表对 no 有序时,用 no==src_no 的条件可以迅速定位到目标数据。
A6:基于 A5 计算 a 在每个时间切片的最后空间值
A7:基于 A5 计算 a 在对位序列,前面已经解释过计算原理
A8:遍历其他(非 a 的)对象,生成时间切片序号 tn (使用新符号与 a 区别)。对于第一记录,从 aloc 中取出当前对象是否在时间切片 tn 和位置 loc 上和 a 发生碰撞记入 loca ,从 alast 中取出时间切片 tn 中对象 a 的最后空间值。
A9:按对象和时间切片分组,可以用 lasta 计算每个对象在时间切片中与 a 的碰撞次数 cnt ,即前面分析的 Ci ;并计算出该对象在该时间切片中最后的 loc 记为 lastb 。
A10:进一步按对象分组,计算出该对象与 a 的(考虑规则 1 后的)碰撞次数和不匹配次数。每个时间切片中的 Ci>0 即认定为一次碰撞,所以是 count(cnt>0),记入新的 cnt ;最后位置不同时计算一次不匹配,记入 dcnt 。
A11:过滤掉无效碰撞的对象后取有效碰撞次数最多的前 20 名。这里做实验时采用的条件 dcnt
编号化及还原
以上代码在造数据时,是按 no 已经整数化,且 lac,ci 被合为序号来写的,实际上先要做转换整理,完成计算后还要还原。具体介绍可以参考:数据转存时的整数化
实验效果
SPL 使用单机( 8C64G ),计算总时间跨度 7 天(总数据量有 560 亿行),时间切片为 15 分钟,耗时为 121 秒。
实际上,达到这个性能还会少量使用 SPL 企业版中的列式运算选项,但因为不涉及原理分析,这里就不详述了。
后记
这是个典型的对象统计问题,这类问题一般有如下几个特点:
[ol]

  • 统计满足某种条件的对象的个数

  • 对象的数量非常多,但每个对象涉及的数据量不多

  • 条件非常复杂,通常还和次序有关,需要一定的步骤才能判断出来
    [/ol]
    面对这类问题,一个常见的思路就是把数据按对象排序,逐步取出每个对象的数据进入内存,再来做复杂的条件判断。
    现实中这种运算很常用,银行的帐户统计、电商的用户漏斗分析等等都是这种运算。
    SQL 很难实现这种运算,不保证有序存储,也缺乏有序运算,也很难写这些复杂判断。经常要写成很绕的嵌套语句,或者使用存储过程,无论如何,执行性能都会很差。
    SPL 则提供了有序存储及有序计算,也支持复杂的过程计算,能够很方便实现这类统计。
    SPL 资料
  • SPL 下载
  • SPL 源代码
  • 您需要登录后才可以回帖 登录 | 立即注册

    返回顶部