在接收FilePart期间会有网络等其他异常,现在直接用onErrorResume从 offset 开始请求返回新的 Flux 会有一个问题。
第一次异常会进入onErrorResume返回新的 Flux ,由于新的 Flux 没有声明onErrorResume就噶了
我也不可能在新的 Flux 里声明onErrorResume,无限套娃了属于是。
client.downloadFile(fileReferenceId)
.publishOn(Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor()))
.timeout(Duration.ofMinutes(3))
.onErrorResume(RpcException::class.java) {
if (it.error.errorCode() == TIMEOUT_CODE && monitoredChannel.isDone().not()) {
log.warn("Download timeout, resuming: $fileDownloadPath")
return@onErrorResume client.downloadFile(
fileReferenceId,
monitoredChannel.getDownloadedBytes(),
MAX_FILE_PART_SIZE,
true
)
}
Flux.error(it)
}
.collect({ monitoredChannel }, { fc, filePart ->
fc.write(filePart.bytes.nioBuffer())
})
.doOnSuccess {
tempDownloadPath.moveTo(fileDownloadPath)
downloadCounting.incrementAndGet()
log.info("Downloaded file: $fileDownloadPath")
}
.doOnError {
log.error("Error downloading file:$fileDownloadPath", it)
}
.onErrorMap {
wrapRetryableExceptionIfNeeded(it)
}
.doFinally {
runCatching {
closePath(fileDownloadPath)
}.onFailure {
log.error("Error closing file channel", it)
}
hashingPathMapping.remove(hashing)
}
.block()