1. 基本问题
假设项目的用户信息包括用户积分,现在需求是给用户增加积分。在Service层有一个方法addCredits负责执行增加积分的操作。
@Override @Transactional("fproxy") public void addCredits(User user, int number) { // user对象为已从数据库读出的用户信息 // 先调用user.getCredits()获取当前的用户积分,增加number数量后写入user对象中 user.setCredits(user.getCredits() + number); // 向数据库写入新的用户信息 userDao.update(user); }
上面看起来很简单的例子,其实有一个很显而易见的bug。就是当同一个用户不同地方登录的时候(可以是不同浏览器、不同app),用户登录信息是保存在各自的会话中的,每次登录会话都会有一个独立的用户信息副本。假如初始用户积分为0,在浏览器F登录的用户,调用了该方法增加积分1,此时数据库中用户积分为1;随后,在浏览器C登录的用户也执行了该操作增加1积分,由于此时C关联的会话状态中,用户积分仍为0,加1后再写入数据库,数据库中用户积分还是1,并没有变成2。噢,这并不是我们所期望的结果。
我写了一个简单的单元测试来重现这个应用场景。
public class UserServiceTest extends SpringTestBase { private static final Logger logger = LoggerFactory.getLogger(UserServiceTest.class); @Autowired private UserService userService; @Autowired private UserDao userDao; // 固定大小为2的线程池 private static Executor executor = Executors.newFixedThreadPool(2); // 单元测试方法 public void testConcurrency() { // 每个线程代表一次登录会话 Runnable task = new Runnable() { public void run() { // 线程id long tid = Thread.currentThread().getId(); // 模拟用户登录 User user = userService.getUserByEmail("test@163.com"); logger.debug("tid {}: {} has credits {}", tid, user.getName(), user.getCredits()); userService.addCredits(user, 1); logger.debug("tid {}: end.", tid); } }; // 启动两个线程测试并发情况 executor.execute(task); executor.execute(task); // 主线程等待子线程结束,并打印最终状态 try { logger.debug("主线程: 休眠等待10s"); Thread.sleep(10000); User user = userService.getUserByEmail("test@163.com"); logger.debug("主线程: {} has credits {}", user.getName(), user.getCredits()); logger.debug("主线程: 程序运行结束"); } catch (InterruptedException e) { e.printStackTrace(); } } }
这个单元测试的输出:
DEBUG 2016-10-24 15:11:27,422: com.funway.fproxy.service.UserServiceTest (testConcurrency,111) – 主线程: 休眠等待10s
DEBUG 2016-10-24 15:11:27,613: com.funway.fproxy.service.UserServiceTest (run,97) – tid 18: funway has credits 0
DEBUG 2016-10-24 15:11:27,613: com.funway.fproxy.service.UserServiceTest (run,97) – tid 19: funway has credits 0
DEBUG 2016-10-24 15:11:27,654: com.funway.fproxy.service.UserServiceTest (run,101) – tid 18: end.
DEBUG 2016-10-24 15:11:27,654: com.funway.fproxy.service.UserServiceTest (run,101) – tid 19: end.
DEBUG 2016-10-24 15:11:37,428: com.funway.fproxy.service.UserServiceTest (testConcurrency,115) – 主线程: funway has credits 1
DEBUG 2016-10-24 15:11:37,428: com.funway.fproxy.service.UserServiceTest (testConcurrency,116) – 主线程: 程序运行结束
PS:其实可以在log4j的配置中设置输出线程名,这样我就不用手写tid了 =。=# 在ConversionPattern属性值中加个%t就是打印当前线程名。。。
2. 方案1 – 初步思路
处理上述问题,可以有多重解决方法:
- 一个用户只允许同时存在一个登录会话,新登录的踢掉旧登录的。这个太简单粗暴了,并不是一个好方法。
- 一个用户只允许同时存在一个登录会话,多处登录共享该会话状态。这个涉及到session共享了,不在本文讨论范围。
- 在更新用户信息前,先从数据库重新读取最新的用户信息。
本文我们采用第3种方法,改写代码addCredits()
public void addCredits(User user, int number) { long tid = Thread.currentThread().getId(); user = userDao.getUserById(user.getId()); logger.debug("tid {}: 重新获取用户信息, credits={}", tid, user.getCredits()); user.setCredits(user.getCredits() + number); userDao.update(user); }
但是,只修改这里其实还是不够的,这段代码只能应付两次调用addCredits不是在“同一时刻”的情况。而在我们的单元测试中,两个线程是并发执行的。可能在线程1取到最新的用户信息,还没将新值写入数据库之前,线程2就取到了用户信息。这就跟之前的代码是一样的,我们并没有真正隔离这两次函数调用。
3. 方案2 – synchronized关键字
关于synchronized关键字的用法,百度一下有很多。其作用是为一段代码加上一个对象锁,只有获得该对象锁的线程才能运行该代码段。需要注意的是:synchronized锁住的是对象,并不是代码段!好在我们的UserService是一个在Spring容器中注册的单例,所以我们可以在addCredits方法前面直接使用synchronized关键字。(关于synchronized的介绍,我觉得这篇写的不错http://blog.csdn.net/xiao__gui/article/details/8188833)
修改UserService的addCredits方法如下:
@Override @Transactional(value = "fproxy") public synchronized void addCredits(User user, int number) { long tid = Thread.currentThread().getId(); user = userDao.getUserById(user.getId()); logger.debug("tid {}: 重新获取用户信息, credits={}", tid, user.getCredits()); try { Thread.sleep(500); } catch (Exception e) { // TODO: handle exception } user.setCredits(user.getCredits() + number); userDao.update(user); logger.debug("tid {}: 更新用户信息, credits={}", tid, user.getCredits()); }
为了更明显的检验多线程是否有同步串行执行这个addCredits方法,我特意加了一句sleep 500毫秒。检查输出:
DEBUG 2016-10-24 21:43:37,120: com.funway.fproxy.service.UserServiceTest (testConcurrency,112) – 主线程: 休眠等待10s
DEBUG 2016-10-24 21:43:37,338: com.funway.fproxy.service.UserServiceTest (run,98) – tid 19: funway has credits 34
DEBUG 2016-10-24 21:43:37,338: com.funway.fproxy.service.UserServiceTest (run,98) – tid 18: funway has credits 34
DEBUG 2016-10-24 21:43:37,358: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,324) – tid 18: 重新获取用户信息, credits=34
DEBUG 2016-10-24 21:43:37,866: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,335) – tid 18: 更新用户信息, credits=35
DEBUG 2016-10-24 21:43:37,868: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,324) – tid 19: 重新获取用户信息, credits=34
DEBUG 2016-10-24 21:43:37,892: com.funway.fproxy.service.UserServiceTest (run,102) – tid 18: end.
DEBUG 2016-10-24 21:43:38,372: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,335) – tid 19: 更新用户信息, credits=35
DEBUG 2016-10-24 21:43:38,386: com.funway.fproxy.service.UserServiceTest (run,102) – tid 19: end.
DEBUG 2016-10-24 21:43:47,125: com.funway.fproxy.service.UserServiceTest (testConcurrency,116) – 主线程: funway has credits 35
DEBUG 2016-10-24 21:43:47,125: com.funway.fproxy.service.UserServiceTest (testConcurrency,117) – 主线程: 程序运行结束
可以看到,两个线程对addCredits方法的调用确实是串行地执行了,但结果却还是不对?why???
在log4j.xml配置文件中设置“org.hibernate”的等级为debug,然后重新运行上面代码,就会发现实际上线程tid=18的update语句并没有被立即执行,直到线程tid=19重新获取用户信息之后,才真正将先前的update记录写入数据库。这根本的原因是由于hibernate的缓存机制。update操作被暂存在当前session的缓存中,而由于我们的addCredits方法使用了声明式事务(@Transactional注解),其session的创建、刷新以及事务的提交都被Spring框架代理了。
在上面的问题中,就是因为刷新操作flush并没有在addCredits方法中被显示调用,而是在addCredits方法结束后由Spring框架自动代理的。那么当线程tid=18执行完addCredits方法,还没flush到数据库之前,线程tid=19就抢占先机从数据库取了一遍旧数据,才导致了上面的问题。
既然知道了问题所在,修改addCredits方法,在线程结束前手动flush即可:
@Override @Transactional(value = "fproxy") public synchronized void addCredits(User user, int number) { long tid = Thread.currentThread().getId(); user = userDao.getUserById(user.getId()); logger.debug("tid {}: 重新获取用户信息, credits={}", tid, user.getCredits()); try { Thread.sleep(500); } catch (Exception e) { // TODO: handle exception } user.setCredits(user.getCredits() + number); userDao.update(user); // 手动flush sessionFactory.getCurrentSession().flush(); logger.debug("tid {}: 更新用户信息, credits={}", tid, user.getCredits()); }
检查程序输出,这回完全对了:
DEBUG 2016-10-25 01:29:50,878: com.funway.fproxy.service.UserServiceTest (testConcurrency,118) – 主线程: 休眠等待10s
DEBUG 2016-10-25 01:29:51,067: com.funway.fproxy.service.UserServiceTest (run,104) – tid 18: funway has credits 44
DEBUG 2016-10-25 01:29:51,067: com.funway.fproxy.service.UserServiceTest (run,104) – tid 19: funway has credits 44
DEBUG 2016-10-25 01:29:51,083: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,324) – tid 19: 重新获取用户信息, credits=44
DEBUG 2016-10-25 01:29:51,606: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,335) – tid 19: 更新用户信息, credits=45
DEBUG 2016-10-25 01:29:51,609: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,324) – tid 18: 重新获取用户信息, credits=45
DEBUG 2016-10-25 01:29:51,618: com.funway.fproxy.service.UserServiceTest (run,108) – tid 19: end.
DEBUG 2016-10-25 01:29:52,116: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,335) – tid 18: 更新用户信息, credits=46
DEBUG 2016-10-25 01:29:52,127: com.funway.fproxy.service.UserServiceTest (run,108) – tid 18: end.
DEBUG 2016-10-25 01:30:00,888: com.funway.fproxy.service.UserServiceTest (testConcurrency,122) – 主线程: funway has credits 46
DEBUG 2016-10-25 01:30:00,888: com.funway.fproxy.service.UserServiceTest (testConcurrency,123) – 主线程: 程序运行结束
4. 采用事务控制
4.1 方案3 – Isolation.SERIALIZABLE
除了使用代码上手动并发控制(synchronized)之外,我在想是否可以依靠数据库事务的属性来进行并发控制?因为印象中,经常听到关于数据库事务的隔离级别:READ_UNCOMMITTED、READ_COMMITTED、REPEATABLE_READ、SERIALIZABLE。我就认为是不是只要给我们的addCredits方法设置为SERIALIZABLE事务即可?(默认的事务隔离级别等于实际数据库的事务隔离级别)修改Service层的addCredits方法,删除synchronized关键字,添加SERIALIZABLE隔离属性:
@Override @Transactional(value = "fproxy", isolation=Isolation.SERIALIZABLE) public void addCredits(User user, int number) { long tid = Thread.currentThread().getId(); // user = userDao.getUserById(user.getId()); user = (User) sessionFactory.getCurrentSession().get(User.class, user.getId()); logger.debug("tid {}: 重新获取用户信息, credits={}", tid, user.getCredits()); try { Thread.sleep(500); } catch (Exception e) { // TODO: handle exception } user.setCredits(user.getCredits() + number); // userDao.update(user); sessionFactory.getCurrentSession().update(user); // 手动flush sessionFactory.getCurrentSession().flush(); logger.debug("tid {}: 更新用户信息, credits={}", tid, user.getCredits()); }
PS:上面的代码中,我注释了两行代码,userDao.getUserById()与userDao.update(),而直接采用getCurrentSession()获取当前会话并执行get与update操作。具体原因在后面解释。
执行这段代码,发现结果居然报错:
DEBUG main 2016-10-25 21:37:09,881: com.funway.fproxy.service.UserServiceTest (testConcurrency,118) – 主线程: 休眠等待10s
DEBUG pool-1-thread-1 2016-10-25 21:37:10,114: com.funway.fproxy.service.UserServiceTest (run,104) – tid 18: funway has credits 11
DEBUG pool-1-thread-2 2016-10-25 21:37:10,115: com.funway.fproxy.service.UserServiceTest (run,104) – tid 19: funway has credits 11
DEBUG pool-1-thread-1 2016-10-25 21:37:10,133: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,327) – tid 18: 重新获取用户信息, credits=11
DEBUG pool-1-thread-2 2016-10-25 21:37:10,133: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,327) – tid 19: 重新获取用户信息, credits=11
DEBUG pool-1-thread-1 2016-10-25 21:37:10,658: com.funway.fproxy.service.impl.UserServiceImpl (addCredits,341) – tid 18: 更新用户信息, credits=12
WARN pool-1-thread-2 2016-10-25 21:37:10,660: org.hibernate.engine.jdbc.spi.SqlExceptionHelper (logExceptions,144) – SQL Error: 1213, SQLState: 40001
ERROR pool-1-thread-2 2016-10-25 21:37:10,660: org.hibernate.engine.jdbc.spi.SqlExceptionHelper (logExceptions,146) – Deadlock found when trying to get lock; try restarting transaction
Exception in thread “pool-1-thread-2” org.hibernate.exception.LockAcquisitionException: could not execute statement
… …
DEBUG pool-1-thread-1 2016-10-25 21:37:10,669: com.funway.fproxy.service.UserServiceTest (run,108) – tid 18: end.
DEBUG main 2016-10-25 21:37:19,891: com.funway.fproxy.service.UserServiceTest (testConcurrency,122) – 主线程: funway has credits 12
DEBUG main 2016-10-25 21:37:19,891: com.funway.fproxy.service.UserServiceTest (testConcurrency,123) – 主线程: 程序运行结束
线程18的事务正确执行了,而线程19的事务在更新用户信息的时候抛出异常,原因是发现死锁。
我一开始天真的以为Isolation.SERIALIZABLE会让这两个并发事务串行执行,但经过多番搜索答案,我才发现Isolation.SERIALIZABLE只是“可串行化”而不是“串行执行”。好多垃圾博客都将它直接翻译为串行化了,把最重要的able去掉了=。=# 简直智障。SERIALIZABLE可以理解为“如果两个并发事务都正在结束,那么最后的结果看起来就像两个事务串行执行一样。不过,它并不保证两个事务执行的先后顺序,也不保证事务都能正常结束”。SERIALIZABLE通过给每个读写操作加锁来保证事务的并发控制,当系统探测到几个并发事务有“写冲突”的时候,只有一个事务可以正常提交,其他事务均报错。
对此我特意用mysql命令行做了一个实验:
4.2 方案4 – 悲观锁
显然,依靠SERIALIZABLE隔离级别无法实现我们的目标。要真正搞懂事务的隔离性,必须得结合数据库的加锁机制来看,更多的关于数据库加锁机制,不在本文的讨论范围(当然我现在也没搞懂,没好好学数据库=。=)。只是在搜索答案的时候发现有人提到乐观锁悲观锁的概念,便尝试在上面的代码中给读取用户信息加悲观锁。
@Override @Transactional(value = "fproxy", isolation=Isolation.SERIALIZABLE) public void addCredits(User user, int number) { long tid = Thread.currentThread().getId(); user = (User) sessionFactory.getCurrentSession().get(User.class, user.getId(), LockOptions.UPGRADE); logger.debug("tid {}: 重新获取用户信息, credits={}", tid, user.getCredits()); try { Thread.sleep(500); } catch (Exception e) { // TODO: handle exception } user.setCredits(user.getCredits() + number); sessionFactory.getCurrentSession().update(user); // 手动flush // 用数据库加锁的方式,其实就不需要在这里手动flush了, // 因为锁要等待commit之后才会释放,spring的事务代理会在commit之前自动flush。 sessionFactory.getCurrentSession().flush(); logger.debug("tid {}: 更新用户信息, credits={}", tid, user.getCredits()); }
上面代码中,调用session的get()方法时的第三个参数LockOptions.UPGRADE就表示使用悲观锁读取数据。翻译成sql语句就是”select * from fp_user where id = :id for update“。上面这段代码的执行步骤如下表所示:
PS: 既然手工强制读数据时使用悲观锁,那么Isolation.SERIALIZABLE根本就不需要了,不管什么隔离级别,上述代码都能正常运行。
4.3 方案5 – 乐观锁
。。。暂时不研究了,乐观锁与悲观锁的区别自己百度。
4.4 getCurrentSession与openSession的区别
从4.1开始我将userDao.getUserById()注释,替换成了getCurrentSession().get(),原因的话,先看一下我的getUserById()源码:
public User getUserById(long id) { Session session = sessionFactory.openSession(); try { User user = (User) session.get(User.class, id); return user; } finally { session.close(); } }
在这段代码中,获取session的方法是openSession()而不是getCurrentSession()。getCurrentSession表示获取当前线程已打开的session,不存在则打开新session;而openSession则表示重新打开一个session。而不同的session,他们的事务必然不同,因为事务是依赖于session的(显而易见,你想啊在sql命令行时,你要先登录sql打开一个会话,然后才能start transaction并commit),而且这个新的session甚至没有启动事务(相当于每一句sql都是一个独立的事务)。这样,即使在getUserById()方法中使用悲观锁来get,也不会得到想要的结果。因为线程1一get完,session就关闭,锁就释放了,那么线程2就可以get了,这里get操作与addCredits()并不在同一个事务中。
5. 总结
综上所述,能够正确处理1中所描述的并发问题的,只有方案2(使用synchronized关键字,在程序中加锁)与方案4(在读数据库时使用悲观锁)。
出于让代码更加可控的目的,小项目中可以采用synchronized关键字的方案。小项目是指单一Web服务器,单个java进程的情况。synchronized足够能很好的控制多线程并发。
但如果在大项目中,有多台Web服务器同时服务,那么方案2在代码中加锁就还是会出现原来的问题,因为服务器1中的代码无法隔离服务器2中的代码,这时候就需重新审视方案4中的数据库锁,使用数据库加锁的方式进行隔离。
ps:@2016-11-09,今天听猫哥指导了一个数据库加锁的方法,可以把锁加在一张专门的表上,比如说叫lck表(lock是保留字当然不能用)。然后在需要加锁的时候,select * from lck where id=0 for update,这样就能避免锁住业务数据表,防止其他只读操作无法读取当前数据。
6. php怎么处理数据库并发操作
6.1 java与php在处理并发请求的区别
上面的例子都是建立在java编程环境中的,而且是多线程并发。那对于php的web应用,又该如何处理这种数据库并发操作呢?
首先,我觉得需要搞清楚java与php在处理并发请求时候的区别:java是单进程、多线程,php是多进程。以spring mvc框架的java应用程序为例,用户请求到达nginx,nginx转发给tomcat(单进程),tomcat作为一个servlet容器在收到用户请求后,将请求转发给对应的context容器处理,spring mvc就是一个context容器,然后spring将启动一个线程处理该请求(多线程)。对于php应用程序,用户请求到达nginx,然后nginx将请求转发给一个空闲的php-fpm进程(多进程)来执行php程序。
所以,php没有synchronized这种多线程并发控制的方式来处理数据库并发操作。只能考虑如下两种方式:
- 加锁(悲观锁或者乐观锁)
- 使用任务队列,将这些数据库操作写入队列,然后单独启一个作业串行执行
6.2 php使用悲观锁进行数据库并发处理
参考: