博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
分布式重建缓存的并发冲突 详解
阅读量:2393 次
发布时间:2019-05-10

本文共 4923 字,大约阅读时间需要 16 分钟。

在分布式系统中,如果缓存服务在本地的 Ehcache 中都读取不到数据,此时需要重新到源头的服务中去拉去数据,拉取到数据之后,赶紧先给 Nginx 的请求返回,同时将数据写入 Ehcache 和 Redis中。此时会出现分布式重建缓存的并发冲突问题
重建缓存 : 比如数据在所有的缓存中都不存在 (如使用 LRU算法 给清理掉),就需要重新查询数据写入缓存,重建缓存
分布式的重建缓存 : 在不同的机器上,不同的服务实例中,去做上面的事情,就会出现多个机器分布式重建去读取相同的数据,然后写入缓存中
1> 流量均匀分布到所有缓存服务实例上 : 应用层Nginx 将请求流量均匀地打到各个缓存服务实例中的,相同的服务可能部署到不同的机器上
2> 应用层Nginx 通过 商品id 的 hash计算,走固定的缓存服务实例;分发层的 Nginx 的 Lua脚本,通过对 商品id做一个hash,然后对应用nginx数量取模,最终找到固定在 Nginx的地址列表 中的地址。将每个商品的请求固定分发到同一个 应用层Nginx 上面去。在 应用层Nginx 里,发现自己本地 lua shared dict 缓存中没有数据的时候,就采取一样的方式,对 product id 取模,然后将请求固定分发到同一个缓存服务实例中去,这样的话,就不会出现说多个缓存服务实例分布式的去更新那个缓存
3> 源信息服务发送的变更消息,需要按照 商品id 去分区,固定的商品变更走固定的 Kafka分区,也就是固定的一个缓存服务实例获取到。缓存服务,是监听 Kafka topic 的,一个缓存服务实例,作为一个 kafka consumer,就消费 topic 中的一个 partition,所以有多个缓存服务实例的话,每个缓存服务实例就消费一个 kafka partition。所以这里,一般来说,源头信息服务,在发送消息到 kafka topic 的时候,都需要按照 product id 去分区,也就时说,同一个 product id变更的消息一定是到同一个 kafka partition 中去的,也就是说同一个 product id的变更消息,一定是同一个缓存服务实例消费到的。实现方式很简单就是通过 kafka producer api,里面 send message 的时候,多加一个参数就即可,product id 传递进去
4> 问题 : 自己写的简易的 hash分发,与 kafka的分区,可能并不一致。自己写的简易的 hash分发策略,是按照 crc32 去取 hash值,然后再取模的。但是,不知道 kafka producer 的 hash策略是什么,很可能说跟我们的策略是不一样的,这就可能导致说,数据变更的消息所到的缓存服务实例,跟应用层 Nginx分发到的那个缓存服务实例也许就不在一台机器上了,这样的话,在高并发,极端的情况下,可能就会出现冲突
5> 分布式的缓存重建并发冲突问题发生 : 比如说同意服务部署在多台机器上,都需要修改缓存数据,此时就会出现,缓存被覆盖即使用并非最新的原始数据区覆盖缓存中已经是最新数据,此时将会导致缓存数据不是最新数据,从而发生数据不一致的情况
基于 zookeeper 分布式锁的解决 
分布式重建缓存的并发冲突
分布式锁 : 如果有多个机器在访问同一个共享资源,那么这个时候,如果需要加个锁,让多个分布式的机器在访问共享资源的时候串行起来,这个时候,多个不同机器上的服务共享这个锁,这就是分布式锁
分布式锁当然有很多种不同的实现方案,redis分布式锁,zookeeper分布式锁。采用 zookeeper 做分布式协调这一块,还是很流行的,大数据应用里面,hadoop,storm,都是基于zk去做分布式协调
zk分布式锁的解决并发冲突的方案 :
1> 变更缓存重建以及空缓存请求重建,更新 Redis 之前,都需要先获取对应 商品id 的分布式锁
2> 拿到分布式锁之后,需要根据时间版本去比较一下,如果自己的版本新于 Redis 中的版本,那么就更新,否则就不更新
3> 如果拿不到分布式锁,那么就等待,不断轮询等待,直到获取到分布式的锁
zk分布式锁的原理 : 通过去创建 zk 的一个临时node,来模拟给一个 商品id 加锁。zk会保证只会创建一个临时node,其他请求过来如果再要创建临时node,就会报错抛出 NodeExistsException。所以说,所谓上锁,其实就是去创建某个 product id 对应的一个 临时node,如果 临时node 创建成功了,那么说明成功加锁,此时就可以去执行对 Redis 里面数据的操作。如果临时node 创建失败,说明锁已经被某个服务拿到,在操作 Reids 中的数据,那么就不断的等待,直到可以获取到锁为止
zk分布式锁的代码封装 : zookeeper java client api去封装连接zk,以及获取分布式锁,还有释放分布式锁的代码。释放一个分布式锁,去删除掉那个 临时node 即可,就代表释放一个锁,那么此时其他的机器就可以创建临时node,获取到锁。使用 zk 去实现一个分布式锁,也有很多种做法,有复杂的,也有简单的,此处使用的是一种最简单有效的分布式锁,能满足大多数情况的使用
引入 zookeeper 库
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.12</version>
</dependency>
创建 ZookeeperSession,来在 zookeeper 上创建、删除临时节点
public class ZookeeperSession {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private ZooKeeper zooKeeper;
    private ZookeeperSession() {
        try {
            this.zooKeeper = new ZooKeeper("ci-server:2181", 5000, new ZookeeperWatcher());
            // 给一个状态 CONNECTING,连接中
            logger.debug("Zookeeper session State => ", zooKeeper.getState());
            countDownLatch.await();
            logger.debug("ZooKeeper session established......");
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
    /**
     * 获取分布式锁
     *
     * @param productId
     */
    public void acquireDistributedLock(Long productId) {
        String path = "/product-lock-" + productId;
        try {
            zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            logger.debug("success to acquire lock for product[id={}]", productId);
        } catch (KeeperException | InterruptedException e) {
            logger.debug("fail[1] to acquire lock for product[id={}]", productId);
            // 如果对应那个商品的锁的 node 已经存在,就是已经被别人加锁,那么就会报错,抛出 NodeExistsException
            int count = 0;
            while (true) {
                try {
                    Thread.sleep(20);
                    zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                } catch (InterruptedException | KeeperException e1) {
                    e1.printStackTrace();
                    count++;
                    continue;
                }
                logger.debug("success to acquire lock for product[id={}] after {} times try......", productId, count);
                break;
            }
        }
    }
    /**
     * 释放掉一个分布式锁
     *
     * @param productId
     */
    public void releaseDistributedLock(Long productId) {
        try {
            // 删除掉所有匹配节点版本的 node
            zooKeeper.delete("/product-lock-" + productId, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }
    /**
     * 建立 zookeeper session 的 Watcher
     */
    private class ZookeeperWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            logger.debug("Receive watched event: {}", event.getState());
            if (Event.KeeperState.SyncConnected == event.getState()) {
                countDownLatch.countDown();
            }
        }
    }
    private static class Singleton {
        private static ZookeeperSession instance;
        static {
            instance = new ZookeeperSession();
        }
        public static ZookeeperSession getInstance() {
            return instance;
        }
    }
    /**
     * 获取 ZookeeperSession 实例
     *
     * @return
     */
    public static ZookeeperSession getInstance() {
        return Singleton.getInstance();
    }
    /**
     * 初始化单例的便捷方式
     */
    public static void init() {
        getInstance();
    }
}
业务代码
1> 主动更新 : 监听 kafka消息队列,获取到一个商品变更的消息之后,去那个源服务中调用接口拉取数据,更新到 Ehcache和 Redis中。先获取分布式锁,然后才能更新 Redis,同时更新时要比较时间版本
2> 被动重建 : 直接读取源头数据,直接返回给 Nginx,同时推送一条消息到一个队列,后台线程异步消费,后台现成负责先获取分布式锁,然后才能更新 Redis,同时要比较时间版本

转载地址:http://dogab.baihongyu.com/

你可能感兴趣的文章
Tensorflow Python API 翻译(math_ops)(第一部分)
查看>>
Tensorflow Python API 翻译(math_ops)(第二部分)
查看>>
Tensorflow Python API 翻译(constant_op)
查看>>
利用 TensorFlow 入门 Word2Vec
查看>>
多任务学习与深度学习
查看>>
利用 TensorFlow 一步一步构建一个多任务学习模型
查看>>
使用数据驱动进行配对交易:简单交易策略
查看>>
量化交易:相关系数
查看>>
课程---程序员炒股,如何计算股票投资组合的风险和收益
查看>>
人工智能资料库:第1辑(20170105)
查看>>
人工智能资料库:第2辑(20170106)
查看>>
人工智能资料库:第3辑(20170107)
查看>>
人工智能资料库:第4辑(20170108)
查看>>
人工智能资料库:第5辑(20170109)
查看>>
人工智能资料库:第20辑(20170129)
查看>>
人工智能资料库:第21辑(20170130)
查看>>
人工智能资料库:第22辑(20170131)
查看>>
人工智能资料库:第23辑(20170201)
查看>>
MongoDB-初体验
查看>>
TensorFlow中四种-Cross-Entropy-算法实现和应用
查看>>