关于 Reactor API,如何写异常后恢复

查看 25|回复 0
作者:zhady009   
方法client.downloadFile会不断发射FilePart,需要collec写入到文件中即可。
在接收FilePart期间会有网络等其他异常,现在直接用onErrorResume从 offset 开始请求返回新的 Flux 会有一个问题。
第一次异常会进入onErrorResume返回新的 Flux ,由于新的 Flux 没有声明onErrorResume就噶了
我也不可能在新的 Flux 里声明onErrorResume,无限套娃了属于是。
client.downloadFile(fileReferenceId)
    .publishOn(Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor()))
    .timeout(Duration.ofMinutes(3))
    .onErrorResume(RpcException::class.java) {
        if (it.error.errorCode() == TIMEOUT_CODE && monitoredChannel.isDone().not()) {
            log.warn("Download timeout, resuming: $fileDownloadPath")
            return@onErrorResume client.downloadFile(
                fileReferenceId,
                monitoredChannel.getDownloadedBytes(),
                MAX_FILE_PART_SIZE,
                true
            )
        }
        Flux.error(it)
    }
    .collect({ monitoredChannel }, { fc, filePart ->
        fc.write(filePart.bytes.nioBuffer())
    })
    .doOnSuccess {
        tempDownloadPath.moveTo(fileDownloadPath)
        downloadCounting.incrementAndGet()
        log.info("Downloaded file: $fileDownloadPath")
    }
    .doOnError {
        log.error("Error downloading file:$fileDownloadPath", it)
    }
    .onErrorMap {
        wrapRetryableExceptionIfNeeded(it)
    }
    .doFinally {
        runCatching {
            closePath(fileDownloadPath)
        }.onFailure {
            log.error("Error closing file channel", it)
        }
        hashingPathMapping.remove(hashing)
    }
    .block()
您需要登录后才可以回帖 登录 | 立即注册

返回顶部