更多博客请见 我的语雀知识库
省流:推荐使用带回调方法的异步发送

发后即忘

这种模式只管往broker里发数据,但是broker收没收到它不管。 通常情况下,这种发后即忘(fire-and-forget)的方式并不会出现问题,但也有意外情况:

  1. 遇到可重试异常,如果发送消息到broker时抛出异常,且是可重试异常,就会去重试执行方法。org.apache.kafka.clients.producer.internals.Sender类中有如下方法:
    private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
     return batch.attempts() < this.retries &&
             ((response.error.exception() instanceof RetriableException) ||
                     (transactionManager != null && transactionManager.canRetry(response, batch)));
    }
    

    允许重试需要满足两个条件:

  2. 重试次数少于参数retries指定的值;
  3. 异常是RetriableException类型或者TransactionManager允许重试;

    同步发送

    严格上来说,Kafka并没有同步发送机制。因为发送消息的send方法本身是异步的。 但是Kafka也有提供一些机制让我们做到同步发送。 具体怎么做到同步发送呢?

  • send方法是有返回值的,返回值是Future类型,这是一个接口,可以提供异步的执行结果。我们可以使用Future的get方法去等待异步结束。这样就把异步改成了同步。
try {
        // RecordMetadata包含了成功发送到 Kafka Broker 的消息的元数据信息
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata metadata = future.get();
        System.out.printf("Send Record(key=%s value=%s) " +
                        "meta(Partition=%d, offset=%d) \n",
                record.key(), record.value(),
                metadata.partition(), metadata.offset());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

  • 如果遇到可重试异常,在超出最大重试次数之前如果能解决,我们的代码就不会捕获到异常,否则就会爆出异常需要捕获
  • 如果是不可重试异常,是肯定要做异常捕获的。做好异常捕获记录详细日志信息,日后排查问题也方便

同步发送的可靠性比较高,如果发生异常,可以捕获并处理异常;并不会像 “发后即忘” 的方式直接造成消息丢失。但是,同步发送的方式性能会比较差,需要阻塞等待一条消息发送成功之后才会发送下一条消息。kafka的批消息发送也就成摆设了。

异步发送

producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (metadata != null ) {
                System.out.printf("Send Record(key=%s value=%s) " +
                                "meta(Partition=%d, offset=%d) \n",
                        record.key(), record.value(),
                        metadata.partition(), metadata.offset());
            }
            if (exception != null) {
                exception.printStackTrace();
            }
        }
    });

异步发送消息,比前两种方法多了一个回调函数的参数 不需要 try/catch,因为异常会在回调函数中处理。 回调方法中的 onCompletion() 方法中的两个参数是互斥的。消息发送成功时,metadata 不为 null 而 exception 为 null,反之则反之。 回调方法的真正优势在于发送数据失败时,可以在数据发送失败时采取一些措施,如抛出异常、记录错误或将失败的数据写入到其他地方进行分析。很显然,使用带回调函数的异步发送方式既高效、又优雅,通常在生产环境中都建议使用这种方式发送消息。

:::warning 注意:回调函数是在生产者的主线程中执行的。必须要确保回调方法的执行速度足够快,绝对不能在回调方法中产生阻塞,否则会减慢生产者的运行速度。 :::