龙珠

修炼自己与发现世界

蓄水长流:蓄水池抽样算法(reservoid sampling)

在做微信号“待字闺中”的每日算法题的时候,有提到在总数未知的情况下,做随机抽样使用了蓄水池算法,查了一下,还挺有意思的。

蓄水池抽样算法缘起于这样一道题目:有一个很大的输入流(大到没有存储器可以全部存储下来),而且只能遍历一次,如何从这个输入流中公平的随机抽取m个值(即保证每个元素被选中的概率相等)?(n未知)

刚开始看的时候一头雾水,没有思路。认为既然总数n是未知的,那怎么从中随机抽取呢?但是如果拨开分析的话,会发现:总数n是在过程中未知,在最后输入完毕时是确定的值。因此,算法应该关注于在每获得一个元素的情况下,抽样,使得对于已获得的元素,每个元素被选中的概率是相等的。

这样,就转换为每次得到新输入的元素就进行计算,算法的时间复杂度是O(n)。

我们先考虑m=1的情况:

假设输入流只有一个元素,则这个元素被选中的概率为1.

假设输入流多于一个元素,则第一个元素经过的时候概率为1;之后每一个元素经过的时候,都要考虑是否剔除原来的元素换成新的元素。在第i个元素经过的时候,要以1/i的概率来考虑是否剔除原来的元素,换成stream[i].

我们以stream = [a,b,c]来举例,则最终选取第一个元素(即没有选择第二个、第三个元素)的概率:

P(result = stream[1]) = P(result = a) = 1/1 * (1-1/2) * (1-1/3) = 1/3

同理,可得:

P(result = stream[2]) = P(result = b) = 1/2 * (1-1/3) = 1/3

P(result = stream[3]) = P(result = c) = 1/3

这样,stream中的三个元素a,b,c被选择的概率相等,均为1/3.

推广到一般情况的证明:

若stream[i]被选中,需满足:

1)在stream[i]流过时被选中:

P1 = 1/i

2)在stream[i]之后流过的元素都没有被选中:

P2 = (1-1/(i+1)) * (1-1/(i+2)) * ... * (1-1/n) = i/(i+1) * (i+1)/(i+2) * ... * (n-1)/n = i/n

所以,总的概率是:

P(result = stream[i]) = P1 * P2 = 1/i * i/n = 1/n

得证。


参考资料中的代码如下:

'''reservoir sampling(number of sample = 1)'''
              
def getSample(stream):
              
    i = 0  
              
    while stream.exist(i): #流还没结束
              
        r = random.randrange(0, i)
              
        if r == 1: # P(result = stream[i]) = 1/i
              
            result = stream[i]
              
        i++              
              
    return result

不过,i=0时,random.randrange(0,0)是报错的(Python语言),因此应该修改为:

'''reservoir sampling(number of sample = 1)'''
               
def getSample(stream):
               
    result = stream[0]#初始化
               
    i = 1
               
    while stream.exist(i): #流还没结束
               
        r = random.randrange(0, i+1)
               
        if r == 0: # P(result = stream[i]) = 1/(i+1) 因为计算机中i是从0计数,所以此处为(i+1)
               
            result = stream[i]
               
        i += 1              
               
    return result

同理,在m>1时,需要从n个数中公平的取出m个值,需要考虑集合即可。

思路是:

1)对前m个输入,直接加入到集合result[]中;

2)对之后的第i个输入stream[i],都按照概率m/i进行判断是否采用;不采用则直接pass,采用则随即的对result中m个值中选一个进行替换。

这是我的思路,但在实际操作中,对第2步不用随机两次。只需要判断(0,i)之间的随即数是否在(0,m)区间,在的话,直接用stream[i]替换result[i]即可,无需再对result进行随即。


参考资料中代码如下:

import random
               
                 
               
def getSample(stream, m):
               
    results = []
               
    for i in range(0, m-1):
               
        results[i] = stream[i]
               
    # out of for
               
                 
               
    i = m
               
    while stream[i].available(): # 当前stream[i]有效,流还未完结
               
        r = random.randrange(0, i)
               
        if r < m:
               
            results[r] = stream[i]
               
                 
               
        i++
               
    # out of while
               
                     
               
                 
               
return results

不过,第一步for循环中,应该是对range(0,m)循环,这样才能得到前m个数。

修改后的代码如下:

import random
                
                  
                
def getSample(stream, m):
                
    results = []
                
    for i in range(0, m):
                
        results[i] = stream[i]
                
                  
                
    i = m
                
    while stream[i].available(): # 当前stream[i]有效,流还未完结
                
        r = random.randrange(0, i)
                
        if r < m:
                
            results[r] = stream[i]
                
        i += 1
                
                      
                
return results

数学证明:

选中stream[i]的概率包括:

1)在stream[i]经过时被选中:

P1 = m/i

2)在stream[i]之后的元素,经过的时候都没有被选中,或被选中了,但替换的时候没有替换stream[i]:

我们先计算stream[i+1]不替换stream[i]的概率(没选中 + 选中了但没替换stream[i]):

P(i+1) = (1-m/(i+1)) + m/(i+1) * (m-1)/m = (i+1-m)/(i+1) + (m-1)/(i+1) = i/(i+1)

P(i+2) = ... = (i+1)/(i+2)

...

P(n) = ... = (n-1)/n

总的概率为:

P2 = P(i+1) * P(i+2) * ... * P(n) = i/n

所以,最终选中stream[i]的概率为:

P = P1 * P2 = m/i * i/n = m/n

符合要求。得证。

 

PS:参考资料2、3中,还提到加权分布式蓄水池抽样,需要考虑集合中的数据是有权重的,算法希望数据被抽样选中的概率和该数据的权重成比例。这个等有时间了再看一下。

参考资料:

  1. 蓄水池抽样算法(reservoid sampling)

  2. Reservoir Sampling - Sampling from a stream of elements

  3. 数据工程师必知算法:蓄水池抽样