为 Apache Kafka 客户端进行 OpenTelemetry 仪表化
博客文章在发布后不会更新。这篇文章已经发布一年多了,其内容可能已过时,部分链接可能无效。在依赖任何信息之前,请务必核实。
如今,Apache Kafka 已成为分布式环境中的神经中枢。不同的服务通过 Apache Kafka 作为消息系统进行通信,更甚者将其作为事件或数据流平台。
考虑到云原生微服务开发方法,Kubernetes 也常用于运行工作负载。在这种场景下,您也可以轻松地在其之上部署和管理 Apache Kafka 集群,方法是使用像 Strimzi 这样的项目。它负责整个 Kafka 基础设施,而您可以专注于开发使用它的应用程序。
在整体图景中,由于其分布式特性,追踪消息的流转非常困难。这时 OpenTelemetry 就派上用场了。它提供了多种插桩库,用于向基于消息的应用程序添加追踪。当然,其中也有一个用于 Apache Kafka 客户端的。它还定义了 消息系统 的语义约定规范。
但通常,架构可能更加复杂:应用程序无法直接连接到 Apache Kafka 集群,而是使用自己的自定义协议,例如 HTTP。在这种情况下,追踪通过 HTTP 传递的 Kafka 消息的生产和消费会非常复杂。Strimzi 项目为此提供了一个支持 OpenTelemetry 的桥梁,用于通过相应的插桩库添加追踪数据。
在本文中,您将学习如何通过不同方式为基于 Apache Kafka 的客户端应用程序启用追踪。我们将以 Java 插桩为例。您也可以在此 仓库 中找到所有示例。
为 Kafka 客户端启用追踪
假设您有一个使用 Kafka 客户端 API 来生产和消费消息的应用程序。为了简化场景,我们还假设您不想在业务逻辑中添加任何额外的追踪信息。您只对添加与 Kafka 相关的部分的追踪感兴趣。您想追踪消息通过 Kafka 客户端的生产和消费过程。
要实现这一点,有两种不同的方法:
- 使用一个外部代理与您的应用程序并行运行以添加追踪。
- 直接在您的应用程序使用的 Kafka 客户端上启用追踪。
前者实际上是一种“自动”方法,即根本不修改您的应用程序。代理与应用程序并行运行,能够拦截进出的消息并为其添加追踪信息。
后者主要是一种“手动”方法,即直接插桩您的应用程序。这意味着为您的项目添加一些特定的依赖项并进行代码更改。
通过代理进行插桩
最简单、最自动的方法是为您的应用程序添加追踪,而无需更改或添加任何应用程序代码。您也无需为 OpenTelemetry 特定库添加任何依赖项。这可以通过使用 OpenTelemetry 代理来实现,您可以从 opentelemetry-java-instrumentation/releases 下载。此代理必须与您的应用程序并行运行,以便注入用于追踪发送到 Kafka 集群的消息以及从 Kafka 集群接收的消息的逻辑。
以以下方式运行生产者应用程序。
java -javaagent:path/to/opentelemetry-javaagent.jar \
-Dotel.service.name=my-kafka-service \
-Dotel.traces.exporter=jaeger \
-Dotel.metrics.exporter=none \
-jar kafka-producer-agent/target/kafka-producer-agent-1.0-SNAPSHOT-jar-with-dependencies.jar
同样的方式运行消费者应用程序。
java -javaagent:path/to/opentelemetry-javaagent.jar \
-Dotel.service.name=my-kafka-service \
-Dotel.traces.exporter=jaeger \
-Dotel.metrics.exporter=none \
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true \
-jar kafka-consumer-agent/target/kafka-consumer-agent-1.0-SNAPSHOT-jar-with-dependencies.jar
该代理利用了我们稍后会看到的自动配置 SDK 扩展,通过设置系统属性来设置主要参数。
插桩 Apache Kafka 客户端
OpenTelemetry 项目提供了 opentelemetry-kafka-clients-2.6 模块,该模块为 Kafka 客户端提供追踪插桩。首先,您需要为您的应用程序添加相应的依赖项。
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
</dependency>
根据您要用于导出追踪信息的导出器,您也需要添加相应的依赖项。例如,为了使用 Jaeger 导出器,依赖项如下。
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-jaeger</artifactId>
</dependency>
这样,您就具备了在基于 Kafka 的应用程序中启用追踪的最小设置。
设置 OpenTelemetry 实例
整个追踪插桩由您代码中的 OpenTelemetry 实例处理。需要创建并全局注册它,以便 Kafka 客户端插桩库可以使用。
这可以通过两种不同的方式完成:
- 使用用于基于环境的自动配置的 SDK 扩展。
- 使用 SDK 构建器类进行编程配置。
使用 SDK 自动配置
可以通过环境变量配置全局 OpenTelemetry 实例,这得益于用于自动配置的 SDK 扩展,通过将以下依赖项添加到您的应用程序中即可启用。
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
</dependency>
当使用 Kafka 客户端插桩库时,它会检查是否已有 OpenTelemetry 实例被创建和注册。如果没有,库代码会检查 SDK 自动配置模块是否在类路径中,在这种情况下,它会初始化该模块以自动创建 OpenTelemetry 实例。相应的配置通过环境变量(或对应的系统属性)进行。这确实是一种简化追踪初始化的方法。
需要设置的主要环境变量如下:
OTEL_SERVICE_NAME:指定逻辑服务名。在使用追踪 UI(例如 Jaeger UI)显示数据时非常有用,建议设置。OTEL_TRACES_EXPORTER:用于追踪的导出器列表。例如,使用jaeger时,您还需要在应用程序中拥有相应的依赖项。
除了使用上述环境变量,还可以使用相应的系统属性,这些属性可以在代码中以编程方式设置,或在启动应用程序时在命令行上设置。它们是 otel.service.name 和 otel.traces.exporter。
使用 SDK 构建器
为了构建您自己的 OpenTelemetry 实例而不依赖自动配置,可以通过以编程方式使用 SDK 构建器类来实现。需要 OpenTelemetry SDK 依赖项才能在您的代码中获得这些 SDK 构建器。
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
以下代码片段设置了主要属性,如服务名,然后配置了 Jaeger 导出器。最后,它创建了 OpenTelemetry 实例并将其全局注册,以便 Kafka 客户端插桩库可以使用。
Resource resource = Resource.getDefault()
.merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "my-kafka-service")));
SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(BatchSpanProcessor.builder(JaegerGrpcSpanExporter.builder().build()).build())
.setSampler(Sampler.alwaysOn())
.setResource(resource)
.build();
OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
.setTracerProvider(sdkTracerProvider)
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
.buildAndRegisterGlobal();
使用拦截器
Kafka 客户端 API 提供了一种方法,可以在消息发送到代理之前以及消息从代理接收并传递给应用程序之前进行“拦截”。当您需要在消息发送前添加某些逻辑或内容时,这种方法被大量使用。同时,在消费的消息在传递给上层应用程序之前进行处理也很有用。这非常适合追踪,当您需要在发送和接收消息时创建或关闭 spans。
Kafka 客户端插桩库提供了两个拦截器来配置以自动添加追踪信息。拦截器类必须设置在应用程序中用于创建 Kafka 客户端的属性袋中。
使用 TracingProducerInterceptor 进行生产者,以便每次发送消息时自动创建一个“发送” span。
props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
使用 TracingConsumerInterceptor 进行消费者,以便每次接收消息时自动创建一个“接收” span。
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
包装客户端
另一种方法是使用支持追踪的客户端包装 Kafka 客户端。
在生产者端,假设您有一个 Producer<K, V> 实例,您可以如下方式包装它。
KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
Producer<String, String> tracingProducer = telemetry.wrap(producer);
然后像往常一样使用 tracingProducer 将消息发送到 Kafka 集群。
在消费者端,假设您有一个 Consumer<K, V> 实例,您可以如下方式包装它。
KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
Consumer<String, String> tracingConsumer = telemetry.wrap(this.consumer);
然后像往常一样使用 tracingConsumer 将消息从 Kafka 集群接收。
插桩实践
为了通过提供的示例练习 Kafka 客户端的插桩,首先您需要一个 Apache Kafka 集群。最简单的方法是从 官方网站 下载它,并运行一个 ZooKeeper 节点和一个 Kafka Broker。您可以按照 快速入门 在几分钟内启动并运行这样一个集群。使用 Web UI(例如 Jaeger 提供的)来分析追踪信息也更简单。即使在这种情况下,从 官方网站 下载并本地运行它也非常简单。
当环境准备好后,第一次尝试是运行使用拦截器或包装器插桩的生产者和消费者应用程序。只需发送一条消息并消费它,即可得到如下追踪。

正如您所见,“发送”和“接收” spans 都位于同一个 trace 中,并且“接收” span 定义为“发送” span 的 CHILD_OF。您还可以看到语义定义了一些带有 messaging. 前缀的特定消息相关标签。这个语义实际上是不正确的,因为发送操作不依赖于接收(这是 CHILD_OF 关系的意思)。根据这个 GitHub 讨论 和即将通过这个新的 OTEP(OpenTelemetry 增强提案) 稳定的新消息语义约定,它将进行更改。目标是让“发送”和“接收” spans 位于两个不同的 trace 中,但通过 FOLLOW_FROM 关系链接在一起。
当使用代理时,这种方法更能体现出来,因为“发送” span 位于其自己的 trace 中,如下所示。

在接收端,还有“接收”和“处理” spans 指向生产者。

结论
Apache Kafka 只是用于分布式系统中微服务通信的消息平台之一。监控消息交换方式和排除故障非常复杂。这时 OpenTelemetry 项目就派上用场了,它将追踪掌握在您手中。在本文中,我们看到了 Kafka 客户端插桩库如何使为您的 Kafka 应用程序添加追踪信息变得非常简单。您可以获取更多关于生产者和消费者行为以及端到端追踪每条消息的信息。那么……还有什么呢?试试看吧!