写在前面 Bloom Filter布隆过滤器
工作原理是什么?用的时候我们需要注意什么?为什么?这篇文章我们来讲讲,并用go语言来实现布隆过滤器,这也是面试会经常考察的点。代码链接在末尾的github上。
1. 工作原理 1.1 组成部分 布隆过滤器的核心是 快速过滤
, 在布隆过滤器里面的不一定在,而不在布隆过滤器中的一定不在。
我们看下面的这个原理图:
我们拆分组成部分:
X,Y和Z,其中X,Y这两个值是存入布隆过滤器中,但是Z没有存入布隆过滤器中 hash1函数、hash2函数、hash3函数等多个hash函数
,数量为 K
的hash函数,用来确定某个值在布隆过滤器中的位置 布隆过滤器的数组长度,一般使用 bitset
来存储,长度为 M
的数组。 我们知道布隆过滤器是两种结果:
1.2 为什么是可能在集合中? 首先我们先确认一点是,X是怎么存入布隆过滤器中的呢? X和多个hash函数进行计算,并确定好了这个X是落在布隆过滤器的数组的哪个下标中。
如果落在哪个下标,哪个下标就变成1。
我们再来看看怎么判断X是不是在过滤器中的呢?其实就是和存入布隆过滤器器的计算一样,判断这些位置上的是不是都是1就行了。比如下图 但这里的 可能
是什么意思呢?其实我们可以看到 判断存在与否的本质是数组的0,1,3下表是不是1
, 如果这三个位置是1,那么就 可能
是X当初存入的时候将数组的这三个位置变成1的,当然这里是可能, 因为这三个位置也可能是其他元素存入的时候变成1的
。例如下面 假设X的计算会落在0,1,3中,但是这三个位置是 其他元素存入时占出来的
,是存储A、B、C的时候占出来的。所以我们只能判断出, 这个X可能是存在布隆过滤器中
,并不是绝对的。而这就是误判,所以在布隆过滤器中会存在一个 误判率
,而这个误判率是提前设定的,比如设置为0.01,也就是1%的误判率。
那是不是有一种可能,如果我们的hash函数够多,计算出X的落点下标就会越离散,就可能会让我们的数据分布均匀,上面的例子是3个hash函数的结果, 如果是4个hash函数,那么X的落点可能是0,1,3,6,那么这就可以判断出X一定不存在集合中。
那实际中, 我们应该选择多少个hash函数呢?过多的hash函数会导致计算时长变得过长
,这个我们后面再说。
1.3 为什么是一定不在集合中? 还是上面的例子,但是此时X的计算是落在下标0,1,4中的,而 此时4这个坑位没有占用
,那可以证明,X一定没存入过布隆过滤器中。 如果这里面的坑位都占满了呢?那这就涉及到另一个变量,我们需要M是多大的数组呢? 这就要看我们所需要存储的元素数量,根据元素的数量和所期望的误差率来计算所需要存储的数组长度M。
1.3 如何确定M和K? 我们知道误判是一定会存在的,那我们 假设业务能接受的误差率为p,存储元素个数为n个
,那么可以根据以下的公式算出布隆过滤器中的数组长度M值:
接着我们根据以下公式,算出在存储n个元素的同时需要达到误差率p,需要的hash函数是多少个?
K的计算公式 其实我们结合这两个公式,将M的计算套在K的计算公式上, 我们可以看出K个hash函数和误判率p的关系
。
我们的K一定是大于0的,并且是有限数的,我们可以画出误判率p和K个hash函数的关系图。
所以当我们的K大于某个值的时候,误差率之间其实已经很小很小了。
而K过大就会导致另一个问题,那就是耗时加长了,因为hash运算也是需要时间的,我们每次判断都需要对这个数进行K次的hash函数计算, 我们为什么选择布隆过滤器,不就是因为布隆过滤器可以快速判断某个元素是不是包含在布隆过滤器的集合中吗?
所以我们必须提前知道我们K应该是设置成多少? 才能将hash运算的时间达到一个可控的预期时间
。如果hash时间过长,其实满足不了我们 快速判断
的初衷。
k和hash函数对应某个误判率p 这时候大家可能会一个疑惑,那我们一开始不就设定了误差是0.01吗?
问题来了,如果0.01的误差,需要8个hash函数, 也就是每次存储一个数就要进行8次hash函数的计算,这对cpu来说是一个巨大的挑战
,也对耗时是个巨大的损耗。但如果误差0.02是需要只4次hash,0.025只需要1次hash。你会怎么选择? 我肯定会选择 K=1 这种,因为误判率小于3%,业务都能接受,这过滤了97.5%的正确率,这是一个非常不错的成绩了,而且还是用最快的速度达到97%。
当然,实际上并不会这么理想,我们用0.01的误判率去算出K和M,然后这个K和M再去算真实的计算 ,这个值很大概率不会是0.01,这就需要我们进行调试了。不断用K去调试出符合业务的误判率。
2. 实现 我们了解完布隆过滤器的实现原理后,我们来代码实现一下。
2.1 结构定义 这次我们使用bitset包是第三方的 github.com/bits-and-blooms/bitset
这个包,而hash函数用的是murmur hash,这就是这个第三方 github.com/twmb/murmur3
type BloomFilter struct { bitset *bitset.BitSet // 使用第三方bitset包 size uint64 // 位集合大小 hashFuncs []hash.Hash64 // 哈希函数集合 mutex sync.RWMutex // 读写锁 }
// NewWithFalsePositiveRate 根据预期元素数量和误判率创建布隆过滤器 func NewWithFalsePositiveRate (expectedItems uint64 , falsePositiveRate float64 ) * BloomFilter { // 计算最优位数组大小和哈希函数数量 m, k := optimalParameters(expectedItems, falsePositiveRate) funcs := make ([]hash.Hash64, k) for i := 0 ; i < k; i++ { // 创建hash集合 funcs[i] = murmur3.SeedNew64( uint64 (i)) } return &BloomFilter{ bitset: bitset.New( uint (m)), size: m, hashFuncs: funcs, mutex: sync.RWMutex{}, } }
在创建hash集合这里,我们需要注意一点, hash集合必须是不同的,否则将会毫无意义,甚至出现副作用
。因为我们对同一个数进行多次相同的hash,结果其实是一样的,但是浪费了时间。
func optimalParameters (n uint64 , p float64 ) ( uint64 , int ) { m := uint64 (math.Ceil(- float64 (n) * math.Log(p) / (math.Ln2 * math.Ln2))) k := int (math.Ceil(math.Ln2 * float64 (m) / float64 (n))) return m, k }
2.2 基础逻辑实现 将元素添加到布隆过滤器的集合中:计算hash结果,去set对应的下标位置。 func (bf *BloomFilter) Add (item [] byte ) { bf.mutex.Lock() defer bf.mutex.Unlock() for _, h := range bf.hashFuncs { h.Reset() _, _ = h.Write(item) index := h.Sum64() % bf.size bf.bitset.Set( uint (index)) } }
func (bf *BloomFilter) Contains (item [] byte ) bool { bf.mutex.RLock() defer bf.mutex.RUnlock() for _, h := range bf.hashFuncs { h.Reset() _, _ = h.Write(item) index := h.Sum64() % bf.size if !bf.bitset.Test( uint (index)) { return false } } return true }
所以其实我们可以看出, 每一次的判断都要进行K次的hash运算,所以K越大,就越耗时。
2.3 单测 func TestBloomFilterBasic (t *testing.T) { bf := NewWithFalsePositiveRate( 1000 , 0.01 ) // 1%误判率 // 测试添加和查询 testItems := [][] byte { [] byte ( "1" ), [] byte ( "2" ), [] byte ( "3" ), [] byte ( "4" ), } for _, item := range testItems { bf.Add(item) if !bf.Contains(item) { t.Errorf( "Expected item %q to be in bloom filter" , item) } } // 测试不存在的项 nonExistentItems := [][] byte { [] byte ( "5" ), [] byte ( "6" ), [] byte ( "7" ), } for _, item := range nonExistentItems { if bf.Contains(item) { t.Errorf( "Item %q should not be in bloom filter" , item) } } }
单测结果 func TestFalsePositiveRate (t *testing.T) { expectedItems := uint64 ( 1000000 ) falsePositiveRate := 0.01 // 1% bf := NewWithFalsePositiveRate(expectedItems, falsePositiveRate) // 添加10000个随机字符串 addedItems := make ([][] byte , expectedItems) for i := uint64 ( 0 ); i < expectedItems; i++ { item := randomString( 30 ) addedItems[i] = item bf.Add(item) } // 检查所有添加的项是否都在过滤器中 for _, item := range addedItems { if !bf.Contains(item) { t.Errorf( "Added item not found in bloom filter" ) } } start := time.Now() // 检查1000000个未添加的随机字符串的误判率 falsePositives := 0 totalTests := 1000000 for i := 0 ; i < totalTests; i++ { item := randomString( 30 ) if bf.Contains(item) { falsePositives++ } } consumingTime := time.Since(start) actualRate := float64 (falsePositives) / float64 (totalTests) t.Logf( "Expected false positive rate: %.2f%%" , falsePositiveRate* 100 ) t.Logf( "Actual false positive rate: %.2f%%, consuming time: %v" , actualRate* 100 , consumingTime) if actualRate > falsePositiveRate* 2 { // 允许实际误判率是预期的2倍以内 t.Errorf( "False positive rate too high: expected %.2f%%, got %.2f%%" , falsePositiveRate* 100 , actualRate* 100 ) } }
误判率和耗时结果 详细代码在:https://github.com/CocaineCong/BiliBili-Code 中