构建连接器

OpenTelemetry 中的连接器

本页面的内容最适用于您已经拥有一个生成某种追踪遥测数据的已 instrumented 应用程序,并且已经了解 OpenTelemetry Collector

什么是连接器?

连接器充当将遥测数据在不同 Collector 流水线之间传递的手段,从而将它们连接起来。连接器对一个流水线充当导出器,对另一个流水线充当接收器。OpenTelemetry Collector 中的每个流水线都处理一种遥测数据。可能存在需要将一种形式的遥测数据处理成另一种形式的需求,但需要将相应数据路由到其正确的 Collector 流水线。

为什么要使用连接器?

连接器在合并、路由和复制数据流方面非常有用。除了顺序流水线(即将流水线连接在一起)之外,连接器组件还能够进行条件数据流和生成数据流。条件数据流意味着将数据发送到最高优先级的流水线,并具有错误检测功能,以便在需要时路由到备用流水线。生成数据流意味着该组件根据接收到的数据生成并发出自己的数据。本教程强调连接器连接流水线的能力。

OpenTelemetry 中存在一些处理器,可以将一种类型的遥测数据转换为另一种类型。例如 spanmetrics 处理器和 servicegraph 处理器。spanmetrics 处理器从 span 数据生成聚合请求、错误和持续时间指标。servicegraph 处理器分析追踪数据并生成描述服务之间关系的指标。这两个处理器都接收追踪数据并将其转换为指标数据。由于 OpenTelemetry Collector 中的流水线仅用于一种数据类型,因此需要将处理器在追踪流水线中产生的追踪数据转换为指标数据,并发送到指标流水线。历史上,一些处理器通过利用一种坏实践的变通方法来传输数据,即处理器在处理后直接导出数据。连接器组件解决了这种变通方法的需要,并且历史上使用这种变通方法的处理器已被弃用。同样,上面提到的处理器在近期版本中也已被弃用,并被连接器取代。

有关连接器完整功能的更多详细信息,请参阅以下链接:OpenTelemetry 中的连接器是什么?OpenTelemetry 连接器配置

旧的架构:

Before picture of how processors emitted data directly to another pipelines exporter

使用连接器的新架构:

How the pipeline should work using the connector component

构建示例连接器

在本教程中,我们将编写一个示例连接器,它将追踪转换为指标,作为 OpenTelemetry Collector 中连接器组件功能的基本示例。基本连接器的功能是简单地计算包含特定属性名称的追踪中的 span 数量。这些出现次数的计数存储在连接器中。

配置

设置 Collector 配置:

config.yaml 文件中设置 OpenTelemetry Collector 的配置。此文件定义了数据将如何被路由、处理和导出。文件中定义的配置详细说明了您希望数据流水线如何运行。您可以定义组件以及数据如何在定义的流水线中从头到尾移动。有关如何配置 Collector 的更多详细信息,请参阅 Collector 配置

使用下面代码作为我们将要构建的示例连接器。该代码是基本有效的 OpenTelemetry Collector 配置文件的示例。

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

exporters:
  debug:

connectors:
  example:

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [example]
    metrics:
      receivers: [example]
      exporters: [debug]

在上述代码的 connectors 部分,您需要声明流水线中可用的连接器名称。这里,example 是我们将在本教程中创建的连接器的名称。

实现

  1. 为您的示例连接器创建一个文件夹。在本教程中,我们将创建一个名为 exampleconnector 的文件夹。

  2. 导航到该文件夹并运行

    go mod init github.com/gord02/exampleconnector
    
  3. 运行 go mod tidy

    这将创建 go.modgo.sum 文件。

  4. 在该文件夹中创建以下文件

    • config.go - 用于定义连接器设置的文件
    • factory.go - 用于创建连接器实例的文件

在 config.go 中创建连接器设置

为了能够实例化您的连接器并参与流水线,Collector 需要识别您的连接器并从其配置文件中正确加载其设置。

为了能够让您的连接器访问其设置,请创建一个 Config 结构体。该结构体必须为连接器的每个设置都包含一个导出的字段。添加的参数字段将可以从 config.yaml 文件访问。它们在配置文件中的名称通过结构体标签设置。创建结构体并添加参数。您可以选择添加一个验证函数来检查给定的默认值对于您的连接器实例是否有效。

config.go 文件应如下所示

exampleconnector/config.go

package exampleconnector

import "fmt"

// Config represents the connector config settings within the collector's config.yaml
type Config struct {
    AttributeName string `mapstructure:"attribute_name"`
}

func (c *Config) Validate() error {
    if c.AttributeName == "" {
        return fmt.Errorf("attribute_name must not be empty")
    }
    return nil
}

有关 mapstructure 的更多详细信息,请参阅 Go mapstructure

实现 Factory

要实例化对象,您需要使用与每个组件关联的 NewFactory 函数。我们将使用 connector.NewFactory 函数。connector.NewFactory 函数实例化并返回一个 connector.Factory,它需要以下参数

  • component.Type:您的连接器在所有同类型 Collector 组件中唯一的字符串标识符。此字符串也用作引用连接器的名称。
  • component.CreateDefaultConfigFunc:一个指向返回您的连接器的默认 component.Config 实例的函数的引用。
  • ...FactoryOptionconnector.FactoryOptions 的切片将决定您的连接器能够处理哪种类型的信号。
  1. 创建 factory.go 文件,并将用于标识连接器的唯一字符串定义为全局常量。

    const defaultVal string = "request.n"
    
    // Type is the component type name for this connector
    var Type = component.MustNewType("example")
    
  2. 创建默认配置函数。这是您选择使用默认值初始化连接器对象的方式。

    func createDefaultConfig() component.Config {
        return &Config{
            AttributeName: defaultVal,
        }
    }
    
  3. 定义您将要工作的连接器类型。这将作为工厂选项传递。连接器可以连接不同或相似类型的流水线。我们必须定义连接器导出端和接收端。一个导出追踪并接收指标的连接器只是连接器组件的一种配置,其定义顺序很重要。一个导出追踪并接收指标的连接器与一个可以导出指标并接收追踪的连接器不同。

    // createTracesToMetricsConnector defines the consumer type of the connector
    // We want to consume traces and export metrics, therefore, define nextConsumer as metrics, since consumer is the next component in the pipeline
    func createTracesToMetricsConnector(ctx context.Context, params connector.Settings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) {
        return newConnector(params.Logger, cfg, nextConsumer)
    }
    

    createTracesToMetricsConnector 是一个进一步初始化连接器组件的函数,通过定义其消费者组件,即连接器传输数据后要摄取数据的下一个组件。需要注意的是,连接器不限于一种有序的类型组合,就像我们这里一样。例如,count 连接器为 traces to metrics、logs to metrics 和 metrics to metrics 定义了几个这样的函数。

    createTracesToMetricsConnector 的参数

    • context.Context:Collector 的 context.Context 的引用,以便您的追踪接收器能够正确管理其执行上下文。
    • connector.CreateSettings:Collector 设置的引用,Collector 在此设置下创建您的接收器。
    • component.Config:指向 Collector 传递给工厂的接收器配置设置的引用,以便它能够正确读取 Collector 配置中的设置。
    • consumer.Metrics:指向流水线中下一个消费者类型的引用,即接收到的追踪将去往的地方。这可以是处理器、导出器或另一个连接器。
  4. 编写一个 NewFactory 函数,实例化您的自定义连接器(组件)工厂。

    // NewFactory creates a factory for example connector.
    func NewFactory() connector.Factory {
        // OpenTelemetry connector factory to make a factory for connectors
        return connector.NewFactory(
            Type,
            createDefaultConfig,
            connector.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelAlpha))
    }
    

    需要注意的是,连接器可以支持多种有序的数据类型组合。

完成后,factory.go 如下

package exampleconnector

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/connector"
	"go.opentelemetry.io/collector/consumer"
)

const defaultVal string = "request.n"

// Type is the component type name for this connector
var Type = component.MustNewType("example")

// NewFactory creates a factory for example connector.
func NewFactory() connector.Factory {
	// OpenTelemetry connector factory to make a factory for connectors
	return connector.NewFactory(
		Type,
		createDefaultConfig,
		connector.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelAlpha))
}

func createDefaultConfig() component.Config {
	return &Config{
		AttributeName: defaultVal,
	}
}

// createTracesToMetricsConnector defines the consumer type of the connector
// We want to consume traces and export metrics, therefore, define nextConsumer as metrics, since consumer is the next component in the pipeline
func createTracesToMetricsConnector(ctx context.Context, params connector.Settings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) {
	return newConnector(params.Logger, cfg, nextConsumer)
}

实现 Trace 连接器

connector.go 文件中实现特定于组件类型的接口方法。在本教程中,我们将实现 Trace 连接器,因此必须实现接口:baseConsumerTracescomponent.Component

  1. 定义具有连接器所需参数的连接器结构体

    // schema for connector
    type connectorImp struct {
        config          Config
        metricsConsumer consumer.Metrics
        logger          *zap.Logger
        // Include these parameters if a specific implementation for the Start and Shutdown function are not needed
        component.StartFunc
        component.ShutdownFunc
    }
    
  2. 定义 newConnector 函数来创建连接器

    // newConnector is a function to create a new connector
    func newConnector(logger *zap.Logger, config component.Config, nextConsumer consumer.Metrics) (*connectorImp, error) {
        logger.Info("Building exampleconnector connector")
        cfg := config.(*Config)
    
        return &connectorImp{
            config:          *cfg,
            logger:          logger,
            metricsConsumer: nextConsumer,
        }, nil
    }
    

    newConnector 函数是创建连接器实例的工厂函数。

  3. 实现 Capabilities 方法以正确实现接口

    // Capabilities implements the consumer interface.
    func (c *connectorImp) Capabilities() consumer.Capabilities {
        return consumer.Capabilities{MutatesData: false}
    }
    

    实现 Capabilities 方法以确保您的连接器是 Consumer 类型。此方法定义了组件的功能,组件是否可以修改数据。如果 MutatesData 设置为 true,则表示连接器会修改其接收到的数据结构。

  4. 实现 Consumer 方法以消耗遥测数据

    // ConsumeTraces method is called for each instance of a trace sent to the connector
    func (c *connectorImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
        // loop through the levels of spans of the one trace consumed
        for i := 0; i < td.ResourceSpans().Len(); i++ {
            resourceSpan := td.ResourceSpans().At(i)
    
            for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
                scopeSpan := resourceSpan.ScopeSpans().At(j)
    
                for k := 0; k < scopeSpan.Spans().Len(); k++ {
                    span := scopeSpan.Spans().At(k)
                    attrs := span.Attributes()
                    if _, ok := attrs.Get(c.config.AttributeName); ok {
                        // create metric only if span of trace had the specific attribute
                        metrics := pmetric.NewMetrics()
                        return c.metricsConsumer.ConsumeMetrics(ctx, metrics)
                    }
                }
            }
        }
        return nil
    }
    
  5. 可选:仅当需要特定实现时,才实现 StartShutdown 方法以正确实现接口。否则,将 component.StartFunccomponent.ShutdownFunc 作为定义的连接器结构体的一部分就足够了。

完整的连接器文件应如下所示

exampleconnector/connector.go

package exampleconnector

import (
	"context"

	"go.uber.org/zap"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// schema for connector
type connectorImp struct {
	config          Config
	metricsConsumer consumer.Metrics
	logger          *zap.Logger
	// Include these parameters if a specific implementation for the Start and Shutdown function are not needed
	component.StartFunc
	component.ShutdownFunc
}

// newConnector is a function to create a new connector
func newConnector(logger *zap.Logger, config component.Config, nextConsumer consumer.Metrics) (*connectorImp, error) {
	logger.Info("Building exampleconnector connector")
	cfg := config.(*Config)

	return &connectorImp{
		config:          *cfg,
		logger:          logger,
		metricsConsumer: nextConsumer,
	}, nil
}

// Capabilities implements the consumer interface.
func (c *connectorImp) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{MutatesData: false}
}

// ConsumeTraces method is called for each instance of a trace sent to the connector
func (c *connectorImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	// loop through the levels of spans of the one trace consumed
	for i := 0; i < td.ResourceSpans().Len(); i++ {
		resourceSpan := td.ResourceSpans().At(i)

		for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
			scopeSpan := resourceSpan.ScopeSpans().At(j)

			for k := 0; k < scopeSpan.Spans().Len(); k++ {
				span := scopeSpan.Spans().At(k)
				attrs := span.Attributes()
				if _, ok := attrs.Get(c.config.AttributeName); ok {
					// create metric only if span of trace had the specific attribute
					metrics := pmetric.NewMetrics()
					return c.metricsConsumer.ConsumeMetrics(ctx, metrics)
				}
			}
		}
	}
	return nil
}

使用组件

OpenTelemetry Collector Builder 使用摘要:

您可以使用 OpenTelemetry Collector Builder 来构建和运行您的代码。Collector Builder 是一个工具,可以帮助您构建自己的 OpenTelemetry Collector 二进制文件。您可以根据需要添加或删除组件(接收器、处理器、连接器和导出器)。

  1. 请遵循 OpenTelemetry Collector Builder 的 安装说明

  2. 编写配置文件

    安装完成后,下一步是创建配置文件 builder-config.yaml。此文件定义了您要在自定义二进制文件中包含的 Collector 组件。

    以下是您可以使用的新连接器组件的配置文件的示例

    dist:
      name: otelcol-dev-bin
      description: Basic OpenTelemetry collector distribution for Developers
      output_path: ./otelcol-dev
    
    exporters:
      - gomod: go.opentelemetry.io/collector/exporter/debugexporter v0.129.0
    
    receivers:
      - gomod: go.opentelemetry.io/collector/receiver/otlpreceiver v0.129.0
    
    # Not used in this tutorial, but can be added if needed for your use case
    # processors:
    
    connectors:
      - gomod: github.com/gord02/exampleconnector v0.129.0
    
    replaces:
      # a list of "replaces" directives that will be part of the resulting go.mod
    
      # This replace statement is necessary since the newly added component is not found/published to GitHub yet. Replace references to GitHub path with the local path
      - github.com/gord02/exampleconnector =>
        [PATH-TO-COMPONENT-CODE]/exampleconnector
    

    必须包含一个 replace 语句。由于您的新创建的组件尚未发布到 GitHub,因此需要 replace 部分。对您的组件 GitHub 路径的引用将需要替换为指向您本地代码的路径。

    有关 Go 中替换的更多详细信息,请参阅 Go mod 文件 Replace

  3. 构建您的 Collector 二进制文件

    运行 builder,并传递 detailing 包含的连接器组件的 builder 配置文件,这将构建自定义 Collector 二进制文件

    ./ocb --config [PATH-TO-CONFIG]/builder-config.yaml
    

    这将在配置文件中指定的输出路径目录中生成 Collector 二进制文件。

    构建成功后,您应该会看到类似以下的输出

    ./ocb --config builder-config.yaml
    2025-07-15T22:10:10.351+0900    INFO    internal/command.go:99  OpenTelemetry Collector Builder {"version": "0.129.0"}
    2025-07-15T22:10:10.352+0900    INFO    internal/command.go:104 Using config file       {"path": "builder-config.yaml"}
    2025-07-15T22:10:10.353+0900    INFO    builder/config.go:160   Using go        {"go-executable": "/opt/homebrew/Cellar/go@1.23/1.23.6/bin/go"}
    2025-07-15T22:10:10.354+0900    INFO    builder/main.go:99      Sources created {"path": "./otelcol-dev"}
    2025-07-15T22:10:10.516+0900    INFO    builder/main.go:201     Getting go modules
    2025-07-15T22:10:10.554+0900    INFO    builder/main.go:110     Compiling
    2025-07-15T22:10:13.369+0900    INFO    builder/main.go:140     Compiled        {"binary": "./otelcol-dev/otelcol-dev-bin"}
    
  4. 运行您的 Collector 二进制文件

    现在您可以使用步骤 3 输出中的二进制文件路径来运行自定义 Collector 二进制文件(例如,{"binary": "./otelcol-dev/otelcol-dev-bin"}

    ./otelcol-dev/otelcol-dev-bin --config [PATH-TO-CONFIG]/config.yaml
    

    输出路径名称和 dist 名称在 build-config.yaml 中详细说明。

测试你的连接器

现在您已经构建了示例连接器,让我们通过单元测试来验证其功能。Go 单元测试提供更好的覆盖率,并且更容易维护。

编写单元测试

在连接器目录中创建一个测试文件 connector_test.go

exampleconnector/connector_test.go

package exampleconnector

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/vibeus/opentelemetry-collector/confmap/xconfmap"
	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/pdata/ptrace"
	"go.uber.org/zap"
)

func TestConsumeTraces(t *testing.T) {
	// Create a test consumer that captures metrics
	metricsConsumer := &consumertest.MetricsSink{}

	// Create connector with test configuration
	cfg := &Config{
		AttributeName: "request.n",
	}

	connector, err := newConnector(zap.NewNop(), cfg, metricsConsumer)
	require.NoError(t, err)

	ctx := context.Background()

	t.Run("span with target attribute generates metric", func(t *testing.T) {
		// Reset the consumer
		metricsConsumer.Reset()

		// Create trace data with target attribute
		traces := ptrace.NewTraces()
		resourceSpan := traces.ResourceSpans().AppendEmpty()
		scopeSpan := resourceSpan.ScopeSpans().AppendEmpty()
		span := scopeSpan.Spans().AppendEmpty()

		// Add the target attribute
		span.Attributes().PutStr("request.n", "test-value")
		span.Attributes().PutStr("http.method", "GET")

		// Consume the traces
		err := connector.ConsumeTraces(ctx, traces)
		require.NoError(t, err)

		// Verify metric was generated
		assert.Equal(t, 1, len(metricsConsumer.AllMetrics()))
	})

	t.Run("span without target attribute does not generate metric", func(t *testing.T) {
		// Reset the consumer
		metricsConsumer.Reset()

		// Create trace data without target attribute
		traces := ptrace.NewTraces()
		resourceSpan := traces.ResourceSpans().AppendEmpty()
		scopeSpan := resourceSpan.ScopeSpans().AppendEmpty()
		span := scopeSpan.Spans().AppendEmpty()

		// Add other attributes but not the target one
		span.Attributes().PutStr("http.method", "POST")
		span.Attributes().PutStr("user.id", "12345")

		// Consume the traces
		err := connector.ConsumeTraces(ctx, traces)
		require.NoError(t, err)

		// Verify no metric was generated
		assert.Equal(t, 0, len(metricsConsumer.AllMetrics()))
	})

	t.Run("multiple spans with mixed attributes", func(t *testing.T) {
		// Reset the consumer
		metricsConsumer.Reset()

		// Create trace data with multiple spans
		traces := ptrace.NewTraces()
		resourceSpan := traces.ResourceSpans().AppendEmpty()
		scopeSpan := resourceSpan.ScopeSpans().AppendEmpty()

		// First span with target attribute
		span1 := scopeSpan.Spans().AppendEmpty()
		span1.Attributes().PutStr("request.n", "value1")

		// Second span without target attribute
		span2 := scopeSpan.Spans().AppendEmpty()
		span2.Attributes().PutStr("other.attr", "value2")

		// Consume the traces
		err := connector.ConsumeTraces(ctx, traces)
		require.NoError(t, err)

		// Should generate exactly one metric (from first span only)
		assert.Equal(t, 1, len(metricsConsumer.AllMetrics()))
	})
}

func TestConnectorCapabilities(t *testing.T) {
	connector := &connectorImp{}
	capabilities := connector.Capabilities()
	assert.False(t, capabilities.MutatesData)
}

func TestCreateDefaultConfig(t *testing.T) {
	cfg := createDefaultConfig()
	assert.NotNil(t, cfg)

	exampleConfig := cfg.(*Config)
	assert.Equal(t, "request.n", exampleConfig.AttributeName)
}

func TestConfigValidation(t *testing.T) {
	t.Run("valid config", func(t *testing.T) {
		cfg := &Config{
			AttributeName: "test.attribute",
		}
		err := xconfmap.Validate(cfg)
		assert.NoError(t, err)
	})

	t.Run("invalid config - empty attribute name", func(t *testing.T) {
		cfg := &Config{
			AttributeName: "",
		}
		err := xconfmap.Validate(cfg)
		assert.Error(t, err)
		assert.Contains(t, err.Error(), "attribute_name must not be empty")
	})
}

运行测试

  1. 将测试依赖项添加到您的 go.mod

    go mod tidy
    
  2. 运行测试

    go test -cover -v ./...
    

预期的测试输出

测试成功运行时,您应该会看到类似以下的输出

go test -cover -v ./...
=== RUN   TestConsumeTraces
=== RUN   TestConsumeTraces/span_with_target_attribute_generates_metric
=== RUN   TestConsumeTraces/span_without_target_attribute_does_not_generate_metric
=== RUN   TestConsumeTraces/multiple_spans_with_mixed_attributes
--- PASS: TestConsumeTraces (0.00s)
    --- PASS: TestConsumeTraces/span_with_target_attribute_generates_metric (0.00s)
    --- PASS: TestConsumeTraces/span_without_target_attribute_does_not_generate_metric (0.00s)
    --- PASS: TestConsumeTraces/multiple_spans_with_mixed_attributes (0.00s)
=== RUN   TestConnectorCapabilities
--- PASS: TestConnectorCapabilities (0.00s)
=== RUN   TestCreateDefaultConfig
--- PASS: TestCreateDefaultConfig (0.00s)
=== RUN   TestConfigValidation
=== RUN   TestConfigValidation/valid_config
=== RUN   TestConfigValidation/invalid_config_-_empty_attribute_name
--- PASS: TestConfigValidation (0.00s)
    --- PASS: TestConfigValidation/valid_config (0.00s)
    --- PASS: TestConfigValidation/invalid_config_-_empty_attribute_name (0.00s)
PASS
coverage: 90.5% of statements
ok      github.com/gord02/exampleconnector      0.501s  coverage: 90.5% of statements

这些单元测试为您的连接器功能提供了全面的覆盖,并且是 OpenTelemetry Collector 生态系统中验证组件行为的推荐方法。

关于 OpenTelemetry Collector Builder 的附加资源