Kafka源码分析——Consumer

Consumer使用实例

kafka-console-consumer

sh kafka-console-consumer.sh ----bootstrap-server localhost:9092 --topic test --from-beginning

consumer client

Properties props = new Properties();
props.put("bootstrap.servers","localhost:9092");    props.put("group.id","test_group_id");
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
while(true){
    ConsumerRecords<String,String> records = consumer.poll(1000)
    for(ConsumerRecord<String, String> record : records){
        System.out.printf("offset=%d,key=%s,value=%s",record.offset(),record.key(),record.value());
    }
}

可以看到consumer的入口在poll方法,下面来看下poll方法的实现

Consumer poll模型

//timeout是Consumer消费的超时时间,如果设置为0,表示buffer中只要有数据就立刻拉取
public ConsumerRecords<K, V> poll(long timeout) {
    acquire();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
                //从订阅的partition中消费数据,pollonce是其核心实现
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                // 在返回获取数据之前,需要发起下次的fetch请求,主要是为了避免用户在处理获取数据,而导致fetch请求被阻塞
                if (fetcher.sendFetches() > 0 || client.pendingRequestCount() > 0)
                    client.pollNoWakeup();

                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }

            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}

Consumer的poll方法主要在做以下几件事:

  1. 检测timeout是否合法以及Consumer是否订阅了相应的topic-partition
  2. 调用pollOnce方法获取数据
  3. 在返回结果前,提前发起下次的fetch请求,避免用户在处理返回数据时,而导致线程被阻塞
  4. 如果在timeout的时间中没有获取到数据,则返回空数据

pollOnce方法

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    coordinator.poll(time.milliseconds());

    // 确认是否所有的分区的offset是否有效,更新没有生效的partition的offset
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());

    // 如果获取到数据,则立马返回
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;

    // 对于新的fetch请求,立即发起请求
    fetcher.sendFetches();

    long now = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

     //调用底层的poll方法,发起请求
    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // 对于已完成的fetch请求,则不进行阻塞
            return !fetcher.hasCompletedFetches();
        }
    });

    // 如果消费组group需要进行负责均衡rebalance,则直接返回空数据,
    if (coordinator.needRejoin())
        return Collections.emptyMap();

    return fetcher.fetchedRecords();
}    

pollOnce方法,主要有以下几个步骤:

  1. coordinator.poll()
  2. updateFetchPositions()
  3. fetcher.fetchedRecords()
  4. fetcher.sendFetches()
  5. client.poll()
  6. coordinator.needRejoin()

下面详细分析以上几个步骤

ConsumerCoordinator.poll()

//确保这个group的coordinator是已知的,并且已经Consumer已经加入到这个group中
public void poll(long now) {
    invokeCompletedOffsetCommitCallbacks();

      //若订阅了topic,并且该coordinator是未知的,则初始化coordinator
    if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
        ensureCoordinatorReady();
        now = time.milliseconds();
    }

     //Consumer是否需要重新加入到group中(如果partition发生变化,则需要rejoin)
    if (needRejoin()) {
        // due to a race condition between the initial metadata fetch and the initial rebalance,
        // we need to ensure that the metadata is fresh before joining initially. This ensures
        // that we have matched the pattern against the cluster's topics at least once before joining.
        if (subscriptions.hasPatternSubscription())
            client.ensureFreshMetadata();
          // 确保group是active的
        ensureActiveGroup();
        now = time.milliseconds();
    }

     //检测心跳线程是否正常,若不正常,则抛出异常
    pollHeartbeat(now);

    //开启auto commit时,当定时时间到时则自动提交
    maybeAutoCommitOffsetsAsync(now);
}

updateFetchPositions()

//如果有committed position,则将fetch position设置为committed position,否则使用配置的重置策略去设置offset
private void updateFetchPositions(Set<TopicPartition> partitions) {
    //先重置那些需要重置的partition,比如调用了seekToBeginning,seekToEnd的partition
    fetcher.resetOffsetsIfNeeded(partitions);

    if (!subscriptions.hasAllFetchPositions(partitions)) {
        // if we still don't have offsets for the given partitions, then we should either
        // seek to the last committed position or reset using the auto reset policy

        // first refresh commits for all assigned partitions
        coordinator.refreshCommittedOffsetsIfNeeded();

        // then do any offset lookups in case some positions are not known
        fetcher.updateFetchPositions(partitions);
    }
}   

fetcher.fetchedRecords()

public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    if (nextInLineExceptionMetadata != null) {
        ExceptionMetadata exceptionMetadata = nextInLineExceptionMetadata;
        nextInLineExceptionMetadata = null;
        TopicPartition tp = exceptionMetadata.partition;
        if (subscriptions.isFetchable(tp) && subscriptions.position(tp) == exceptionMetadata.fetchedOffset)
            throw exceptionMetadata.exception;
    }

    Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
    int recordsRemaining = maxPollRecords;
    while (recordsRemaining > 0) {
        if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
            CompletedFetch completedFetch = completedFetches.poll();
            if (completedFetch == null) break;
            try {
                nextInLineRecords = parseCompletedFetch(completedFetch);
            } catch (KafkaException e) {
                if (drained.isEmpty())
                    throw e;
                nextInLineExceptionMetadata = new ExceptionMetadata(completedFetch.partition, completedFetch.fetchedOffset, e);
            }
        } else {
            TopicPartition partition = nextInLineRecords.partition;
            List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
            if (!records.isEmpty()) {
                List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
                if (currentRecords == null) {
                    drained.put(partition, records);
                } else {
                    // this case shouldn't usually happen because we only send one fetch at a time per partition,
                    // but it might conceivably happen in some rare cases (such as partition leader changes).
                    // we have to copy to a new list because the old one may be immutable
                    List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                    newRecords.addAll(currentRecords);
                    newRecords.addAll(records);
                    drained.put(partition, newRecords);
                }
                recordsRemaining -= records.size();
            }
        }
    }


    return drained;
}

fetcher.sendFetches()

//向订阅的所有的partition所在leader发送fetch请求
public int sendFetches() {
     //构建fetch请求
    Map<Node, FetchRequest.Builder> fetchRequestMap = createFetchRequests();
    for (Map.Entry<Node, FetchRequest.Builder> fetchEntry : fetchRequestMap.entrySet()) {
        final FetchRequest.Builder request = fetchEntry.getValue();
        final Node fetchTarget = fetchEntry.getKey();

        log.debug("Sending fetch for partitions {} to broker {}", request.fetchData().keySet(), fetchTarget);
        //发起fetch请求
        client.send(fetchTarget, request)
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) {
                        FetchResponse response = (FetchResponse) resp.responseBody();
                        if (!matchesRequestedPartitions(request, response)) {
                            // obviously we expect the broker to always send us valid responses, so this check
                            // is mainly for test cases where mock fetch responses must be manually crafted.
                            log.warn("Ignoring fetch response containing partitions {} since it does not match " +
                                    "the requested partitions {}", response.responseData().keySet(),
                                    request.fetchData().keySet());
                            return;
                        }

                        Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                        FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

                        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                            TopicPartition partition = entry.getKey();
                            long fetchOffset = request.fetchData().get(partition).offset;
                            FetchResponse.PartitionData fetchData = entry.getValue();
                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                    request.version()));
                        }

                        sensors.fetchLatency.record(resp.requestLatencyMs());
                        sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        log.debug("Fetch request to {} for partitions {} failed", fetchTarget, request.fetchData().keySet(), e);
                    }
                });
    }
    return fetchRequestMap.size();
}    

参考文献


Reprint please specify: wbl Kafka源码分析——Consumer

Previous
Prometheus简介 Prometheus简介
Prometheus简介Prometheus 是一套开源的系统监控报警框架。作为新一代的监控框架,Prometheus具有如下几个特点 多维度的数据模型 灵活和强大的查询语句(PromQL) 易于管理,prometheus是一个独立的二进
2019-01-16
Next
Kafka源码分析——Producer Kafka源码分析——Producer
Producer 使用示例kafka-console-producersh kafka-console-producer.sh --broker-list localhost:9092 --topic test producer clien
2019-01-05