现状:我的应用如下:
跑一个 Java 应用,消费多个 topic 。创建了好几个 consumer 进行消费,每次 consumer.poll 得到的 records 都通过线程池 ThreadPoolExecutor 处理数据。每个 consumer 对应一个 ThreadPoolExecutor
新需求:现需要新增消费一个 topic ,该 topic 的量比存量的 topic 要远远大于。因此需要控制一下该 topic 每个批次消费到的数量。
发现问题:刚开始我以为只需要配置 max.poll.record 就可以控制每个批次的消费速率,但经过测试发现,由于每次消费到的 records 都让线程池去处理了,因此 consumer.poll 一次数据在一个批次内就识别到很快就处理完,然后 consumer 就会在一个批次内尽可能地去 poll 多几次。这样就没法实现每个批次控制了。
请教大家:针对以上情况有优化的可能性吗?需要尽可能精确控制指定 topic 一个批次内只需要消费固定的数据量,我目前发现 sparkStreaming 倒是很好地控制,但是 Java 目前没找到合适的方案来实现控制。