蓄水长流:蓄水池抽样算法(reservoid sampling)
在做微信号“待字闺中”的每日算法题的时候,有提到在总数未知的情况下,做随机抽样使用了蓄水池算法,查了一下,还挺有意思的。
蓄水池抽样算法缘起于这样一道题目:有一个很大的输入流(大到没有存储器可以全部存储下来),而且只能遍历一次,如何从这个输入流中公平的随机抽取m个值(即保证每个元素被选中的概率相等)?(n未知)
刚开始看的时候一头雾水,没有思路。认为既然总数n是未知的,那怎么从中随机抽取呢?但是如果拨开分析的话,会发现:总数n是在过程中未知,在最后输入完毕时是确定的值。因此,算法应该关注于在每获得一个元素的情况下,抽样,使得对于已获得的元素,每个元素被选中的概率是相等的。
这样,就转换为每次得到新输入的元素就进行计算,算法的时间复杂度是O(n)。
我们先考虑m=1的情况:
假设输入流只有一个元素,则这个元素被选中的概率为1.
假设输入流多于一个元素,则第一个元素经过的时候概率为1;之后每一个元素经过的时候,都要考虑是否剔除原来的元素换成新的元素。在第i个元素经过的时候,要以1/i的概率来考虑是否剔除原来的元素,换成stream[i].
我们以stream = [a,b,c]来举例,则最终选取第一个元素(即没有选择第二个、第三个元素)的概率:
P(result = stream[1]) = P(result = a) = 1/1 * (1-1/2) * (1-1/3) = 1/3
同理,可得:
P(result = stream[2]) = P(result = b) = 1/2 * (1-1/3) = 1/3
P(result = stream[3]) = P(result = c) = 1/3
这样,stream中的三个元素a,b,c被选择的概率相等,均为1/3.
推广到一般情况的证明:
若stream[i]被选中,需满足:
1)在stream[i]流过时被选中:
P1 = 1/i
2)在stream[i]之后流过的元素都没有被选中:
P2 = (1-1/(i+1)) * (1-1/(i+2)) * ... * (1-1/n) = i/(i+1) * (i+1)/(i+2) * ... * (n-1)/n = i/n
所以,总的概率是:
P(result = stream[i]) = P1 * P2 = 1/i * i/n = 1/n
得证。
参考资料中的代码如下:
'''reservoir sampling(number of sample = 1)''' def getSample(stream): i = 0 while stream.exist(i): #流还没结束 r = random.randrange(0, i) if r == 1: # P(result = stream[i]) = 1/i result = stream[i] i++ return result
不过,i=0时,random.randrange(0,0)是报错的(Python语言),因此应该修改为:
'''reservoir sampling(number of sample = 1)''' def getSample(stream): result = stream[0]#初始化 i = 1 while stream.exist(i): #流还没结束 r = random.randrange(0, i+1) if r == 0: # P(result = stream[i]) = 1/(i+1) 因为计算机中i是从0计数,所以此处为(i+1) result = stream[i] i += 1 return result
同理,在m>1时,需要从n个数中公平的取出m个值,需要考虑集合即可。
思路是:
1)对前m个输入,直接加入到集合result[]中;
2)对之后的第i个输入stream[i],都按照概率m/i进行判断是否采用;不采用则直接pass,采用则随即的对result中m个值中选一个进行替换。
这是我的思路,但在实际操作中,对第2步不用随机两次。只需要判断(0,i)之间的随即数是否在(0,m)区间,在的话,直接用stream[i]替换result[i]即可,无需再对result进行随即。
参考资料中代码如下:
import random def getSample(stream, m): results = [] for i in range(0, m-1): results[i] = stream[i] # out of for i = m while stream[i].available(): # 当前stream[i]有效,流还未完结 r = random.randrange(0, i) if r < m: results[r] = stream[i] i++ # out of while return results
不过,第一步for循环中,应该是对range(0,m)循环,这样才能得到前m个数。
修改后的代码如下:
import random def getSample(stream, m): results = [] for i in range(0, m): results[i] = stream[i] i = m while stream[i].available(): # 当前stream[i]有效,流还未完结 r = random.randrange(0, i) if r < m: results[r] = stream[i] i += 1 return results
数学证明:
选中stream[i]的概率包括:
1)在stream[i]经过时被选中:
P1 = m/i
2)在stream[i]之后的元素,经过的时候都没有被选中,或被选中了,但替换的时候没有替换stream[i]:
我们先计算stream[i+1]不替换stream[i]的概率(没选中 + 选中了但没替换stream[i]):
P(i+1) = (1-m/(i+1)) + m/(i+1) * (m-1)/m = (i+1-m)/(i+1) + (m-1)/(i+1) = i/(i+1)
P(i+2) = ... = (i+1)/(i+2)
...
P(n) = ... = (n-1)/n
总的概率为:
P2 = P(i+1) * P(i+2) * ... * P(n) = i/n
所以,最终选中stream[i]的概率为:
P = P1 * P2 = m/i * i/n = m/n
符合要求。得证。
PS:参考资料2、3中,还提到加权分布式蓄水池抽样,需要考虑集合中的数据是有权重的,算法希望数据被抽样选中的概率和该数据的权重成比例。这个等有时间了再看一下。
参考资料: