Consumer使用实例
kafka-console-consumer
sh kafka-console-consumer.sh ----bootstrap-server localhost:9092 --topic test --from-beginning
consumer client
Properties props = new Properties();
props.put("bootstrap.servers","localhost:9092"); props.put("group.id","test_group_id");
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
while(true){
ConsumerRecords<String,String> records = consumer.poll(1000)
for(ConsumerRecord<String, String> record : records){
System.out.printf("offset=%d,key=%s,value=%s",record.offset(),record.key(),record.value());
}
}
可以看到consumer的入口在poll方法,下面来看下poll方法的实现
Consumer poll模型
//timeout是Consumer消费的超时时间,如果设置为0,表示buffer中只要有数据就立刻拉取
public ConsumerRecords<K, V> poll(long timeout) {
acquire();
try {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
// poll for new data until the timeout expires
long start = time.milliseconds();
long remaining = timeout;
do {
//从订阅的partition中消费数据,pollonce是其核心实现
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
if (!records.isEmpty()) {
// 在返回获取数据之前,需要发起下次的fetch请求,主要是为了避免用户在处理获取数据,而导致fetch请求被阻塞
if (fetcher.sendFetches() > 0 || client.pendingRequestCount() > 0)
client.pollNoWakeup();
if (this.interceptors == null)
return new ConsumerRecords<>(records);
else
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
long elapsed = time.milliseconds() - start;
remaining = timeout - elapsed;
} while (remaining > 0);
return ConsumerRecords.empty();
} finally {
release();
}
}
Consumer的poll方法主要在做以下几件事:
- 检测timeout是否合法以及Consumer是否订阅了相应的topic-partition
- 调用pollOnce方法获取数据
- 在返回结果前,提前发起下次的fetch请求,避免用户在处理返回数据时,而导致线程被阻塞
- 如果在timeout的时间中没有获取到数据,则返回空数据
pollOnce方法
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
coordinator.poll(time.milliseconds());
// 确认是否所有的分区的offset是否有效,更新没有生效的partition的offset
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
// 如果获取到数据,则立马返回
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
// 对于新的fetch请求,立即发起请求
fetcher.sendFetches();
long now = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
//调用底层的poll方法,发起请求
client.poll(pollTimeout, now, new PollCondition() {
@Override
public boolean shouldBlock() {
// 对于已完成的fetch请求,则不进行阻塞
return !fetcher.hasCompletedFetches();
}
});
// 如果消费组group需要进行负责均衡rebalance,则直接返回空数据,
if (coordinator.needRejoin())
return Collections.emptyMap();
return fetcher.fetchedRecords();
}
pollOnce方法,主要有以下几个步骤:
- coordinator.poll()
- updateFetchPositions()
- fetcher.fetchedRecords()
- fetcher.sendFetches()
- client.poll()
- coordinator.needRejoin()
下面详细分析以上几个步骤
ConsumerCoordinator.poll()
//确保这个group的coordinator是已知的,并且已经Consumer已经加入到这个group中
public void poll(long now) {
invokeCompletedOffsetCommitCallbacks();
//若订阅了topic,并且该coordinator是未知的,则初始化coordinator
if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
ensureCoordinatorReady();
now = time.milliseconds();
}
//Consumer是否需要重新加入到group中(如果partition发生变化,则需要rejoin)
if (needRejoin()) {
// due to a race condition between the initial metadata fetch and the initial rebalance,
// we need to ensure that the metadata is fresh before joining initially. This ensures
// that we have matched the pattern against the cluster's topics at least once before joining.
if (subscriptions.hasPatternSubscription())
client.ensureFreshMetadata();
// 确保group是active的
ensureActiveGroup();
now = time.milliseconds();
}
//检测心跳线程是否正常,若不正常,则抛出异常
pollHeartbeat(now);
//开启auto commit时,当定时时间到时则自动提交
maybeAutoCommitOffsetsAsync(now);
}
updateFetchPositions()
//如果有committed position,则将fetch position设置为committed position,否则使用配置的重置策略去设置offset
private void updateFetchPositions(Set<TopicPartition> partitions) {
//先重置那些需要重置的partition,比如调用了seekToBeginning,seekToEnd的partition
fetcher.resetOffsetsIfNeeded(partitions);
if (!subscriptions.hasAllFetchPositions(partitions)) {
// if we still don't have offsets for the given partitions, then we should either
// seek to the last committed position or reset using the auto reset policy
// first refresh commits for all assigned partitions
coordinator.refreshCommittedOffsetsIfNeeded();
// then do any offset lookups in case some positions are not known
fetcher.updateFetchPositions(partitions);
}
}
fetcher.fetchedRecords()
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
if (nextInLineExceptionMetadata != null) {
ExceptionMetadata exceptionMetadata = nextInLineExceptionMetadata;
nextInLineExceptionMetadata = null;
TopicPartition tp = exceptionMetadata.partition;
if (subscriptions.isFetchable(tp) && subscriptions.position(tp) == exceptionMetadata.fetchedOffset)
throw exceptionMetadata.exception;
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
int recordsRemaining = maxPollRecords;
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
CompletedFetch completedFetch = completedFetches.poll();
if (completedFetch == null) break;
try {
nextInLineRecords = parseCompletedFetch(completedFetch);
} catch (KafkaException e) {
if (drained.isEmpty())
throw e;
nextInLineExceptionMetadata = new ExceptionMetadata(completedFetch.partition, completedFetch.fetchedOffset, e);
}
} else {
TopicPartition partition = nextInLineRecords.partition;
List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
if (!records.isEmpty()) {
List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
if (currentRecords == null) {
drained.put(partition, records);
} else {
// this case shouldn't usually happen because we only send one fetch at a time per partition,
// but it might conceivably happen in some rare cases (such as partition leader changes).
// we have to copy to a new list because the old one may be immutable
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
drained.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}
return drained;
}
fetcher.sendFetches()
//向订阅的所有的partition所在leader发送fetch请求
public int sendFetches() {
//构建fetch请求
Map<Node, FetchRequest.Builder> fetchRequestMap = createFetchRequests();
for (Map.Entry<Node, FetchRequest.Builder> fetchEntry : fetchRequestMap.entrySet()) {
final FetchRequest.Builder request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey();
log.debug("Sending fetch for partitions {} to broker {}", request.fetchData().keySet(), fetchTarget);
//发起fetch请求
client.send(fetchTarget, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
FetchResponse response = (FetchResponse) resp.responseBody();
if (!matchesRequestedPartitions(request, response)) {
// obviously we expect the broker to always send us valid responses, so this check
// is mainly for test cases where mock fetch responses must be manually crafted.
log.warn("Ignoring fetch response containing partitions {} since it does not match " +
"the requested partitions {}", response.responseData().keySet(),
request.fetchData().keySet());
return;
}
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset = request.fetchData().get(partition).offset;
FetchResponse.PartitionData fetchData = entry.getValue();
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
request.version()));
}
sensors.fetchLatency.record(resp.requestLatencyMs());
sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
}
@Override
public void onFailure(RuntimeException e) {
log.debug("Fetch request to {} for partitions {} failed", fetchTarget, request.fetchData().keySet(), e);
}
});
}
return fetchRequestMap.size();
}