如何避免海量数据需要查重,缓存穿透的情况的发生,使用布隆过滤器。

布隆过滤器

Bloom Filter 概念

布隆过滤器(英语:Bloom Filter)是1970年由一个叫布隆的小伙子提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。

Bloom Filter 原理

布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。

Bloom Filter跟单哈希函数Bit-Map不同之处在于:Bloom Filter使用了k个哈希函数,每个字符串跟k个bit对应。从而降低了冲突的概率。

Bloom Filter的缺点

bloom filter之所以能做到在时间和空间上的效率比较高,是因为牺牲了判断的准确率、删除的便利性

  1. 存在误判,可能要查到的元素并没有在容器中,但是hash之后得到的k个位置上值都是1。如果bloom filter中存储的是黑名单,那么可以通过建立一个白名单来存储可能会误判的元素。
  2. 删除困难。一个放入容器的元素映射到bit数组的k个位置上是1,删除的时候不能简单的直接置为0,可能会影响其他元素的判断。可以采用Counting Bloom Filter

缓存穿透

布隆过滤器一般我们都会用来防止缓存击穿,先来看看什么是缓存穿透

每次查询都会直接打到DB。

那么我们先把我们数据库的数据都加载到我们的过滤器中,比如数据库的id现在有:1、2、3

那就用id:1 为例子他在上图中经过三次hash之后,把三次原本值0的地方改为1

下次数据进来查询的时候如果id的值是1,那么我就把1拿去三次hash 发现三次hash的值,跟上面的三个位置完全一样,那就能证明过滤器中有1的

反之如果不一样就说明不存在了

Bloom Filter 实现

布隆过滤器有许多实现与优化,Guava中就提供了一种Bloom Filter的实现。

  • 在使用bloom filter时,绕不过的两点是预估数据量n以及期望的误判率fpp,
  • 在实现bloom filter时,绕不过的两点就是hash函数的选取以及bit数组的大小。

对于一个确定的场景,我们预估要存的数据量为n,期望的误判率为fpp,然后需要计算我们需要的Bit数组的大小m,以及hash函数的个数k,并选择hash函数

(1)Bit数组大小选择

  根据预估数据量n以及误判率fpp,bit数组大小的m的计算方式:

(2)哈希函数选择

​ 由预估数据量n以及bit数组长度m,可以得到一个hash函数的个数k:

​ 哈希函数的选择对性能的影响应该是很大的,一个好的哈希函数要能近似等概率的将字符串映射到各个Bit。选择k个不同的哈希函数比较麻烦,一种简单的方法是选择一个哈希函数,然后送入k个不同的参数。

我们使用guava包实现BloomFilter,需要引入guava包:

 <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>23.0</version>
 </dependency>

测试分两步:

1、往过滤器中放一百万个数,然后去验证这一百万个数是否能通过过滤器

2、另外找一万个数,去检验漏网之鱼的数量

/**
 * 测试布隆过滤器(可用于redis缓存穿透)
 *
 * @author 敖丙
 */
public class TestBloomFilter {

    private static int total = 1000000;
    private static BloomFilter<Integer> bf = BloomFilter.create(Funnels.integerFunnel(), total);
//    private static BloomFilter<Integer> bf = BloomFilter.create(Funnels.integerFunnel(), total, 0.001);

    public static void main(String[] args) {
        // 初始化1000000条数据到过滤器中
        for (int i = 0; i < total; i++) {
            bf.put(i);
        }

        // 匹配已在过滤器中的值,是否有匹配不上的
        for (int i = 0; i < total; i++) {
            if (!bf.mightContain(i)) {
                System.out.println("有坏人逃脱了~~~");
            }
        }

        // 匹配不在过滤器中的10000个值,有多少匹配出来
        int count = 0;
        for (int i = total; i < total + 10000; i++) {
            if (bf.mightContain(i)) {
                count++;
            }
        }
        System.out.println("误伤的数量:" + count);
    }

}

运行结果:

运行结果表示,遍历这一百万个在过滤器中的数时,都被识别出来了。一万个不在过滤器中的数,误伤了320个,错误率是0.03左右。

看下BloomFilter的源码:

public static <T> BloomFilter<T> create(Funnel<? super T> funnel, int expectedInsertions) {
        return create(funnel, (long) expectedInsertions);
    }

    public static <T> BloomFilter<T> create(Funnel<? super T> funnel, long expectedInsertions) {
        return create(funnel, expectedInsertions, 0.03); // FYI, for 3%, we always get 5 hash functions
    }

    public static <T> BloomFilter<T> create(
          Funnel<? super T> funnel, long expectedInsertions, double fpp) {
        return create(funnel, expectedInsertions, fpp, BloomFilterStrategies.MURMUR128_MITZ_64);
    }

    static <T> BloomFilter<T> create(
      Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) {
     ......
    }

BloomFilter一共四个create方法,不过最终都是走向第四个。看一下每个参数的含义:

funnel:数据类型(一般是调用Funnels工具类中的)
expectedInsertions:期望插入的值的个数
fpp 错误率(默认值为0.03)
strategy 哈希算法(我也不懂啥意思)Bloom Filter的应用

在最后一个create方法中,设置一个断点:

上面的numBits,表示存一百万个int类型数字,需要的位数为7298440,700多万位。理论上存一百万个数,一个int是4字节32位,需要481000000=3200万位。如果使用HashMap去存,按HashMap50%的存储效率,需要6400万位。可以看出BloomFilter的存储空间很小,只有HashMap的1/10左右

上面的numHashFunctions,表示需要5个函数去存这些数字

使用第三个create方法,我们设置下错误率:

private static BloomFilter<Integer> bf = BloomFilter.create(Funnels.integerFunnel(), total, 0.0003);

再运行看看:

此时误伤的数量为4,错误率为0.04%左右。

当错误率设为0.0003时,所需要的位数为16883499,1600万位,需要12个函数

和上面对比可以看出,错误率越大,所需空间和时间越小,错误率越小,所需空间和时间约大

常见的几个应用场景:

  • cerberus在收集监控数据的时候, 有的系统的监控项量会很大, 需要检查一个监控项的名字是否已经被记录到db过了, 如果没有的话就需要写入db.
  • 爬虫过滤已抓到的url就不再抓,可用bloom filter过滤
  • 垃圾邮件过滤。如果用哈希表,每存储一亿个 email地址,就需要 1.6GB的内存(用哈希表实现的具体办法是将每一个 email地址对应成一个八字节的信息指纹,然后将这些信息指纹存入哈希表,由于哈希表的存储效率一般只有 50%,因此一个 email地址需要占用十六个字节。一亿个地址大约要 1.6GB,即十六亿字节的内存)。因此存贮几十亿个邮件地址可能需要上百 GB的内存。而Bloom Filter只需要哈希表 1/8到 14 的大小就能解决同样的问题。

实战

下面是一个基于file,memory,redis实现的布隆过滤器

package simplebloom

import (
    "compress/gzip"
    "crypto/sha256"
    "fmt"
    "log"
    "os"

    "encoding/gob"

    "github.com/gomodule/redigo/redis"
    "github.com/pkg/errors"
    "github.com/spaolacci/murmur3"
)

type BloomFilter interface {
    Put([]byte)
    PutString(string)

    Has([]byte) bool
    HasString(string) bool

    Close()
}

type FileBloomFilter struct {
    *MemoryBloomFilter
    target string
}

type MemoryBloomFilter struct {
    k  uint
    bs BitSets
}

type RedisBloomFilter struct {
    cli redis.Conn
    n   uint
    k   uint
}

func HashData(data []byte, seed uint) uint {
    sha_data := sha256.Sum256(data)
    data = sha_data[:]
    m := murmur3.New64WithSeed(uint32(seed))
    m.Write(data)
    return uint(m.Sum64())
}

// NewMemoryBloomFilter 创建一个内存的bloom filter
func NewMemoryBloomFilter(n uint, k uint) *MemoryBloomFilter {
    return &MemoryBloomFilter{
        k:  k,
        bs: NewBitSets(n),
    }
}

// Put 添加一条记录
func (filter *MemoryBloomFilter) Put(data []byte) {
    l := uint(len(filter.bs))
    for i := uint(0); i < filter.k; i++ {
        filter.bs.Set(HashData(data, i) % l)
    }
}

// Put 添加一条string记录
func (filter *MemoryBloomFilter) PutString(data string) {
    filter.Put([]byte(data))
}

// Has 推测记录是否已存在
func (filter *MemoryBloomFilter) Has(data []byte) bool {
    l := uint(len(filter.bs))

    for i := uint(0); i < filter.k; i++ {
        if !filter.bs.IsSet(HashData(data, i) % l) {
            return false
        }
    }

    return true
}

// Has 推测记录是否已存在
func (filter *MemoryBloomFilter) HasString(data string) bool {
    return filter.Has([]byte(data))
}

// Close 关闭bloom filter
func (filter *MemoryBloomFilter) Close() {
    filter.bs = nil
}

// NewFileBloomFilter 创建一个以文件为存储介质的bloom filter
// target 文件保存处
// 本质上就是增加了MemoryBloomFilter, 在创建时打开文件, 在Close时保存文件
func NewFileBloomFilter(target string, n uint, k uint) *FileBloomFilter {
    memory_filter := NewMemoryBloomFilter(n, k)
    filter := &FileBloomFilter{
        memory_filter, target,
    }
    filter.reStore()

    return filter
}

func (filter *FileBloomFilter) Close() {
    filter.store()
    filter.bs = nil
}

func (filter *FileBloomFilter) store() {
    f, err := os.Create(filter.target)
    if err != nil {
        log.Fatalf("%+v", errors.Wrap(err, "Open file"))
    }
    defer f.Close()

    gzip_writer := gzip.NewWriter(f)
    defer gzip_writer.Close()

    encoder := gob.NewEncoder(gzip_writer)
    err = encoder.Encode(filter.bs)
    if err != nil {
        log.Fatalf("%+v", errors.Wrap(err, "gzip"))
    }
}

func (filter *FileBloomFilter) reStore() {
    f, err := os.Open(filter.target)
    if err != nil {
        if os.IsNotExist(err) {
            return
        }
        log.Fatalf("%+v", errors.Wrap(err, "Open file"))
    }
    defer f.Close()

    gzip_reader, err := gzip.NewReader(f)
    if err != nil {
        log.Fatalf("%+v", errors.Wrap(err, "Ungzip"))
    }

    decoder := gob.NewDecoder(gzip_reader)
    err = decoder.Decode(&filter.bs)
    if err != nil {
        log.Fatalf("%+v", errors.Wrap(err, "gob decode"))
    }
}

func NewRedisBloomFilter(cli redis.Conn, n, k uint) *RedisBloomFilter {
    filter := &RedisBloomFilter{
        cli: cli,
        n:   n,
        k:   k,
    }
    length, _ := redis.Int64(cli.Do("LLEN", filter.redisKey()))
    if uint(length) != n {
        bs := make([]interface{}, n)
        push_args := []interface{}{filter.redisKey()}
        push_args = append(push_args, bs...)
        cli.Do("DEL", filter.redisKey())
        cli.Do("LPUSH", push_args...)
    }

    return filter
}

func (filter *RedisBloomFilter) Put(data []byte) {
    for i := uint(0); i < filter.k; i++ {
        _, err := filter.cli.Do("LSET", filter.redisKey(), HashData(data, i)%filter.n, "1")
        if err != nil {
            log.Fatalf("%+v", errors.Wrap(err, "LSET"))
        }
    }
}

func (filter *RedisBloomFilter) PutString(data string) {
    filter.Put([]byte(data))
}

func (filter *RedisBloomFilter) Has(data []byte) bool {
    for i := uint(0); i < filter.k; i++ {
        index := HashData(data, i) % filter.n
        value, err := redis.String(filter.cli.Do("LINDEX", filter.redisKey(), index))
        if err != nil {
            log.Fatalf("%+v", errors.Wrap(err, "LINDEX"))
        }
        if value != "1" {
            return false
        }
    }

    return true
}

func (filter *RedisBloomFilter) HasString(data string) bool {
    return filter.Has([]byte(data))
}

// Close 只将cli设置为nil, 关闭redis连接的操作放在调用处
func (filter *RedisBloomFilter) Close() {
    filter.cli = nil
}

// redisKey 根据filter的n和k来生成一个独立的redis key
func (filter *RedisBloomFilter) redisKey() string {
    return fmt.Sprintf("_bloomfilter:n%d:k%d", filter.n, filter.k)
}