构建连接器
OpenTelemetry 中的连接器
本页面的内容最适用于您已经拥有一个生成某种追踪遥测数据的已 instrumented 应用程序,并且已经了解 OpenTelemetry Collector。
什么是连接器?
连接器充当将遥测数据在不同 Collector 流水线之间传递的手段,从而将它们连接起来。连接器对一个流水线充当导出器,对另一个流水线充当接收器。OpenTelemetry Collector 中的每个流水线都处理一种遥测数据。可能存在需要将一种形式的遥测数据处理成另一种形式的需求,但需要将相应数据路由到其正确的 Collector 流水线。
为什么要使用连接器?
连接器在合并、路由和复制数据流方面非常有用。除了顺序流水线(即将流水线连接在一起)之外,连接器组件还能够进行条件数据流和生成数据流。条件数据流意味着将数据发送到最高优先级的流水线,并具有错误检测功能,以便在需要时路由到备用流水线。生成数据流意味着该组件根据接收到的数据生成并发出自己的数据。本教程强调连接器连接流水线的能力。
OpenTelemetry 中存在一些处理器,可以将一种类型的遥测数据转换为另一种类型。例如 spanmetrics 处理器和 servicegraph 处理器。spanmetrics 处理器从 span 数据生成聚合请求、错误和持续时间指标。servicegraph 处理器分析追踪数据并生成描述服务之间关系的指标。这两个处理器都接收追踪数据并将其转换为指标数据。由于 OpenTelemetry Collector 中的流水线仅用于一种数据类型,因此需要将处理器在追踪流水线中产生的追踪数据转换为指标数据,并发送到指标流水线。历史上,一些处理器通过利用一种坏实践的变通方法来传输数据,即处理器在处理后直接导出数据。连接器组件解决了这种变通方法的需要,并且历史上使用这种变通方法的处理器已被弃用。同样,上面提到的处理器在近期版本中也已被弃用,并被连接器取代。
有关连接器完整功能的更多详细信息,请参阅以下链接:OpenTelemetry 中的连接器是什么?,OpenTelemetry 连接器配置
旧的架构:
使用连接器的新架构:
构建示例连接器
在本教程中,我们将编写一个示例连接器,它将追踪转换为指标,作为 OpenTelemetry Collector 中连接器组件功能的基本示例。基本连接器的功能是简单地计算包含特定属性名称的追踪中的 span 数量。这些出现次数的计数存储在连接器中。
配置
设置 Collector 配置:
在 config.yaml 文件中设置 OpenTelemetry Collector 的配置。此文件定义了数据将如何被路由、处理和导出。文件中定义的配置详细说明了您希望数据流水线如何运行。您可以定义组件以及数据如何在定义的流水线中从头到尾移动。有关如何配置 Collector 的更多详细信息,请参阅 Collector 配置。
使用下面代码作为我们将要构建的示例连接器。该代码是基本有效的 OpenTelemetry Collector 配置文件的示例。
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
exporters:
debug:
connectors:
example:
service:
pipelines:
traces:
receivers: [otlp]
exporters: [example]
metrics:
receivers: [example]
exporters: [debug]
在上述代码的 connectors 部分,您需要声明流水线中可用的连接器名称。这里,example 是我们将在本教程中创建的连接器的名称。
实现
为您的示例连接器创建一个文件夹。在本教程中,我们将创建一个名为
exampleconnector的文件夹。导航到该文件夹并运行
go mod init github.com/gord02/exampleconnector运行
go mod tidy这将创建
go.mod和go.sum文件。在该文件夹中创建以下文件
config.go- 用于定义连接器设置的文件factory.go- 用于创建连接器实例的文件
在 config.go 中创建连接器设置
为了能够实例化您的连接器并参与流水线,Collector 需要识别您的连接器并从其配置文件中正确加载其设置。
为了能够让您的连接器访问其设置,请创建一个 Config 结构体。该结构体必须为连接器的每个设置都包含一个导出的字段。添加的参数字段将可以从 config.yaml 文件访问。它们在配置文件中的名称通过结构体标签设置。创建结构体并添加参数。您可以选择添加一个验证函数来检查给定的默认值对于您的连接器实例是否有效。
config.go 文件应如下所示
exampleconnector/config.go
package exampleconnector
import "fmt"
// Config represents the connector config settings within the collector's config.yaml
type Config struct {
AttributeName string `mapstructure:"attribute_name"`
}
func (c *Config) Validate() error {
if c.AttributeName == "" {
return fmt.Errorf("attribute_name must not be empty")
}
return nil
}
有关 mapstructure 的更多详细信息,请参阅 Go mapstructure。
实现 Factory
要实例化对象,您需要使用与每个组件关联的 NewFactory 函数。我们将使用 connector.NewFactory 函数。connector.NewFactory 函数实例化并返回一个 connector.Factory,它需要以下参数
component.Type:您的连接器在所有同类型 Collector 组件中唯一的字符串标识符。此字符串也用作引用连接器的名称。component.CreateDefaultConfigFunc:一个指向返回您的连接器的默认component.Config实例的函数的引用。...FactoryOption:connector.FactoryOptions的切片将决定您的连接器能够处理哪种类型的信号。
创建 factory.go 文件,并将用于标识连接器的唯一字符串定义为全局常量。
const defaultVal string = "request.n" // Type is the component type name for this connector var Type = component.MustNewType("example")创建默认配置函数。这是您选择使用默认值初始化连接器对象的方式。
func createDefaultConfig() component.Config { return &Config{ AttributeName: defaultVal, } }定义您将要工作的连接器类型。这将作为工厂选项传递。连接器可以连接不同或相似类型的流水线。我们必须定义连接器导出端和接收端。一个导出追踪并接收指标的连接器只是连接器组件的一种配置,其定义顺序很重要。一个导出追踪并接收指标的连接器与一个可以导出指标并接收追踪的连接器不同。
// createTracesToMetricsConnector defines the consumer type of the connector // We want to consume traces and export metrics, therefore, define nextConsumer as metrics, since consumer is the next component in the pipeline func createTracesToMetricsConnector(ctx context.Context, params connector.Settings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) { return newConnector(params.Logger, cfg, nextConsumer) }createTracesToMetricsConnector是一个进一步初始化连接器组件的函数,通过定义其消费者组件,即连接器传输数据后要摄取数据的下一个组件。需要注意的是,连接器不限于一种有序的类型组合,就像我们这里一样。例如,count 连接器为 traces to metrics、logs to metrics 和 metrics to metrics 定义了几个这样的函数。createTracesToMetricsConnector的参数context.Context:Collector 的context.Context的引用,以便您的追踪接收器能够正确管理其执行上下文。connector.CreateSettings:Collector 设置的引用,Collector 在此设置下创建您的接收器。component.Config:指向 Collector 传递给工厂的接收器配置设置的引用,以便它能够正确读取 Collector 配置中的设置。consumer.Metrics:指向流水线中下一个消费者类型的引用,即接收到的追踪将去往的地方。这可以是处理器、导出器或另一个连接器。
编写一个
NewFactory函数,实例化您的自定义连接器(组件)工厂。// NewFactory creates a factory for example connector. func NewFactory() connector.Factory { // OpenTelemetry connector factory to make a factory for connectors return connector.NewFactory( Type, createDefaultConfig, connector.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelAlpha)) }需要注意的是,连接器可以支持多种有序的数据类型组合。
完成后,factory.go 如下
package exampleconnector
import (
"context"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
)
const defaultVal string = "request.n"
// Type is the component type name for this connector
var Type = component.MustNewType("example")
// NewFactory creates a factory for example connector.
func NewFactory() connector.Factory {
// OpenTelemetry connector factory to make a factory for connectors
return connector.NewFactory(
Type,
createDefaultConfig,
connector.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelAlpha))
}
func createDefaultConfig() component.Config {
return &Config{
AttributeName: defaultVal,
}
}
// createTracesToMetricsConnector defines the consumer type of the connector
// We want to consume traces and export metrics, therefore, define nextConsumer as metrics, since consumer is the next component in the pipeline
func createTracesToMetricsConnector(ctx context.Context, params connector.Settings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) {
return newConnector(params.Logger, cfg, nextConsumer)
}
实现 Trace 连接器
在 connector.go 文件中实现特定于组件类型的接口方法。在本教程中,我们将实现 Trace 连接器,因此必须实现接口:baseConsumer、Traces 和 component.Component。
定义具有连接器所需参数的连接器结构体
// schema for connector type connectorImp struct { config Config metricsConsumer consumer.Metrics logger *zap.Logger // Include these parameters if a specific implementation for the Start and Shutdown function are not needed component.StartFunc component.ShutdownFunc }定义
newConnector函数来创建连接器// newConnector is a function to create a new connector func newConnector(logger *zap.Logger, config component.Config, nextConsumer consumer.Metrics) (*connectorImp, error) { logger.Info("Building exampleconnector connector") cfg := config.(*Config) return &connectorImp{ config: *cfg, logger: logger, metricsConsumer: nextConsumer, }, nil }newConnector函数是创建连接器实例的工厂函数。实现
Capabilities方法以正确实现接口// Capabilities implements the consumer interface. func (c *connectorImp) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} }实现
Capabilities方法以确保您的连接器是 Consumer 类型。此方法定义了组件的功能,组件是否可以修改数据。如果MutatesData设置为 true,则表示连接器会修改其接收到的数据结构。实现
Consumer方法以消耗遥测数据// ConsumeTraces method is called for each instance of a trace sent to the connector func (c *connectorImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { // loop through the levels of spans of the one trace consumed for i := 0; i < td.ResourceSpans().Len(); i++ { resourceSpan := td.ResourceSpans().At(i) for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ { scopeSpan := resourceSpan.ScopeSpans().At(j) for k := 0; k < scopeSpan.Spans().Len(); k++ { span := scopeSpan.Spans().At(k) attrs := span.Attributes() if _, ok := attrs.Get(c.config.AttributeName); ok { // create metric only if span of trace had the specific attribute metrics := pmetric.NewMetrics() return c.metricsConsumer.ConsumeMetrics(ctx, metrics) } } } } return nil }可选:仅当需要特定实现时,才实现
Start和Shutdown方法以正确实现接口。否则,将component.StartFunc和component.ShutdownFunc作为定义的连接器结构体的一部分就足够了。
完整的连接器文件应如下所示
exampleconnector/connector.go
package exampleconnector
import (
"context"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)
// schema for connector
type connectorImp struct {
config Config
metricsConsumer consumer.Metrics
logger *zap.Logger
// Include these parameters if a specific implementation for the Start and Shutdown function are not needed
component.StartFunc
component.ShutdownFunc
}
// newConnector is a function to create a new connector
func newConnector(logger *zap.Logger, config component.Config, nextConsumer consumer.Metrics) (*connectorImp, error) {
logger.Info("Building exampleconnector connector")
cfg := config.(*Config)
return &connectorImp{
config: *cfg,
logger: logger,
metricsConsumer: nextConsumer,
}, nil
}
// Capabilities implements the consumer interface.
func (c *connectorImp) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
// ConsumeTraces method is called for each instance of a trace sent to the connector
func (c *connectorImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
// loop through the levels of spans of the one trace consumed
for i := 0; i < td.ResourceSpans().Len(); i++ {
resourceSpan := td.ResourceSpans().At(i)
for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
scopeSpan := resourceSpan.ScopeSpans().At(j)
for k := 0; k < scopeSpan.Spans().Len(); k++ {
span := scopeSpan.Spans().At(k)
attrs := span.Attributes()
if _, ok := attrs.Get(c.config.AttributeName); ok {
// create metric only if span of trace had the specific attribute
metrics := pmetric.NewMetrics()
return c.metricsConsumer.ConsumeMetrics(ctx, metrics)
}
}
}
}
return nil
}
使用组件
OpenTelemetry Collector Builder 使用摘要:
您可以使用 OpenTelemetry Collector Builder 来构建和运行您的代码。Collector Builder 是一个工具,可以帮助您构建自己的 OpenTelemetry Collector 二进制文件。您可以根据需要添加或删除组件(接收器、处理器、连接器和导出器)。
请遵循 OpenTelemetry Collector Builder 的 安装说明。
编写配置文件
安装完成后,下一步是创建配置文件
builder-config.yaml。此文件定义了您要在自定义二进制文件中包含的 Collector 组件。以下是您可以使用的新连接器组件的配置文件的示例
dist: name: otelcol-dev-bin description: Basic OpenTelemetry collector distribution for Developers output_path: ./otelcol-dev exporters: - gomod: go.opentelemetry.io/collector/exporter/debugexporter v0.129.0 receivers: - gomod: go.opentelemetry.io/collector/receiver/otlpreceiver v0.129.0 # Not used in this tutorial, but can be added if needed for your use case # processors: connectors: - gomod: github.com/gord02/exampleconnector v0.129.0 replaces: # a list of "replaces" directives that will be part of the resulting go.mod # This replace statement is necessary since the newly added component is not found/published to GitHub yet. Replace references to GitHub path with the local path - github.com/gord02/exampleconnector => [PATH-TO-COMPONENT-CODE]/exampleconnector必须包含一个 replace 语句。由于您的新创建的组件尚未发布到 GitHub,因此需要 replace 部分。对您的组件 GitHub 路径的引用将需要替换为指向您本地代码的路径。
有关 Go 中替换的更多详细信息,请参阅 Go mod 文件 Replace。
构建您的 Collector 二进制文件
运行 builder,并传递 detailing 包含的连接器组件的 builder 配置文件,这将构建自定义 Collector 二进制文件
./ocb --config [PATH-TO-CONFIG]/builder-config.yaml这将在配置文件中指定的输出路径目录中生成 Collector 二进制文件。
构建成功后,您应该会看到类似以下的输出
./ocb --config builder-config.yaml 2025-07-15T22:10:10.351+0900 INFO internal/command.go:99 OpenTelemetry Collector Builder {"version": "0.129.0"} 2025-07-15T22:10:10.352+0900 INFO internal/command.go:104 Using config file {"path": "builder-config.yaml"} 2025-07-15T22:10:10.353+0900 INFO builder/config.go:160 Using go {"go-executable": "/opt/homebrew/Cellar/go@1.23/1.23.6/bin/go"} 2025-07-15T22:10:10.354+0900 INFO builder/main.go:99 Sources created {"path": "./otelcol-dev"} 2025-07-15T22:10:10.516+0900 INFO builder/main.go:201 Getting go modules 2025-07-15T22:10:10.554+0900 INFO builder/main.go:110 Compiling 2025-07-15T22:10:13.369+0900 INFO builder/main.go:140 Compiled {"binary": "./otelcol-dev/otelcol-dev-bin"}运行您的 Collector 二进制文件
现在您可以使用步骤 3 输出中的二进制文件路径来运行自定义 Collector 二进制文件(例如,
{"binary": "./otelcol-dev/otelcol-dev-bin"})./otelcol-dev/otelcol-dev-bin --config [PATH-TO-CONFIG]/config.yaml输出路径名称和 dist 名称在
build-config.yaml中详细说明。
测试你的连接器
现在您已经构建了示例连接器,让我们通过单元测试来验证其功能。Go 单元测试提供更好的覆盖率,并且更容易维护。
编写单元测试
在连接器目录中创建一个测试文件 connector_test.go
exampleconnector/connector_test.go
package exampleconnector
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vibeus/opentelemetry-collector/confmap/xconfmap"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)
func TestConsumeTraces(t *testing.T) {
// Create a test consumer that captures metrics
metricsConsumer := &consumertest.MetricsSink{}
// Create connector with test configuration
cfg := &Config{
AttributeName: "request.n",
}
connector, err := newConnector(zap.NewNop(), cfg, metricsConsumer)
require.NoError(t, err)
ctx := context.Background()
t.Run("span with target attribute generates metric", func(t *testing.T) {
// Reset the consumer
metricsConsumer.Reset()
// Create trace data with target attribute
traces := ptrace.NewTraces()
resourceSpan := traces.ResourceSpans().AppendEmpty()
scopeSpan := resourceSpan.ScopeSpans().AppendEmpty()
span := scopeSpan.Spans().AppendEmpty()
// Add the target attribute
span.Attributes().PutStr("request.n", "test-value")
span.Attributes().PutStr("http.method", "GET")
// Consume the traces
err := connector.ConsumeTraces(ctx, traces)
require.NoError(t, err)
// Verify metric was generated
assert.Equal(t, 1, len(metricsConsumer.AllMetrics()))
})
t.Run("span without target attribute does not generate metric", func(t *testing.T) {
// Reset the consumer
metricsConsumer.Reset()
// Create trace data without target attribute
traces := ptrace.NewTraces()
resourceSpan := traces.ResourceSpans().AppendEmpty()
scopeSpan := resourceSpan.ScopeSpans().AppendEmpty()
span := scopeSpan.Spans().AppendEmpty()
// Add other attributes but not the target one
span.Attributes().PutStr("http.method", "POST")
span.Attributes().PutStr("user.id", "12345")
// Consume the traces
err := connector.ConsumeTraces(ctx, traces)
require.NoError(t, err)
// Verify no metric was generated
assert.Equal(t, 0, len(metricsConsumer.AllMetrics()))
})
t.Run("multiple spans with mixed attributes", func(t *testing.T) {
// Reset the consumer
metricsConsumer.Reset()
// Create trace data with multiple spans
traces := ptrace.NewTraces()
resourceSpan := traces.ResourceSpans().AppendEmpty()
scopeSpan := resourceSpan.ScopeSpans().AppendEmpty()
// First span with target attribute
span1 := scopeSpan.Spans().AppendEmpty()
span1.Attributes().PutStr("request.n", "value1")
// Second span without target attribute
span2 := scopeSpan.Spans().AppendEmpty()
span2.Attributes().PutStr("other.attr", "value2")
// Consume the traces
err := connector.ConsumeTraces(ctx, traces)
require.NoError(t, err)
// Should generate exactly one metric (from first span only)
assert.Equal(t, 1, len(metricsConsumer.AllMetrics()))
})
}
func TestConnectorCapabilities(t *testing.T) {
connector := &connectorImp{}
capabilities := connector.Capabilities()
assert.False(t, capabilities.MutatesData)
}
func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig()
assert.NotNil(t, cfg)
exampleConfig := cfg.(*Config)
assert.Equal(t, "request.n", exampleConfig.AttributeName)
}
func TestConfigValidation(t *testing.T) {
t.Run("valid config", func(t *testing.T) {
cfg := &Config{
AttributeName: "test.attribute",
}
err := xconfmap.Validate(cfg)
assert.NoError(t, err)
})
t.Run("invalid config - empty attribute name", func(t *testing.T) {
cfg := &Config{
AttributeName: "",
}
err := xconfmap.Validate(cfg)
assert.Error(t, err)
assert.Contains(t, err.Error(), "attribute_name must not be empty")
})
}
运行测试
将测试依赖项添加到您的
go.modgo mod tidy运行测试
go test -cover -v ./...
预期的测试输出
测试成功运行时,您应该会看到类似以下的输出
go test -cover -v ./...
=== RUN TestConsumeTraces
=== RUN TestConsumeTraces/span_with_target_attribute_generates_metric
=== RUN TestConsumeTraces/span_without_target_attribute_does_not_generate_metric
=== RUN TestConsumeTraces/multiple_spans_with_mixed_attributes
--- PASS: TestConsumeTraces (0.00s)
--- PASS: TestConsumeTraces/span_with_target_attribute_generates_metric (0.00s)
--- PASS: TestConsumeTraces/span_without_target_attribute_does_not_generate_metric (0.00s)
--- PASS: TestConsumeTraces/multiple_spans_with_mixed_attributes (0.00s)
=== RUN TestConnectorCapabilities
--- PASS: TestConnectorCapabilities (0.00s)
=== RUN TestCreateDefaultConfig
--- PASS: TestCreateDefaultConfig (0.00s)
=== RUN TestConfigValidation
=== RUN TestConfigValidation/valid_config
=== RUN TestConfigValidation/invalid_config_-_empty_attribute_name
--- PASS: TestConfigValidation (0.00s)
--- PASS: TestConfigValidation/valid_config (0.00s)
--- PASS: TestConfigValidation/invalid_config_-_empty_attribute_name (0.00s)
PASS
coverage: 90.5% of statements
ok github.com/gord02/exampleconnector 0.501s coverage: 90.5% of statements
这些单元测试为您的连接器功能提供了全面的覆盖,并且是 OpenTelemetry Collector 生态系统中验证组件行为的推荐方法。
关于 OpenTelemetry Collector Builder 的附加资源
- 构建自定义 Collector
- OpenTelemetry Collector Builder README
- Dan Jaglowski 在 OpenTelemetry Collector 中连接的可观测性流水线
- Connector README