​ 最近在做一个支付业务,考虑做成一个springboot的组件,方便其他业务方调用,在查看银行对接文档的过程中,发现银行服务端对并发有控制,请求的线程数需要按照文档中的要求来,否则超出的线程会被拒绝。

准备

​ 因为以前并没有写过这方面的代码,但是涉及到并发,首先想到的就是juc包,所以就找找下面的类。

#Semaphore

​ 这个是第一个找到的符合的类,意译为信号量,大致可以理解为红绿灯。

Semaphore semaphore = new Semaphore(1); 有一个permits(绿灯)

semaphore.acquire(1); 这个线程获取到了permits(绿灯),才能往下走,并发的其他线程没有拿到permits(红灯),只能等待。

semaphore.release(1); 前面的线程释放permits,红绿灯切换,另一个线程拿到permits(另一个绿灯亮起),往下执行。

semaphore.tryAcquire(); 当前线程尝试获取permits(抬头查看红绿灯,),成功(绿灯),则拿走一个permits,失败(红灯),原地等待。

​ 看了上面的api,其实 semaphore.tryAcquire(); 就已经简单我的需求了,伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private final static Semaphore semaphore = new Semaphore(5);

@Test
public void start() {
// 模拟10个并发调用
StressTestUtils.testAndPrint(10, 10, () -> {
concurrent();
return null;
}, 0);
}

public void concurrent() {
if (semaphore.tryAcquire()) {
System.out.println("i get permits");
}
else {
System.out.println("more than concurrent");
}
}

1542109557899

但是这里的实现是类似于线程池的拒绝策略,不够友好,所以改造下,没有拿到permits的等待,而不是拒绝。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private final static Semaphore semaphore = new Semaphore(5);

@Test
public void start() {
// 模拟10个并发调用
StressTestUtils.testAndPrint(10, 10, () -> {
myWait();
return null;
}, 0);
}

public void myWait() throws InterruptedException {
semaphore.acquire(1);
// 业务逻辑开始
System.out.println("i get permits");
Thread.sleep(1000);
System.out.println("i finish");
// 结束,释放permits
semaphore.release(1);
}

1542109840368

​ 超过限定并发数的线程,会等待前面的线程释放后,拿到permits执行。

到这里其实符合我的要求了,所以我就写了一个semaphore的切面,完成了简单的并发数控制,具体代码就不列了。

进阶思考

​ 上面其实只是针对单机,如果是多台提供服务,还是会有问题。

1542111589870

假定采用轮询负载均衡,则每个server均衡获取,这样就会导致超过permits的并发调用,这个时候单机的semaphore就没有用了,通常需要一个第三方的组件来记录。

分布式限流

简单介绍一下,我的想法是通过引入redis来记录同一时间内的请求数,超过并发的请求,执行拒绝策略。

但是这里有一个问题,并发操作redis,如何保证操作是线程安全的?因为如果有竞态条件,就会导致记录值不准确,限制就失效了。

这里有一篇参考文章: 分布式限流

文章里是通过lua脚本去执行redis操作的,如果把多步操作,都放在lua脚本内,一次性通过eval执行,因为redis是单线程执行,巧妙的解决了线程安全的问题,具体实现也可以查看文章内分享的源码。

总结

1.虽然做这个支付组件的时候,因为业务量小,需求并没有要求控制并发,但是通过此次自己学习、实现,算是对绝知此事要躬行的并发真正实践,这里只是选了juc下面的一个并发辅助类semaphore,并且该类是基于AQS实现的,由此引出了自己要去学习AQS的机制,和AQS下的其他类,算是并发入门了。

2.需要有分布式的思考,不能局限在单机,并且思考怎么合理利用第三方组件来实现。