如何处理数据库并发操作

1. 基本问题

假设项目的用户信息包括用户积分,现在需求是给用户增加积分。在Service层有一个方法addCredits负责执行增加积分的操作。

    @Override
    @Transactional("fproxy")
    public void addCredits(User user, int number) {
        // user对象为已从数据库读出的用户信息
        // 先调用user.getCredits()获取当前的用户积分,增加number数量后写入user对象中
        user.setCredits(user.getCredits() + number);
        
        // 向数据库写入新的用户信息
        userDao.update(user);
    }

上面看起来很简单的例子,其实有一个很显而易见的bug。就是当同一个用户不同地方登录的时候(可以是不同浏览器、不同app),用户登录信息是保存在各自的会话中的,每次登录会话都会有一个独立的用户信息副本。假如初始用户积分为0,在浏览器F登录的用户,调用了该方法增加积分1,此时数据库中用户积分为1;随后,在浏览器C登录的用户也执行了该操作增加1积分,由于此时C关联的会话状态中,用户积分仍为0,加1后再写入数据库,数据库中用户积分还是1,并没有变成2。噢,这并不是我们所期望的结果。

我写了一个简单的单元测试来重现这个应用场景。

public class UserServiceTest extends SpringTestBase {
    private static final Logger logger = LoggerFactory.getLogger(UserServiceTest.class);

    @Autowired
    private UserService userService;

    @Autowired
    private UserDao userDao;

    // 固定大小为2的线程池
    private static Executor executor = Executors.newFixedThreadPool(2);
    
    // 单元测试方法
    public void testConcurrency() {
        
        // 每个线程代表一次登录会话
        Runnable task = new Runnable() {
            public void run() {
                // 线程id
                long tid = Thread.currentThread().getId();
                
                // 模拟用户登录
                User user = userService.getUserByEmail("test@163.com");
                logger.debug("tid {}: {} has credits {}", tid, user.getName(), user.getCredits());
                
                userService.addCredits(user, 1);
                
                logger.debug("tid {}: end.", tid);
            }
        };
        
        // 启动两个线程测试并发情况
        executor.execute(task);
        executor.execute(task);
        
        // 主线程等待子线程结束,并打印最终状态
        try {
            logger.debug("主线程: 休眠等待10s");
            Thread.sleep(10000);
            
            User user = userService.getUserByEmail("test@163.com");
            logger.debug("主线程: {} has credits {}", user.getName(), user.getCredits());
            logger.debug("主线程: 程序运行结束");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这个单元测试的输出:

DEBUG 2016-10-24 15:11:27,422: com.funway.fproxy.service.UserServiceTest (testConcurrency,111) – 主线程: 休眠等待10s
DEBUG 2016-10-24 15:11:27,613: com.funway.fproxy.service.UserServiceTest (run,97) – tid 18: funway has credits 0
DEBUG 2016-10-24 15:11:27,613: com.funway.fproxy.service.UserServiceTest (run,97) – tid 19: funway has credits 0
DEBUG 2016-10-24 15:11:27,654: com.funway.fproxy.service.UserServiceTest (run,101) – tid 18: end.
DEBUG 2016-10-24 15:11:27,654: com.funway.fproxy.service.UserServiceTest (run,101) – tid 19: end.
DEBUG 2016-10-24 15:11:37,428: com.funway.fproxy.service.UserServiceTest (testConcurrency,115) – 主线程: funway has credits 1
DEBUG 2016-10-24 15:11:37,428: com.funway.fproxy.service.UserServiceTest (testConcurrency,116) – 主线程: 程序运行结束

PS:其实可以在log4j的配置中设置输出线程名,这样我就不用手写tid了 =。=#  在ConversionPattern属性值中加个%t就是打印当前线程名。。。

2. 方案1 – 初步思路

处理上述问题,可以有多重解决方法:

  1. 一个用户只允许同时存在一个登录会话,新登录的踢掉旧登录的。这个太简单粗暴了,并不是一个好方法。
  2. 一个用户只允许同时存在一个登录会话,多处登录共享该会话状态。这个涉及到session共享了,不在本文讨论范围。
  3. 在更新用户信息前,先从数据库重新读取最新的用户信息。

本文我们采用第3种方法,改写代码addCredits()

    public void addCredits(User user, int number) {
        long tid = Thread.currentThread().getId();
        
        user = userDao.getUserById(user.getId());
        logger.debug("tid {}: 重新获取用户信息, credits={}", tid, user.getCredits());
        
        user.setCredits(user.getCredits() + number);
        userDao.update(user);
    }

但是,只修改这里其实还是不够的,这段代码只能应付两次调用addCredits不是在“同一时刻”的情况。而在我们的单元测试中,两个线程是并发执行的。可能在线程1取到最新的用户信息,还没将新值写入数据库之前,线程2就取到了用户信息。这就跟之前的代码是一样的,我们并没有真正隔离这两次函数调用。

3. 方案2 – synchronized关键字

关于synchronized关键字的用法,百度一下有很多。其作用是为一段代码加上一个对象锁,只有获得该对象锁的线程才能运行该代码段。需要注意的是:synchronized锁住的是对象,并不是代码段!好在我们的UserService是一个在Spring容器中注册的单例,所以我们可以在addCredits方法前面直接使用synchronized关键字。(关于synchronized的介绍,我觉得这篇写的不错http://blog.csdn.net/xiao__gui/article/details/8188833

修改UserService的addCredits方法如下:

    @Override
    @Transactional(value = "fproxy")
    public synchronized void addCredits(User user, int number) {
        long tid = Thread.currentThread().getId();

        user = userDao.getUserById(user.getId());
        logger.debug("tid {}: 重新获取用户信息, credits={}", tid, user.getCredits());
        
        try {
            Thread.sleep(500);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
        user.setCredits(user.getCredits() + number);
        userDao.update(user);
        logger.debug("tid {}: 更新用户信息, credits={}", tid, user.getCredits());
    }

为了更明显的检验多线程是否有同步串行执行这个addCredits方法,我特意加了一句sleep 500毫秒。检查输出:

DEBUG 2016-10-24 21:43:37,120: com.funway.fproxy.service.UserServiceTest (testConcurrency,112) – 主线程: 休眠等待10s
DEBUG 2016-10-24 21:43:37,338: com.funway.fproxy.service.UserServiceTest (run,98) – tid 19: funway has credits 34
DEBUG 2016-10-24 21:43:37,338: com.funway.fproxy.service.UserServiceTest (run,98) – tid 18: funway has credits 34
DEBUG 2016-10-24 21:43:37,358: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,324) – tid 18: 重新获取用户信息, credits=34
DEBUG 2016-10-24 21:43:37,866: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,335) – tid 18: 更新用户信息, credits=35
DEBUG 2016-10-24 21:43:37,868: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,324) – tid 19: 重新获取用户信息, credits=34
DEBUG 2016-10-24 21:43:37,892: com.funway.fproxy.service.UserServiceTest (run,102) – tid 18: end.
DEBUG 2016-10-24 21:43:38,372: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,335) – tid 19: 更新用户信息, credits=35
DEBUG 2016-10-24 21:43:38,386: com.funway.fproxy.service.UserServiceTest (run,102) – tid 19: end.
DEBUG 2016-10-24 21:43:47,125: com.funway.fproxy.service.UserServiceTest (testConcurrency,116) – 主线程: funway has credits 35
DEBUG 2016-10-24 21:43:47,125: com.funway.fproxy.service.UserServiceTest (testConcurrency,117) – 主线程: 程序运行结束

可以看到,两个线程对addCredits方法的调用确实是串行地执行了,但结果却还是不对?why???

在log4j.xml配置文件中设置“org.hibernate”的等级为debug,然后重新运行上面代码,就会发现实际上线程tid=18的update语句并没有被立即执行,直到线程tid=19重新获取用户信息之后,才真正将先前的update记录写入数据库。这根本的原因是由于hibernate的缓存机制。update操作被暂存在当前session的缓存中,而由于我们的addCredits方法使用了声明式事务(@Transactional注解),其session的创建、刷新以及事务的提交都被Spring框架代理了。

在上面的问题中,就是因为刷新操作flush并没有在addCredits方法中被显示调用,而是在addCredits方法结束后由Spring框架自动代理的。那么当线程tid=18执行完addCredits方法,还没flush到数据库之前,线程tid=19就抢占先机从数据库取了一遍旧数据,才导致了上面的问题。

既然知道了问题所在,修改addCredits方法,在线程结束前手动flush即可:

    @Override
    @Transactional(value = "fproxy")
    public synchronized void addCredits(User user, int number) {
        long tid = Thread.currentThread().getId();

        user = userDao.getUserById(user.getId());
        logger.debug("tid {}: 重新获取用户信息, credits={}", tid, user.getCredits());
        
        try {
            Thread.sleep(500);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
        user.setCredits(user.getCredits() + number);
        userDao.update(user);
        
        // 手动flush
        sessionFactory.getCurrentSession().flush();
        logger.debug("tid {}: 更新用户信息, credits={}", tid, user.getCredits());
    }

检查程序输出,这回完全对了:

DEBUG 2016-10-25 01:29:50,878: com.funway.fproxy.service.UserServiceTest (testConcurrency,118) – 主线程: 休眠等待10s
DEBUG 2016-10-25 01:29:51,067: com.funway.fproxy.service.UserServiceTest (run,104) – tid 18: funway has credits 44
DEBUG 2016-10-25 01:29:51,067: com.funway.fproxy.service.UserServiceTest (run,104) – tid 19: funway has credits 44
DEBUG 2016-10-25 01:29:51,083: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,324) – tid 19: 重新获取用户信息, credits=44
DEBUG 2016-10-25 01:29:51,606: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,335) – tid 19: 更新用户信息, credits=45
DEBUG 2016-10-25 01:29:51,609: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,324) – tid 18: 重新获取用户信息, credits=45
DEBUG 2016-10-25 01:29:51,618: com.funway.fproxy.service.UserServiceTest (run,108) – tid 19: end.
DEBUG 2016-10-25 01:29:52,116: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,335) – tid 18: 更新用户信息, credits=46
DEBUG 2016-10-25 01:29:52,127: com.funway.fproxy.service.UserServiceTest (run,108) – tid 18: end.
DEBUG 2016-10-25 01:30:00,888: com.funway.fproxy.service.UserServiceTest (testConcurrency,122) – 主线程: funway has credits 46
DEBUG 2016-10-25 01:30:00,888: com.funway.fproxy.service.UserServiceTest (testConcurrency,123) – 主线程: 程序运行结束

4. 采用事务控制

4.1 方案3 – Isolation.SERIALIZABLE

除了使用代码上手动并发控制(synchronized)之外,我在想是否可以依靠数据库事务的属性来进行并发控制?因为印象中,经常听到关于数据库事务的隔离级别:READ_UNCOMMITTED、READ_COMMITTED、REPEATABLE_READ、SERIALIZABLE。我就认为是不是只要给我们的addCredits方法设置为SERIALIZABLE事务即可?(默认的事务隔离级别等于实际数据库的事务隔离级别)修改Service层的addCredits方法,删除synchronized关键字,添加SERIALIZABLE隔离属性:

    @Override
    @Transactional(value = "fproxy", isolation=Isolation.SERIALIZABLE)
    public void addCredits(User user, int number) {
        long tid = Thread.currentThread().getId();

//        user = userDao.getUserById(user.getId());
        user = (User) sessionFactory.getCurrentSession().get(User.class, user.getId());
        logger.debug("tid {}: 重新获取用户信息, credits={}", tid, user.getCredits());
        
        try {
            Thread.sleep(500);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
        user.setCredits(user.getCredits() + number);
//        userDao.update(user);
        sessionFactory.getCurrentSession().update(user);
        
        // 手动flush
        sessionFactory.getCurrentSession().flush();
        logger.debug("tid {}: 更新用户信息, credits={}", tid, user.getCredits());
    }

PS:上面的代码中,我注释了两行代码,userDao.getUserById()与userDao.update(),而直接采用getCurrentSession()获取当前会话并执行get与update操作。具体原因在后面解释。

执行这段代码,发现结果居然报错:

DEBUG main 2016-10-25 21:37:09,881: com.funway.fproxy.service.UserServiceTest (testConcurrency,118) – 主线程: 休眠等待10s
DEBUG pool-1-thread-1 2016-10-25 21:37:10,114: com.funway.fproxy.service.UserServiceTest (run,104) – tid 18: funway has credits 11
DEBUG pool-1-thread-2 2016-10-25 21:37:10,115: com.funway.fproxy.service.UserServiceTest (run,104) – tid 19: funway has credits 11
DEBUG pool-1-thread-1 2016-10-25 21:37:10,133: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,327) – tid 18: 重新获取用户信息, credits=11
DEBUG pool-1-thread-2 2016-10-25 21:37:10,133: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,327) – tid 19: 重新获取用户信息, credits=11
DEBUG pool-1-thread-1 2016-10-25 21:37:10,658: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,341) – tid 18: 更新用户信息, credits=12
WARN  pool-1-thread-2 2016-10-25 21:37:10,660: org.hibernate.engine.jdbc.spi.SqlExceptionHelper (logExceptions,144) – SQL Error: 1213, SQLState: 40001
ERROR pool-1-thread-2 2016-10-25 21:37:10,660: org.hibernate.engine.jdbc.spi.SqlExceptionHelper (logExceptions,146) – Deadlock found when trying to get lock; try restarting transaction
Exception in thread “pool-1-thread-2” org.hibernate.exception.LockAcquisitionException: could not execute statement
… …
DEBUG pool-1-thread-1 2016-10-25 21:37:10,669: com.funway.fproxy.service.UserServiceTest (run,108) – tid 18: end.
DEBUG main 2016-10-25 21:37:19,891: com.funway.fproxy.service.UserServiceTest (testConcurrency,122) – 主线程: funway has credits 12
DEBUG main 2016-10-25 21:37:19,891: com.funway.fproxy.service.UserServiceTest (testConcurrency,123) – 主线程: 程序运行结束

线程18的事务正确执行了,而线程19的事务在更新用户信息的时候抛出异常,原因是发现死锁。

我一开始天真的以为Isolation.SERIALIZABLE会让这两个并发事务串行执行,但经过多番搜索答案,我才发现Isolation.SERIALIZABLE只是“可串行化”而不是“串行执行”。好多垃圾博客都将它直接翻译为串行化了,把最重要的able去掉了=。=# 简直智障。SERIALIZABLE可以理解为“如果两个并发事务都正在结束,那么最后的结果看起来就像两个事务串行执行一样。不过,它并不保证两个事务执行的先后顺序,也不保证事务都能正常结束”。SERIALIZABLE通过给每个读写操作加锁来保证事务的并发控制,当系统探测到几个并发事务有“写冲突”的时候,只有一个事务可以正常提交,其他事务均报错。

对此我特意用mysql命令行做了一个实验:

%e5%b1%8f%e5%b9%95%e5%bf%ab%e7%85%a7-2016-10-25-22-27-28

4.2 方案4 – 悲观锁

显然,依靠SERIALIZABLE隔离级别无法实现我们的目标。要真正搞懂事务的隔离性,必须得结合数据库的加锁机制来看,更多的关于数据库加锁机制,不在本文的讨论范围(当然我现在也没搞懂,没好好学数据库=。=)。只是在搜索答案的时候发现有人提到乐观锁悲观锁的概念,便尝试在上面的代码中给读取用户信息加悲观锁。

    @Override
    @Transactional(value = "fproxy", isolation=Isolation.SERIALIZABLE)
    public void addCredits(User user, int number) {
        long tid = Thread.currentThread().getId();

        user = (User) sessionFactory.getCurrentSession().get(User.class, user.getId(), LockOptions.UPGRADE);
        logger.debug("tid {}: 重新获取用户信息, credits={}", tid, user.getCredits());
        
        try {
            Thread.sleep(500);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
        user.setCredits(user.getCredits() + number);
        sessionFactory.getCurrentSession().update(user);
        
        // 手动flush
        // 用数据库加锁的方式,其实就不需要在这里手动flush了,
        // 因为锁要等待commit之后才会释放,spring的事务代理会在commit之前自动flush。
        sessionFactory.getCurrentSession().flush();
        logger.debug("tid {}: 更新用户信息, credits={}", tid, user.getCredits());
    }

上面代码中,调用session的get()方法时的第三个参数LockOptions.UPGRADE就表示使用悲观锁读取数据。翻译成sql语句就是”select * from fp_user where id = :id for update“。上面这段代码的执行步骤如下表所示:

%e5%b1%8f%e5%b9%95%e5%bf%ab%e7%85%a7-2016-10-26-02-14-54

PS: 既然手工强制读数据时使用悲观锁,那么Isolation.SERIALIZABLE根本就不需要了,不管什么隔离级别,上述代码都能正常运行。

4.3 方案5 – 乐观锁

。。。暂时不研究了,乐观锁与悲观锁的区别自己百度。

4.4 getCurrentSession与openSession的区别

从4.1开始我将userDao.getUserById()注释,替换成了getCurrentSession().get(),原因的话,先看一下我的getUserById()源码:

    public User getUserById(long id) {
        Session session = sessionFactory.openSession();
        try {
            User user = (User) session.get(User.class, id);
            return user;
        } finally {
            session.close();
        }
    }

在这段代码中,获取session的方法是openSession()而不是getCurrentSession()。getCurrentSession表示获取当前线程已打开的session,不存在则打开新session;而openSession则表示重新打开一个session。而不同的session,他们的事务必然不同,因为事务是依赖于session的(显而易见,你想啊在sql命令行时,你要先登录sql打开一个会话,然后才能start transaction并commit),而且这个新的session甚至没有启动事务(相当于每一句sql都是一个独立的事务)。这样,即使在getUserById()方法中使用悲观锁来get,也不会得到想要的结果。因为线程1一get完,session就关闭,锁就释放了,那么线程2就可以get了,这里get操作与addCredits()并不在同一个事务中。

 

5. 总结

综上所述,能够正确处理1中所描述的并发问题的,只有方案2(使用synchronized关键字,在程序中加锁)与方案4(在读数据库时使用悲观锁)

出于让代码更加可控的目的,小项目中可以采用synchronized关键字的方案。小项目是指单一Web服务器,单个java进程的情况。synchronized足够能很好的控制多线程并发。

但如果在大项目中,有多台Web服务器同时服务,那么方案2在代码中加锁就还是会出现原来的问题,因为服务器1中的代码无法隔离服务器2中的代码,这时候就需重新审视方案4中的数据库锁,使用数据库加锁的方式进行隔离。

ps:@2016-11-09,今天听猫哥指导了一个数据库加锁的方法,可以把锁加在一张专门的表上,比如说叫lck表(lock是保留字当然不能用)。然后在需要加锁的时候,select * from lck where id=0 for update,这样就能避免锁住业务数据表,防止其他只读操作无法读取当前数据。

6. php怎么处理数据库并发操作

6.1 java与php在处理并发请求的区别

上面的例子都是建立在java编程环境中的,而且是多线程并发。那对于php的web应用,又该如何处理这种数据库并发操作呢?

首先,我觉得需要搞清楚java与php在处理并发请求时候的区别:java是单进程、多线程,php是多进程。以spring mvc框架的java应用程序为例,用户请求到达nginx,nginx转发给tomcat(单进程),tomcat作为一个servlet容器在收到用户请求后,将请求转发给对应的context容器处理,spring mvc就是一个context容器,然后spring将启动一个线程处理该请求(多线程)。对于php应用程序,用户请求到达nginx,然后nginx将请求转发给一个空闲的php-fpm进程(多进程)来执行php程序。

 

所以,php没有synchronized这种多线程并发控制的方式来处理数据库并发操作。只能考虑如下两种方式:

  1. 加锁(悲观锁或者乐观锁)
  2. 使用任务队列,将这些数据库操作写入队列,然后单独启一个作业串行执行

6.2 php使用悲观锁进行数据库并发处理

参考:

https://my.oschina.net/starlight36/blog/344986

http://www.04007.cn/article/111.html

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top