kafka StreamsMetricsImpl 源码

  • 2022-10-20
  • 浏览 (247)

kafka StreamsMetricsImpl 代码

文件路径:/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals.metrics;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;

import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class StreamsMetricsImpl implements StreamsMetrics {

    public enum Version {
        LATEST
    }

    static class ImmutableMetricValue<T> implements Gauge<T> {
        private final T value;

        public ImmutableMetricValue(final T value) {
            this.value = value;
        }

        @Override
        public T value(final MetricConfig config, final long now) {
            return value;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            final ImmutableMetricValue<?> that = (ImmutableMetricValue<?>) o;
            return Objects.equals(value, that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }
    }

    private final Metrics metrics;
    private final Map<Sensor, Sensor> parentSensors;
    private final String clientId;

    private final Version version;
    private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
    private final Deque<String> clientLevelSensors = new LinkedList<>();
    private final Map<String, Deque<MetricName>> threadLevelMetrics = new HashMap<>();
    private final Map<String, Deque<String>> threadLevelSensors = new HashMap<>();
    private final Map<String, Deque<String>> taskLevelSensors = new HashMap<>();
    private final Map<String, Deque<String>> nodeLevelSensors = new HashMap<>();
    private final Map<String, Deque<String>> topicLevelSensors = new HashMap<>();
    private final Map<String, Deque<String>> cacheLevelSensors = new HashMap<>();
    private final ConcurrentMap<String, Deque<String>> storeLevelSensors = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Deque<MetricName>> storeLevelMetrics = new ConcurrentHashMap<>();

    private final RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger;

    private static final String SENSOR_PREFIX_DELIMITER = ".";
    private static final String SENSOR_NAME_DELIMITER = ".s.";
    private static final String SENSOR_TASK_LABEL = "task";
    private static final String SENSOR_NODE_LABEL = "node";
    private static final String SENSOR_TOPIC_LABEL = "topic";
    private static final String SENSOR_CACHE_LABEL = "cache";
    private static final String SENSOR_STORE_LABEL = "store";
    private static final String SENSOR_ENTITY_LABEL = "entity";
    private static final String SENSOR_EXTERNAL_LABEL = "external";
    private static final String SENSOR_INTERNAL_LABEL = "internal";

    public static final String CLIENT_ID_TAG = "client-id";
    public static final String THREAD_ID_TAG = "thread-id";
    public static final String TASK_ID_TAG = "task-id";
    public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
    public static final String TOPIC_NAME_TAG = "topic";
    public static final String STORE_ID_TAG = "state-id";
    public static final String RECORD_CACHE_ID_TAG = "record-cache-id";

    public static final String ROLLUP_VALUE = "all";

    public static final String LATENCY_SUFFIX = "-latency";
    public static final String RECORDS_SUFFIX = "-records";
    public static final String AVG_SUFFIX = "-avg";
    public static final String MAX_SUFFIX = "-max";
    public static final String MIN_SUFFIX = "-min";
    public static final String RATE_SUFFIX = "-rate";
    public static final String TOTAL_SUFFIX = "-total";
    public static final String RATIO_SUFFIX = "-ratio";

    public static final String GROUP_PREFIX_WO_DELIMITER = "stream";
    public static final String GROUP_PREFIX = GROUP_PREFIX_WO_DELIMITER + "-";
    public static final String GROUP_SUFFIX = "-metrics";
    public static final String CLIENT_LEVEL_GROUP = GROUP_PREFIX_WO_DELIMITER + GROUP_SUFFIX;
    public static final String THREAD_LEVEL_GROUP = GROUP_PREFIX + "thread" + GROUP_SUFFIX;
    public static final String TASK_LEVEL_GROUP = GROUP_PREFIX + "task" + GROUP_SUFFIX;
    public static final String PROCESSOR_NODE_LEVEL_GROUP = GROUP_PREFIX + "processor-node" + GROUP_SUFFIX;
    public static final String TOPIC_LEVEL_GROUP = GROUP_PREFIX + "topic" + GROUP_SUFFIX;
    public static final String STATE_STORE_LEVEL_GROUP = GROUP_PREFIX + "state" + GROUP_SUFFIX;
    public static final String CACHE_LEVEL_GROUP = GROUP_PREFIX + "record-cache" + GROUP_SUFFIX;

    public static final String OPERATIONS = " operations";
    public static final String TOTAL_DESCRIPTION = "The total number of ";
    public static final String RATE_DESCRIPTION = "The average per-second number of ";
    public static final String AVG_LATENCY_DESCRIPTION = "The average latency of ";
    public static final String MAX_LATENCY_DESCRIPTION = "The maximum latency of ";
    public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
    public static final String RATE_DESCRIPTION_SUFFIX = " per second";

    public static final String RECORD_E2E_LATENCY = "record-e2e-latency";
    public static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX =
        "end-to-end latency of a record, measuring by comparing the record timestamp with the "
            + "system time when it has been fully processed by the node";
    public static final String RECORD_E2E_LATENCY_AVG_DESCRIPTION = "The average " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
    public static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION = "The minimum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
    public static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION = "The maximum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;

    public StreamsMetricsImpl(final Metrics metrics,
                              final String clientId,
                              final String builtInMetricsVersion,
                              final Time time) {
        Objects.requireNonNull(metrics, "Metrics cannot be null");
        Objects.requireNonNull(builtInMetricsVersion, "Built-in metrics version cannot be null");
        this.metrics = metrics;
        this.clientId = clientId;
        version = Version.LATEST;
        rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time);

        this.parentSensors = new HashMap<>();
    }

    public Version version() {
        return version;
    }

    public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
        return rocksDBMetricsRecordingTrigger;
    }

    public <T> void addClientLevelImmutableMetric(final String name,
                                                  final String description,
                                                  final RecordingLevel recordingLevel,
                                                  final T value) {
        final MetricName metricName = metrics.metricName(name, CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
        final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
        synchronized (clientLevelMetrics) {
            metrics.addMetric(metricName, metricConfig, new ImmutableMetricValue<>(value));
            clientLevelMetrics.push(metricName);
        }
    }

    public <T> void addClientLevelMutableMetric(final String name,
                                                final String description,
                                                final RecordingLevel recordingLevel,
                                                final Gauge<T> valueProvider) {
        final MetricName metricName = metrics.metricName(name, CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
        final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
        synchronized (clientLevelMetrics) {
            metrics.addMetric(metricName, metricConfig, valueProvider);
            clientLevelMetrics.push(metricName);
        }
    }

    public <T> void addThreadLevelImmutableMetric(final String name,
        final String description,
        final String threadId,
        final T value) {
        final MetricName metricName = metrics.metricName(
            name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
        synchronized (threadLevelMetrics) {
            threadLevelMetrics.computeIfAbsent(
                threadSensorPrefix(threadId),
                tid -> new LinkedList<>()
            ).add(metricName);
            metrics.addMetric(metricName, new ImmutableMetricValue<>(value));
        }
    }

    public <T> void addThreadLevelMutableMetric(final String name,
                                                final String description,
                                                final String threadId,
                                                final Gauge<T> valueProvider) {
        final MetricName metricName = metrics.metricName(
            name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
        synchronized (threadLevelMetrics) {
            threadLevelMetrics.computeIfAbsent(
                threadSensorPrefix(threadId),
                tid -> new LinkedList<>()
            ).add(metricName);
            metrics.addMetric(metricName, valueProvider);
        }
    }

    public final Sensor clientLevelSensor(final String sensorName,
                                          final RecordingLevel recordingLevel,
                                          final Sensor... parents) {
        synchronized (clientLevelSensors) {
            final String fullSensorName = CLIENT_LEVEL_GROUP + SENSOR_NAME_DELIMITER + sensorName;
            final Sensor sensor = metrics.getSensor(fullSensorName);
            if (sensor == null) {
                clientLevelSensors.push(fullSensorName);
                return metrics.sensor(fullSensorName, recordingLevel, parents);
            }
            return sensor;
        }
    }

    public final Sensor threadLevelSensor(final String threadId,
                                          final String sensorName,
                                          final RecordingLevel recordingLevel,
                                          final Sensor... parents) {
        final String key = threadSensorPrefix(threadId);
        synchronized (threadLevelSensors) {
            return getSensors(threadLevelSensors, sensorName, key, recordingLevel, parents);
        }
    }

    private String threadSensorPrefix(final String threadId) {
        return SENSOR_INTERNAL_LABEL + SENSOR_PREFIX_DELIMITER + threadId;
    }

    public Map<String, String> clientLevelTagMap() {
        final Map<String, String> tagMap = new LinkedHashMap<>();
        tagMap.put(CLIENT_ID_TAG, clientId);
        return tagMap;
    }

    public Map<String, String> threadLevelTagMap(final String threadId) {
        final Map<String, String> tagMap = new LinkedHashMap<>();
        tagMap.put(THREAD_ID_TAG, threadId);
        return tagMap;
    }

    public final void removeAllClientLevelSensorsAndMetrics() {
        removeAllClientLevelSensors();
        removeAllClientLevelMetrics();
    }

    private void removeAllClientLevelMetrics() {
        synchronized (clientLevelMetrics) {
            while (!clientLevelMetrics.isEmpty()) {
                metrics.removeMetric(clientLevelMetrics.pop());
            }
        }
    }

    private void removeAllClientLevelSensors() {
        synchronized (clientLevelSensors) {
            while (!clientLevelSensors.isEmpty()) {
                metrics.removeSensor(clientLevelSensors.pop());
            }
        }
    }

    public final void removeAllThreadLevelSensors(final String threadId) {
        final String key = threadSensorPrefix(threadId);
        synchronized (threadLevelSensors) {
            final Deque<String> sensors = threadLevelSensors.remove(key);
            while (sensors != null && !sensors.isEmpty()) {
                metrics.removeSensor(sensors.pop());
            }
        }
    }

    public final void removeAllThreadLevelMetrics(final String threadId) {
        synchronized (threadLevelMetrics) {
            final Deque<MetricName> names = threadLevelMetrics.remove(threadSensorPrefix(threadId));
            while (names != null && !names.isEmpty()) {
                metrics.removeMetric(names.pop());
            }
        }
    }

    public Map<String, String> taskLevelTagMap(final String threadId, final String taskId) {
        final Map<String, String> tagMap = threadLevelTagMap(threadId);
        tagMap.put(TASK_ID_TAG, taskId);
        return tagMap;
    }

    public Map<String, String> nodeLevelTagMap(final String threadId,
                                               final String taskName,
                                               final String processorNodeName) {
        final Map<String, String> tagMap = taskLevelTagMap(threadId, taskName);
        tagMap.put(PROCESSOR_NODE_ID_TAG, processorNodeName);
        return tagMap;
    }

    public Map<String, String> topicLevelTagMap(final String threadId,
                                                final String taskName,
                                                final String processorNodeName,
                                                final String topicName) {
        final Map<String, String> tagMap = nodeLevelTagMap(threadId, taskName, processorNodeName);
        tagMap.put(TOPIC_NAME_TAG, topicName);
        return tagMap;
    }

    public Map<String, String> storeLevelTagMap(final String taskName,
                                                final String storeType,
                                                final String storeName) {
        final Map<String, String> tagMap = taskLevelTagMap(Thread.currentThread().getName(), taskName);
        tagMap.put(storeType + "-" + STORE_ID_TAG, storeName);
        return tagMap;
    }

    public final Sensor taskLevelSensor(final String threadId,
                                        final String taskId,
                                        final String sensorName,
                                        final RecordingLevel recordingLevel,
                                        final Sensor... parents) {
        final String key = taskSensorPrefix(threadId, taskId);
        synchronized (taskLevelSensors) {
            return getSensors(taskLevelSensors, sensorName, key, recordingLevel, parents);
        }
    }

    public final void removeAllTaskLevelSensors(final String threadId, final String taskId) {
        final String key = taskSensorPrefix(threadId, taskId);
        synchronized (taskLevelSensors) {
            final Deque<String> sensors = taskLevelSensors.remove(key);
            while (sensors != null && !sensors.isEmpty()) {
                metrics.removeSensor(sensors.pop());
            }
        }
    }

    private String taskSensorPrefix(final String threadId, final String taskId) {
        return threadSensorPrefix(threadId) + SENSOR_PREFIX_DELIMITER + SENSOR_TASK_LABEL + SENSOR_PREFIX_DELIMITER +
            taskId;
    }

    public Sensor nodeLevelSensor(final String threadId,
                                  final String taskId,
                                  final String processorNodeName,
                                  final String sensorName,
                                  final Sensor.RecordingLevel recordingLevel,
                                  final Sensor... parents) {
        final String key = nodeSensorPrefix(threadId, taskId, processorNodeName);
        synchronized (nodeLevelSensors) {
            return getSensors(nodeLevelSensors, sensorName, key, recordingLevel, parents);
        }
    }

    public final void removeAllNodeLevelSensors(final String threadId,
                                                final String taskId,
                                                final String processorNodeName) {
        final String key = nodeSensorPrefix(threadId, taskId, processorNodeName);
        synchronized (nodeLevelSensors) {
            final Deque<String> sensors = nodeLevelSensors.remove(key);
            while (sensors != null && !sensors.isEmpty()) {
                metrics.removeSensor(sensors.pop());
            }
        }
    }

    private String nodeSensorPrefix(final String threadId, final String taskId, final String processorNodeName) {
        return taskSensorPrefix(threadId, taskId)
            + SENSOR_PREFIX_DELIMITER + SENSOR_NODE_LABEL + SENSOR_PREFIX_DELIMITER + processorNodeName;
    }

    public Sensor topicLevelSensor(final String threadId,
                                   final String taskId,
                                   final String processorNodeName,
                                   final String topicName,
                                   final String sensorName,
                                   final Sensor.RecordingLevel recordingLevel,
                                   final Sensor... parents) {
        final String key = topicSensorPrefix(threadId, taskId, processorNodeName, topicName);
        synchronized (topicLevelSensors) {
            return getSensors(topicLevelSensors, sensorName, key, recordingLevel, parents);
        }
    }

    public final void removeAllTopicLevelSensors(final String threadId,
                                                 final String taskId,
                                                 final String processorNodeName,
                                                 final String topicName) {
        final String key = topicSensorPrefix(threadId, taskId, processorNodeName, topicName);
        synchronized (topicLevelSensors) {
            final Deque<String> sensors = topicLevelSensors.remove(key);
            while (sensors != null && !sensors.isEmpty()) {
                metrics.removeSensor(sensors.pop());
            }
        }
    }

    private String topicSensorPrefix(final String threadId,
                                     final String taskId,
                                     final String processorNodeName,
                                     final String topicName) {
        return nodeSensorPrefix(threadId, taskId, processorNodeName)
            + SENSOR_PREFIX_DELIMITER + SENSOR_TOPIC_LABEL + SENSOR_PREFIX_DELIMITER + topicName;
    }

    public Sensor cacheLevelSensor(final String threadId,
                                   final String taskName,
                                   final String storeName,
                                   final String sensorName,
                                   final Sensor.RecordingLevel recordingLevel,
                                   final Sensor... parents) {
        final String key = cacheSensorPrefix(threadId, taskName, storeName);
        synchronized (cacheLevelSensors) {
            return getSensors(cacheLevelSensors, sensorName, key, recordingLevel, parents);
        }
    }

    public Map<String, String> cacheLevelTagMap(final String threadId,
                                                final String taskId,
                                                final String storeName) {
        final Map<String, String> tagMap = new LinkedHashMap<>();
        tagMap.put(THREAD_ID_TAG, threadId);
        tagMap.put(TASK_ID_TAG, taskId);
        tagMap.put(RECORD_CACHE_ID_TAG, storeName);
        return tagMap;
    }

    public final void removeAllCacheLevelSensors(final String threadId, final String taskId, final String cacheName) {
        final String key = cacheSensorPrefix(threadId, taskId, cacheName);
        synchronized (cacheLevelSensors) {
            final Deque<String> strings = cacheLevelSensors.remove(key);
            while (strings != null && !strings.isEmpty()) {
                metrics.removeSensor(strings.pop());
            }
        }
    }

    private String cacheSensorPrefix(final String threadId, final String taskId, final String cacheName) {
        return taskSensorPrefix(threadId, taskId)
            + SENSOR_PREFIX_DELIMITER + SENSOR_CACHE_LABEL + SENSOR_PREFIX_DELIMITER + cacheName;
    }

    public final Sensor storeLevelSensor(final String taskId,
                                         final String storeName,
                                         final String sensorName,
                                         final RecordingLevel recordingLevel,
                                         final Sensor... parents) {
        final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
            // since the keys in the map storeLevelSensors contain the name of the current thread and threads only
            // access keys in which their name is contained, the value in the maps do not need to be thread safe
            // and we can use a LinkedList here.
            // TODO: In future, we could use thread local maps since each thread will exclusively access the set of keys
            //  that contain its name. Similar is true for the other metric levels. Thread-level metrics need some
            //  special attention, since they are created before the thread is constructed. The creation of those
            //  metrics could be moved into the run() method of the thread.
        return getSensors(storeLevelSensors, sensorName, key, recordingLevel, parents);
    }

    public <T> void addStoreLevelMutableMetric(final String taskId,
                                               final String metricsScope,
                                               final String storeName,
                                               final String name,
                                               final String description,
                                               final RecordingLevel recordingLevel,
                                               final Gauge<T> valueProvider) {
        final MetricName metricName = metrics.metricName(
            name,
            STATE_STORE_LEVEL_GROUP,
            description,
            storeLevelTagMap(taskId, metricsScope, storeName)
        );
        if (metrics.metric(metricName) == null) {
            metrics.addMetricIfAbsent(metricName, new MetricConfig().recordLevel(recordingLevel), valueProvider);
            final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
            storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
        }
    }

    public final void removeAllStoreLevelSensorsAndMetrics(final String taskId,
                                                           final String storeName) {
        final String threadId = Thread.currentThread().getName();
        removeAllStoreLevelSensors(threadId, taskId, storeName);
        removeAllStoreLevelMetrics(threadId, taskId, storeName);
    }

    private void removeAllStoreLevelSensors(final String threadId,
                                            final String taskId,
                                            final String storeName) {
        final String key = storeSensorPrefix(threadId, taskId, storeName);
        final Deque<String> sensors = storeLevelSensors.remove(key);
        while (sensors != null && !sensors.isEmpty()) {
            metrics.removeSensor(sensors.pop());
        }
    }

    private void removeAllStoreLevelMetrics(final String threadId,
                                            final String taskId,
                                            final String storeName) {
        final String key = storeSensorPrefix(threadId, taskId, storeName);
        final Deque<MetricName> metricNames = storeLevelMetrics.remove(key);
        while (metricNames != null && !metricNames.isEmpty()) {
            metrics.removeMetric(metricNames.pop());
        }
    }

    private String storeSensorPrefix(final String threadId,
                                     final String taskId,
                                     final String storeName) {
        return taskSensorPrefix(threadId, taskId)
            + SENSOR_PREFIX_DELIMITER + SENSOR_STORE_LABEL + SENSOR_PREFIX_DELIMITER + storeName;
    }

    @Override
    public Sensor addSensor(final String name, final Sensor.RecordingLevel recordingLevel) {
        return metrics.sensor(name, recordingLevel);
    }

    @Override
    public Sensor addSensor(final String name, final Sensor.RecordingLevel recordingLevel, final Sensor... parents) {
        return metrics.sensor(name, recordingLevel, parents);
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(this.metrics.metrics());
    }

    private Map<String, String> customizedTags(final String threadId,
                                               final String scopeName,
                                               final String entityName,
                                               final String... tags) {
        final Map<String, String> tagMap = threadLevelTagMap(threadId);
        tagMap.put(scopeName + "-id", entityName);
        if (tags != null) {
            if ((tags.length % 2) != 0) {
                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
            }
            for (int i = 0; i < tags.length; i += 2) {
                tagMap.put(tags[i], tags[i + 1]);
            }
        }
        return tagMap;
    }

    private Sensor customInvocationRateAndCountSensor(final String threadId,
                                                      final String groupName,
                                                      final String entityName,
                                                      final String operationName,
                                                      final Map<String, String> tags,
                                                      final Sensor.RecordingLevel recordingLevel) {
        final Sensor sensor = metrics.sensor(externalChildSensorName(threadId, operationName, entityName), recordingLevel);
        addInvocationRateAndCountToSensor(
            sensor,
            groupName,
            tags,
            operationName,
            RATE_DESCRIPTION_PREFIX + operationName + OPERATIONS + RATE_DESCRIPTION_SUFFIX,
            TOTAL_DESCRIPTION + operationName + OPERATIONS
        );
        return sensor;
    }

    @Override
    public Sensor addLatencyRateTotalSensor(final String scopeName,
                                            final String entityName,
                                            final String operationName,
                                            final Sensor.RecordingLevel recordingLevel,
                                            final String... tags) {
        final String threadId = Thread.currentThread().getName();
        final String group = groupNameFromScope(scopeName);
        final Map<String, String> tagMap = customizedTags(threadId, scopeName, entityName, tags);
        final Sensor sensor =
            customInvocationRateAndCountSensor(threadId, group, entityName, operationName, tagMap, recordingLevel);
        addAvgAndMaxToSensor(
            sensor,
            group,
            tagMap,
            operationName + LATENCY_SUFFIX,
            AVG_LATENCY_DESCRIPTION + operationName,
            MAX_LATENCY_DESCRIPTION + operationName
        );

        return sensor;
    }

    @Override
    public Sensor addRateTotalSensor(final String scopeName,
                                     final String entityName,
                                     final String operationName,
                                     final Sensor.RecordingLevel recordingLevel,
                                     final String... tags) {
        final String threadId = Thread.currentThread().getName();
        final Map<String, String> tagMap = customizedTags(threadId, scopeName, entityName, tags);
        return customInvocationRateAndCountSensor(
            threadId,
            groupNameFromScope(scopeName),
            entityName,
            operationName,
            tagMap,
            recordingLevel
        );
    }

    private String externalChildSensorName(final String threadId, final String operationName, final String entityName) {
        return SENSOR_EXTERNAL_LABEL + SENSOR_PREFIX_DELIMITER + threadId
            + SENSOR_PREFIX_DELIMITER + SENSOR_ENTITY_LABEL + SENSOR_PREFIX_DELIMITER + entityName
            + SENSOR_NAME_DELIMITER + operationName;
    }

    public static void addAvgAndMaxToSensor(final Sensor sensor,
                                            final String group,
                                            final Map<String, String> tags,
                                            final String operation,
                                            final String descriptionOfAvg,
                                            final String descriptionOfMax) {
        sensor.add(
            new MetricName(
                operation + AVG_SUFFIX,
                group,
                descriptionOfAvg,
                tags),
            new Avg()
        );
        sensor.add(
            new MetricName(
                operation + MAX_SUFFIX,
                group,
                descriptionOfMax,
                tags),
            new Max()
        );
    }

    public static void addMinAndMaxToSensor(final Sensor sensor,
                                            final String group,
                                            final Map<String, String> tags,
                                            final String operation,
                                            final String descriptionOfMin,
                                            final String descriptionOfMax) {
        sensor.add(
            new MetricName(
                operation + MIN_SUFFIX,
                group,
                descriptionOfMin,
                tags),
            new Min()
        );

        sensor.add(
            new MetricName(
                operation + MAX_SUFFIX,
                group,
                descriptionOfMax,
                tags),
            new Max()
        );
    }

    public static void addAvgAndMaxLatencyToSensor(final Sensor sensor,
                                                   final String group,
                                                   final Map<String, String> tags,
                                                   final String operation) {
        sensor.add(
            new MetricName(
                operation + "-latency-avg",
                group,
                AVG_LATENCY_DESCRIPTION + operation + " operation.",
                tags),
            new Avg()
        );
        sensor.add(
            new MetricName(
                operation + "-latency-max",
                group,
                MAX_LATENCY_DESCRIPTION + operation + " operation.",
                tags),
            new Max()
        );
    }

    public static void addAvgAndMinAndMaxToSensor(final Sensor sensor,
                                                  final String group,
                                                  final Map<String, String> tags,
                                                  final String operation,
                                                  final String descriptionOfAvg,
                                                  final String descriptionOfMin,
                                                  final String descriptionOfMax) {
        addAvgAndMaxToSensor(sensor, group, tags, operation, descriptionOfAvg, descriptionOfMax);
        sensor.add(
            new MetricName(
                operation + MIN_SUFFIX,
                group,
                descriptionOfMin,
                tags),
            new Min()
        );
    }

    public static void addInvocationRateAndCountToSensor(final Sensor sensor,
                                                         final String group,
                                                         final Map<String, String> tags,
                                                         final String operation,
                                                         final String descriptionOfRate,
                                                         final String descriptionOfCount) {
        addInvocationRateToSensor(sensor, group, tags, operation, descriptionOfRate);
        sensor.add(
            new MetricName(
                operation + TOTAL_SUFFIX,
                group,
                descriptionOfCount,
                tags
            ),
            new CumulativeCount()
        );
    }

    public static void addInvocationRateToSensor(final Sensor sensor,
                                                 final String group,
                                                 final Map<String, String> tags,
                                                 final String operation,
                                                 final String descriptionOfRate) {
        sensor.add(
            new MetricName(
                operation + RATE_SUFFIX,
                group,
                descriptionOfRate,
                tags
            ),
            new Rate(TimeUnit.SECONDS, new WindowedCount())
        );
    }

    public static void addInvocationRateAndCountToSensor(final Sensor sensor,
                                                         final String group,
                                                         final Map<String, String> tags,
                                                         final String operation) {
        addInvocationRateAndCountToSensor(
            sensor,
            group,
            tags,
            operation,
            RATE_DESCRIPTION + operation,
            TOTAL_DESCRIPTION + operation
        );
    }

    public static void addRateOfSumAndSumMetricsToSensor(final Sensor sensor,
                                                         final String group,
                                                         final Map<String, String> tags,
                                                         final String operation,
                                                         final String descriptionOfRate,
                                                         final String descriptionOfTotal) {
        addRateOfSumMetricToSensor(sensor, group, tags, operation, descriptionOfRate);
        addSumMetricToSensor(sensor, group, tags, operation, descriptionOfTotal);
    }

    public static void addRateOfSumMetricToSensor(final Sensor sensor,
                                                  final String group,
                                                  final Map<String, String> tags,
                                                  final String operation,
                                                  final String description) {
        sensor.add(new MetricName(operation + RATE_SUFFIX, group, description, tags),
                   new Rate(TimeUnit.SECONDS, new WindowedSum()));
    }

    public static void addSumMetricToSensor(final Sensor sensor,
                                            final String group,
                                            final Map<String, String> tags,
                                            final String operation,
                                            final String description) {
        addSumMetricToSensor(sensor, group, tags, operation, true, description);
    }

    public static void addSumMetricToSensor(final Sensor sensor,
                                            final String group,
                                            final Map<String, String> tags,
                                            final String operation,
                                            final boolean withSuffix,
                                            final String description) {
        sensor.add(
            new MetricName(
                withSuffix ? operation + TOTAL_SUFFIX : operation,
                group,
                description,
                tags
            ),
            new CumulativeSum()
        );
    }

    public static void addValueMetricToSensor(final Sensor sensor,
                                              final String group,
                                              final Map<String, String> tags,
                                              final String name,
                                              final String description) {
        sensor.add(new MetricName(name, group, description, tags), new Value());
    }

    public static void addAvgAndSumMetricsToSensor(final Sensor sensor,
                                                   final String group,
                                                   final Map<String, String> tags,
                                                   final String metricNamePrefix,
                                                   final String descriptionOfAvg,
                                                   final String descriptionOfTotal) {
        sensor.add(new MetricName(metricNamePrefix + AVG_SUFFIX, group, descriptionOfAvg, tags), new Avg());
        sensor.add(
            new MetricName(metricNamePrefix + TOTAL_SUFFIX, group, descriptionOfTotal, tags),
            new CumulativeSum()
        );
    }

    public static void addTotalCountAndSumMetricsToSensor(final Sensor sensor,
                                                          final String group,
                                                          final Map<String, String> tags,
                                                          final String countMetricNamePrefix,
                                                          final String sumMetricNamePrefix,
                                                          final String descriptionOfCount,
                                                          final String descriptionOfTotal) {
        sensor.add(
            new MetricName(countMetricNamePrefix + TOTAL_SUFFIX, group, descriptionOfCount, tags),
            new CumulativeCount()
        );
        sensor.add(
            new MetricName(sumMetricNamePrefix + TOTAL_SUFFIX, group, descriptionOfTotal, tags),
            new CumulativeSum()
        );
    }

    public static void maybeMeasureLatency(final Runnable actionToMeasure,
                                           final Time time,
                                           final Sensor sensor) {
        if (sensor.shouldRecord() && sensor.hasMetrics()) {
            final long startNs = time.nanoseconds();
            try {
                actionToMeasure.run();
            } finally {
                sensor.record(time.nanoseconds() - startNs);
            }
        } else {
            actionToMeasure.run();
        }
    }

    public static <T> T maybeMeasureLatency(final Supplier<T> actionToMeasure,
                                            final Time time,
                                            final Sensor sensor) {
        if (sensor.shouldRecord() && sensor.hasMetrics()) {
            final long startNs = time.nanoseconds();
            try {
                return actionToMeasure.get();
            } finally {
                sensor.record(time.nanoseconds() - startNs);
            }
        } else {
            return actionToMeasure.get();
        }
    }

    private Sensor getSensors(final Map<String, Deque<String>> sensors,
                              final String sensorName,
                              final String key,
                              final RecordingLevel recordingLevel,
                              final Sensor... parents) {
        final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
        final Sensor sensor = metrics.getSensor(fullSensorName);
        if (sensor == null) {
            sensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
            return metrics.sensor(fullSensorName, recordingLevel, parents);
        }
        return sensor;
    }

    /**
     * Deletes a sensor and its parents, if any
     */
    @Override
    public void removeSensor(final Sensor sensor) {
        Objects.requireNonNull(sensor, "Sensor is null");
        metrics.removeSensor(sensor.name());

        final Sensor parent = parentSensors.remove(sensor);
        if (parent != null) {
            metrics.removeSensor(parent.name());
        }
    }

    /**
     * Visible for testing
     */
    Map<Sensor, Sensor> parentSensors() {
        return Collections.unmodifiableMap(parentSensors);
    }

    private static String groupNameFromScope(final String scopeName) {
        return "stream-" + scopeName + "-metrics";
    }
}

相关信息

kafka 源码目录

相关文章

kafka ProcessorNodeMetrics 源码

kafka TaskMetrics 源码

kafka ThreadMetrics 源码

kafka TopicMetrics 源码

0  赞