Kafka 的语义约定
状态: 开发中
Apache Kafka 的语义约定扩展并覆盖了Apache Kafka的消息传递语义约定。
正在使用 此文档 v1.24.0(或之前版本) 的现有消息传递仪器
- 在消息传递语义约定被标记为稳定之前,不应更改它们默认发出的消息传递约定版本。约定包括但不限于属性、指标和跨度名称、跨度种类和度量单位。
- 在现有主版本中,应引入一个名为
OTEL_SEMCONV_STABILITY_OPT_IN的环境变量,该变量是一个逗号分隔的类别特定值列表(例如,http、databases、messaging)。值列表包括:messaging- 发出新的、稳定的消息传递约定,并停止发出仪器先前发出的旧实验性消息传递约定。messaging/dup- 同时发出旧的稳定消息传递约定和新的稳定消息传递约定,实现无缝过渡。- 默认行为(在没有这些值的情况下)是继续发出仪器先前发出的旧实验性消息传递约定的任何版本。
- 注意:如果同时存在
messaging和messaging/dup,则messaging/dup的优先级高于messaging。
- 在开始发出两组约定后,应至少维护现有主版本(至少进行安全修补)六个月。
- 应在下一个主版本中删除该环境变量。
- 当
messaging/dup出现在列表中时,应为跨度名称、跨度种类和类似的“单一”值概念发出新的、稳定的值。
messaging.system 必须设置为 "kafka",并且应在span 创建时提供。
Span 属性
对于 Apache Kafka,定义了以下附加属性
Attributes
| 键 | Stability | 需求级别 | Value Type | 描述 | Example Values |
|---|---|---|---|---|---|
messaging.operation.name | 必需 | 字符串 | 消息操作的系统特定名称。 | ack; nack; send | |
error.type | 条件必需 仅当消息操作失败时。 | 字符串 | 描述操作最终结束的一类错误。[1] | amqp:decode-error; KAFKA_STORAGE_ERROR; channel-error | |
messaging.batch.message_count | 有条件地必需 [2] | int | 在批处理操作的范围内发送、接收或处理的消息数量。[3] | 0; 1; 2 | |
messaging.destination.name | 有条件地必需 [4] | 字符串 | 消息目标名称 [5] | MyQueue; MyTopic | |
messaging.kafka.message.tombstone | 有条件地必需 [6] | 布尔值 | 如果消息是墓碑消息,则为 true。 | ||
messaging.operation.type | 条件必需 如果适用。 | 字符串 | 标识消息操作类型的字符串。[7] | create; send; receive | |
server.address | 有条件必需 如果可用。 | 字符串 | 如果可用且无需反向 DNS 查找,则为服务器域名;否则,为 IP 地址或 Unix 域套接字名称。[8] | example.com;10.1.2.80;/tmp/my.sock | |
messaging.client.id | 推荐 | 字符串 | 标识消耗或生成消息的客户端的唯一标识符。 | client-5; myhost@8742@s8083jm | |
messaging.consumer.group.name | 推荐 | 字符串 | Kafka 消费者组 ID。 | my-group; indexer | |
messaging.destination.partition.id | 推荐 | 字符串 | 消息(或批次)发送到或接收自的分区 ID 的字符串表示。 | 1 | |
messaging.kafka.message.key | 推荐 如果 span 描述的是单条消息的操作。 | 字符串 | Kafka 中的消息键用于对相似消息进行分组,以确保它们在同一个分区上进行处理。它们与 messaging.message.id 不同,因为它们不是唯一的。如果键为 null,则不应设置此属性。[9] | myKey | |
messaging.kafka.offset | 推荐 如果 span 描述的是单条消息的操作。 | int | 记录在相应 Kafka 分区中的偏移量。 | 42 | |
messaging.message.id | 推荐 如果 span 描述的是单条消息的操作。 | 字符串 | 消息传递系统用作消息标识符的值,表示为字符串。 | 452a7c7c7c7048c2f887f61572b18fc2 | |
server.port | 推荐 | int | 服务器端口号。[10] | 80; 8080; 443 | |
messaging.message.body.size | 选择加入 | int | 消息正文的字节大小。仅适用于描述单条消息操作的 span。[11] | 1439 |
[1] error.type: error.type 应该可预测,并且应该具有低基数性。
当 error.type 设置为某个类型(例如,异常类型)时,应该使用该工件内识别类型的规范类名。
Instrumentations 应该记录它们报告的错误列表。
一个仪器库内的 error.type 基数性应该低。从多个仪器库和应用程序聚合数据的遥测消费者,在没有额外过滤时,应准备好 error.type 在查询时具有高基数性。
如果操作已成功完成,Instrumentations 不应设置 error.type。
如果特定域定义了自己的一组错误标识符(例如 HTTP 或 gRPC 状态码),则建议
- 使用特定于域的属性
- 设置
error.type以捕获所有错误,无论它们是否包含在特定于域的集合中。
[2] messaging.batch.message_count: 如果 span 描述的是对消息批次的操作。
[3] messaging.batch.message_count: 仪表化不应在操作单条消息的 span 上设置 messaging.batch.message_count。当消息客户端库为同一操作支持批处理和单消息 API 时,仪表化应为批处理 API 使用 messaging.batch.message_count,而不应为单消息 API 使用它。
[4] messaging.destination.name: 如果 span 描述的是单条消息的操作,或者该值适用于批次中的所有消息。
[5] messaging.destination.name: 目标名称应唯一标识代理中的特定队列、主题或其他实体。如果代理没有此类概念,则目标名称应唯一标识代理。
[6] messaging.kafka.message.tombstone: 如果值为 true。如果缺失,则假定值为 false。
[7] messaging.operation.type:如果使用了自定义值,则它必须是低基数的。
[8] server.address: 如果可用且无需反向 DNS 查找,则为代理的服务器域名;否则,为 IP 地址或 Unix 域套接字名称。
[9] messaging.kafka.message.key: 如果键类型不是字符串,则必须提供其字符串表示形式。如果键没有明确的、规范的字符串形式,则不包含其值。
[10] server.port: 当从客户端观察到并且通过中间件通信时,server.port 应代表任何中间件(例如代理)之后的服务器端口,如果可用。
[11] messaging.message.body.size: 这可以指压缩或未压缩的正文大小。如果两种大小都已知,则应使用未压缩的正文大小。
以下属性对于做出采样决策可能很重要,并且应在跨度创建时提供(如果提供的话)
messaging.consumer.group.namemessaging.destination.namemessaging.destination.partition.idmessaging.operation.namemessaging.operation.typeserver.addressserver.port
error.type 具有以下已知值列表。如果其中一个适用,则必须使用相应的值;否则,可以使用自定义值。
| 值 | 描述 | Stability |
|---|---|---|
_OTHER | 当检测不到自定义值时使用的回退错误值。 |
messaging.operation.type 具有以下一系列已知值。如果适用其中一个,则必须使用相应的值;否则,可以使用自定义值。
| 值 | 描述 | Stability |
|---|---|---|
create | 创建消息。“Create”跨度始终指单条消息,用于为批量发送场景中的消息提供唯一的创建上下文。 | |
process | 一个或多个消息由消费者处理。 | |
receive | 一个或多个消息由消费者请求。此操作指拉取式场景,其中消费者显式调用消息 SDK 的方法来接收消息。 | |
send | 提供一个或多个消息以发送到中介。如果发送了单条消息,“Send”跨度的上下文可用作创建上下文,无需创建“Create”跨度。 | |
settle | 一个或多个消息已结算。 |
对于 Apache Kafka 生产者,peer.service 应设置为消息将发送到的代理或服务的名称。当消息直接传递给另一个服务时,消费者的 service.name 应匹配生产者的 peer.service。如果存在中间代理,service.name 和 peer.service 将不相同。
messaging.client.id 应设置为消费者或生产者的客户端名称,该名称对每个独立实例都是唯一的。
示例
Apache Kafka 与 Quarkus 或 Spring Boot 示例
在此示例中,生产者将一条消息发布到 Apache Kafka 的主题 T。消费者接收消息,处理它并提交偏移量。
Quarkus 和 Spring Boot 等框架提供了与 Kafka 的集成,允许配置和检测处理回调,因此相应的仪表化应创建“Process”span,以及 Kafka 仪表化为轮询调用创建的“Receive”span。
flowchart LR; subgraph PRODUCER P[Span Send] end subgraph CONSUMER direction TB R1[Span Poll] R2[Span Process] R3[Span Commit] end P-. link .-R1; P-. link .-R2; R2-- parent ---R3; classDef normal fill:green class P,R1,R2,R3 normal linkStyle 0 color:green,stroke:green linkStyle 1 color:green,stroke:green
| 字段或属性 | Producer | 消费者 Span 轮询 | 消费者 Span 处理 | 消费者 Span Commit T |
|---|---|---|---|---|
| Span 名称 | "send T" | "poll T" | "process T" | "commit T" |
| Parent | (可选) Span 发送 | Span 处理 | ||
| 链接 | Span 发送 | Span 发送 | ||
| SpanKind | PRODUCER | CLIENT | CONSUMER | CLIENT |
| 状态 | 未设置 | 未设置 | 未设置 | 未设置 |
messaging.system | "kafka" | "kafka" | "kafka" | "kafka" |
messaging.destination.name | "T" | "T" | "T" | "T" |
messaging.consumer.group.name | "my-group" | "my-group" | "my-group" | |
messaging.destination.partition.id | "1" | "1" | "1" | "1" |
messaging.operation.name | "send" | "poll" | "process" | "commit" |
messaging.operation.type | "send" | "receive" | "process" | "settle" |
messaging.client.id | "5" | "8" | "8" | "8" |
messaging.kafka.message.key | "myKey" | "myKey" | "myKey" | |
messaging.kafka.offset | "12" | "12" | "12" |