Kafka 的语义约定

状态: 开发中

Apache Kafka 的语义约定扩展并覆盖了Apache Kafka消息传递语义约定

警告

正在使用 此文档 v1.24.0(或之前版本) 的现有消息传递仪器

  • 在消息传递语义约定被标记为稳定之前,不应更改它们默认发出的消息传递约定版本。约定包括但不限于属性、指标和跨度名称、跨度种类和度量单位。
  • 在现有主版本中,应引入一个名为 OTEL_SEMCONV_STABILITY_OPT_IN 的环境变量,该变量是一个逗号分隔的类别特定值列表(例如,http、databases、messaging)。值列表包括:
    • messaging - 发出新的、稳定的消息传递约定,并停止发出仪器先前发出的旧实验性消息传递约定。
    • messaging/dup - 同时发出旧的稳定消息传递约定和新的稳定消息传递约定,实现无缝过渡。
    • 默认行为(在没有这些值的情况下)是继续发出仪器先前发出的旧实验性消息传递约定的任何版本。
    • 注意:如果同时存在 messagingmessaging/dup,则 messaging/dup 的优先级高于 messaging
  • 在开始发出两组约定后,应至少维护现有主版本(至少进行安全修补)六个月。
  • 应在下一个主版本中删除该环境变量。
  • messaging/dup 出现在列表中时,应为跨度名称、跨度种类和类似的“单一”值概念发出新的、稳定的值。

messaging.system 必须设置为 "kafka",并且应在span 创建时提供。

Span 属性

对于 Apache Kafka,定义了以下附加属性

Attributes

Stability需求级别Value Type描述Example Values
messaging.operation.nameDevelopment必需字符串消息操作的系统特定名称。ack; nack; send
error.typeStable条件必需 仅当消息操作失败时。字符串描述操作最终结束的一类错误。[1]amqp:decode-error; KAFKA_STORAGE_ERROR; channel-error
messaging.batch.message_countDevelopment有条件地必需 [2]int在批处理操作的范围内发送、接收或处理的消息数量。[3]0; 1; 2
messaging.destination.nameDevelopment有条件地必需 [4]字符串消息目标名称 [5]MyQueue; MyTopic
messaging.kafka.message.tombstoneDevelopment有条件地必需 [6]布尔值如果消息是墓碑消息,则为 true。
messaging.operation.typeDevelopment条件必需 如果适用。字符串标识消息操作类型的字符串。[7]create; send; receive
server.addressStable有条件必需 如果可用。字符串如果可用且无需反向 DNS 查找,则为服务器域名;否则,为 IP 地址或 Unix 域套接字名称。[8]example.com10.1.2.80/tmp/my.sock
messaging.client.idDevelopment推荐字符串标识消耗或生成消息的客户端的唯一标识符。client-5; myhost@8742@s8083jm
messaging.consumer.group.nameDevelopment推荐字符串Kafka 消费者组 IDmy-group; indexer
messaging.destination.partition.idDevelopment推荐字符串消息(或批次)发送到或接收自的分区 ID 的字符串表示。1
messaging.kafka.message.keyDevelopment推荐 如果 span 描述的是单条消息的操作。字符串Kafka 中的消息键用于对相似消息进行分组,以确保它们在同一个分区上进行处理。它们与 messaging.message.id 不同,因为它们不是唯一的。如果键为 null,则不应设置此属性。[9]myKey
messaging.kafka.offsetDevelopment推荐 如果 span 描述的是单条消息的操作。int记录在相应 Kafka 分区中的偏移量。42
messaging.message.idDevelopment推荐 如果 span 描述的是单条消息的操作。字符串消息传递系统用作消息标识符的值,表示为字符串。452a7c7c7c7048c2f887f61572b18fc2
server.portStable推荐int服务器端口号。[10]80; 8080; 443
messaging.message.body.sizeDevelopment选择加入int消息正文的字节大小。仅适用于描述单条消息操作的 span。[11]1439

[1] error.type error.type 应该可预测,并且应该具有低基数性。

error.type 设置为某个类型(例如,异常类型)时,应该使用该工件内识别类型的规范类名。

Instrumentations 应该记录它们报告的错误列表。

一个仪器库内的 error.type 基数性应该低。从多个仪器库和应用程序聚合数据的遥测消费者,在没有额外过滤时,应准备好 error.type 在查询时具有高基数性。

如果操作已成功完成,Instrumentations 不应设置 error.type

如果特定域定义了自己的一组错误标识符(例如 HTTP 或 gRPC 状态码),则建议

  • 使用特定于域的属性
  • 设置 error.type 以捕获所有错误,无论它们是否包含在特定于域的集合中。

[2] messaging.batch.message_count: 如果 span 描述的是对消息批次的操作。

[3] messaging.batch.message_count: 仪表化不应在操作单条消息的 span 上设置 messaging.batch.message_count。当消息客户端库为同一操作支持批处理和单消息 API 时,仪表化应为批处理 API 使用 messaging.batch.message_count,而不应为单消息 API 使用它。

[4] messaging.destination.name: 如果 span 描述的是单条消息的操作,或者该值适用于批次中的所有消息。

[5] messaging.destination.name: 目标名称应唯一标识代理中的特定队列、主题或其他实体。如果代理没有此类概念,则目标名称应唯一标识代理。

[6] messaging.kafka.message.tombstone: 如果值为 true。如果缺失,则假定值为 false

[7] messaging.operation.type如果使用了自定义值,则它必须是低基数的。

[8] server.address: 如果可用且无需反向 DNS 查找,则为代理的服务器域名;否则,为 IP 地址或 Unix 域套接字名称。

[9] messaging.kafka.message.key: 如果键类型不是字符串,则必须提供其字符串表示形式。如果键没有明确的、规范的字符串形式,则不包含其值。

[10] server.port: 当从客户端观察到并且通过中间件通信时,server.port 应代表任何中间件(例如代理)之后的服务器端口,如果可用。

[11] messaging.message.body.size: 这可以指压缩或未压缩的正文大小。如果两种大小都已知,则应使用未压缩的正文大小。

以下属性对于做出采样决策可能很重要,并且应在跨度创建时提供(如果提供的话)


error.type 具有以下已知值列表。如果其中一个适用,则必须使用相应的值;否则,可以使用自定义值。

描述Stability
_OTHER当检测不到自定义值时使用的回退错误值。Stable

messaging.operation.type 具有以下一系列已知值。如果适用其中一个,则必须使用相应的值;否则,可以使用自定义值。

描述Stability
create创建消息。“Create”跨度始终指单条消息,用于为批量发送场景中的消息提供唯一的创建上下文。Development
process一个或多个消息由消费者处理。Development
receive一个或多个消息由消费者请求。此操作指拉取式场景,其中消费者显式调用消息 SDK 的方法来接收消息。Development
send提供一个或多个消息以发送到中介。如果发送了单条消息,“Send”跨度的上下文可用作创建上下文,无需创建“Create”跨度。Development
settle一个或多个消息已结算。Development

对于 Apache Kafka 生产者,peer.service 应设置为消息将发送到的代理或服务的名称。当消息直接传递给另一个服务时,消费者的 service.name 应匹配生产者的 peer.service。如果存在中间代理,service.namepeer.service 将不相同。

messaging.client.id 应设置为消费者或生产者的客户端名称,该名称对每个独立实例都是唯一的。

示例

Apache Kafka 与 Quarkus 或 Spring Boot 示例

在此示例中,生产者将一条消息发布到 Apache Kafka 的主题 T。消费者接收消息,处理它并提交偏移量。

Quarkus 和 Spring Boot 等框架提供了与 Kafka 的集成,允许配置和检测处理回调,因此相应的仪表化应创建“Process”span,以及 Kafka 仪表化为轮询调用创建的“Receive”span。

flowchart LR;
  subgraph PRODUCER
  P[Span Send]
  end
  subgraph CONSUMER
  direction TB
  R1[Span Poll]
  R2[Span Process]
  R3[Span Commit]
  end

  P-. link .-R1;
  P-. link .-R2;
  R2-- parent ---R3;

  classDef normal fill:green
  class P,R1,R2,R3 normal
  linkStyle 0 color:green,stroke:green
  linkStyle 1 color:green,stroke:green
字段或属性Producer消费者 Span 轮询消费者 Span 处理消费者 Span Commit T
Span 名称"send T""poll T""process T""commit T"
Parent(可选) Span 发送Span 处理
链接Span 发送Span 发送
SpanKindPRODUCERCLIENTCONSUMERCLIENT
状态未设置未设置未设置未设置
messaging.system"kafka""kafka""kafka""kafka"
messaging.destination.name"T""T""T""T"
messaging.consumer.group.name"my-group""my-group""my-group"
messaging.destination.partition.id"1""1""1""1"
messaging.operation.name"send""poll""process""commit"
messaging.operation.type"send""receive""process""settle"
messaging.client.id"5""8""8""8"
messaging.kafka.message.key"myKey""myKey""myKey"
messaging.kafka.offset"12""12""12"