文章目录

准备http客户端同步调用异步调用structured concurrency用例与机制浅析补充说明

用springboot的

kotlin demo,帮助理解structured concurrency简化异步并发调用的机制

准备http客户端

使用同时支持同步和异步调用的java.net.http.HttpClient

@Configuration

class RestTemplateConfig {

@Bean

fun httpClient(): HttpClient {

val client: HttpClient = HttpClient.newBuilder()

.connectTimeout(Duration.ofSeconds(3))

.followRedirects(HttpClient.Redirect.NEVER) // CompletableFuture的默认回调线程池也是这个

.executor(ForkJoinPool.commonPool())

.build()

return client

}

@Bean

@Primary

fun restTemplate(httpClient: HttpClient,

builder: RestTemplateBuilder

): RestTemplate {

// 使用springboot自动初始化的builder,已加载各种RestTemplateRequestCustomizer

return builder.requestFactory(Supplier {

JdkClientHttpRequestFactory(

httpClient

)

}).build()

}

}

同步调用

@AutoConfigureMockMvc

@SpringBootTest

class ApplicationTests {

private val log: Logger = LoggerFactory.getLogger(this::class.java)

@Autowired

private lateinit var httpClient: HttpClient

suspend fun blockingGet(tag: String) = withContext(Dispatchers.IO) {

// 在Dispatchers.IO线程池上执行

log.info("$tag 同步http调用")

var threadName1 = Thread.currentThread().name

val req1 = HttpRequest.newBuilder()

.GET().timeout(Duration.ofSeconds(10))

.uri(URI.create("https://xxx"))

.build()

var resp: HttpResponse = httpClient.send(req1, HttpResponse.BodyHandlers.ofString())

log.info("$tag 等待异步http调用完成: ${resp.body()}")

var threadName2 = Thread.currentThread().name

// 证明IO调用期间,线程被阻塞而没有释放

assertEquals(threadName1, threadName2)

log.info("$tag $threadName1 -> $threadName2")

}

}

异步调用

suspend fun asyncGet(tag: String) = withContext(Dispatchers.IO) {

// 在Dispatchers.IO线程池上执行

log.info("$tag 异步http调用")

var threadName1 = Thread.currentThread().name

val req1 = HttpRequest.newBuilder()

.GET().timeout(Duration.ofSeconds(10))

.uri(URI.create("https://xxx"))

.build()

val future1: CompletableFuture> =

httpClient.sendAsync(req1) { info: ResponseInfo ->

// 接收完response header后,触发这里的逻辑执行;然后再解析body,再完成future1

log.info("$tag Protocol: ${info.version()}, ${Thread.currentThread().name}")

BodySubscribers.ofString(StandardCharsets.UTF_8)

}

var resp: HttpResponse = future1.await()

// 上面调用了suspend方法,于是编译器把下面的代码包装到状态机的某个状态的回调里

var threadName2 = Thread.currentThread().name

// 能够观察到线程切换,证明future1.await()这个suspend之后的代码是异步回调的

log.info("$tag $threadName1 -> $threadName2")

log.info("$tag 等待异步http调用完成: ${resp.body()}")

}

HttpClient的send实质是对sendAsync的封装,是在调用线程上强制等待sendAsync返回的CompletableFuture完成。

structured concurrency用例与机制浅析

@Test

fun test1() {

log.info("main begin: " + Thread.currentThread().toString())

var dispatcher: ExecutorCoroutineDispatcher = ForkJoinPool.commonPool().asCoroutineDispatcher()

var result1 = runBlocking(dispatcher) {

// runBlocking{}内的代码全部在dispatcher提供的线程上执行,launch{}里的coroutine除外

var threadName1 = Thread.currentThread().name

repeat(3) {

launch { blockingGet("blocking request" + it) }

}

repeat(3) {

launch { asyncGet("async request" + it) }

}

// 调用一个suspend方法

delay(2L)

var threadName2 = Thread.currentThread().name

delay(2L)

var threadName3 = Thread.currentThread().name

// delay这个suspend真实引发了线程释放和回调,所以每个delay前后的代码不在同一个线程执行

log.info("block end: $threadName1 -> $threadName2 -> $threadName3")

"block result1"

}

// runBlocking阻塞调用它main线程到这里

log.info(result1)

log.info("main end: " + Thread.currentThread().toString())

}

demo里3个blockingGet+3个asyncGet都是并发执行的。 但是blockingGet内部使用的是阻塞式API,实际长期占用线程来等待IO结果,并没有释放线程资源来达到提升并发能力的目的。 asyncGet内部实际发生了线程的释放和异步回调;kotlin编译器在coroutine内检测到对suspend方法的调用(就是future1.await()调用),就会生成匿名类/状态机,来分割调用前后的执行逻辑;因为这个future1的完成最终是依赖IOCP/ePOll等操作系统IO接口的事件通知,所以在等待过程中不占用/阻塞任何操作系统线程;通过打印的日志能观察到future1.await()调用前后的执行线程是不同的。

runBlocking{}则阻塞调用它的线程,等待runBlocking{}内部所有launch的coroutine完成。

补充说明

阻塞式http调用,实际也是依赖操作系统的IO事件,只是没释放等待前的线程

好文推荐

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: