Kafka.md

Kafka输出将事件发送到Apache Kafka。

要使用此输出,请编辑Filebeat配置文件以通过注释掉来禁用Elasticsearch输出,并通过取消注释Kafka部分来启用Kafka输出。

对于Kafka版本0.10.0.0+消息创建时间戳由beats设置,等于事件的初始时间戳。这会影响Kafka中的保留策略:例如,如果2周前创建了一个beats事件,则保留策略设置为7天,并且来自beats的消息今天到达Kafka,它将立即被丢弃,因为时间戳值在过去7天之前。可以通过在消息到达时设置时间戳来更改此行为,因此消息不会被丢弃,而是会再保留7天。为此,请在Kafka配置中将log.message.timestamp.type设置为LogAppendTime(默认CreateTime)。

示例配置:

output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]

  # message topic selection + partitioning
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

大于max_message_bytes的事件将被删除。要避免此问题,请确保FileBeat不会生成大于max_message_bytes的事件。

兼容性

此输出可以连接到0.8.2.0及更高版本的Kafka。旧版本可能也能工作,但不受支持。

配置选项

您可以在filebeat.yml配置文件的kafka部分中指定以下选项:

enabled

布尔,用于启用或禁用输出。如果设置为false,则禁用输出。

默认值为true

hosts

The list of Kafka broker addresses from where to fetch the cluster metadata. The cluster metadata contain the actual Kafka brokers events are published to.

version

FileBeat连接时会请求的Kafka协议版本。默认为1.0.0。

Valid值是0.8.2.02.6.0之间的所有kafka版本。

协议版本控制FileBeat可用的Kafka客户端功能;它不会阻止FileBeat连接到比协议版本更新的Kafka版本。

有关受支持版本的信息,请参阅兼容性

username

The username for connecting to Kafka. If username is configured, the password must be configured as well.

password

The password for connecting to Kafka.

sasl.mechanism

连接到Kafka时使用的SASL机制。它可以是以下之一:

  • PLAIN for SASL/PLAIN.
  • SCRAM-SHA-256 for SCRAM-SHA-256.
  • SCRAM-SHA-512 for SCRAM-SHA-512.

如果未设置sasl.mechanism,如果提供了 username and password ,则使用PLAIN。否则,SASL身份验证将被禁用。

要使用GSSAPI机制对Kerberos进行身份验证,您必须将此字段留空,并使用kerberos选项。

topic

用于生成事件的 Kafka topic。

您可以通过使用 format string 访问任何事件字段来动态设置 topic。例如,此配置使用自定义字段 fields.log_topic 为每个事件设置 topic:

topic: '%{[fields.log_topic]}'

若要了解如何向事件添加自定义字段,请参阅fields选项。

有关动态设置 topic 的其他方法,请参阅 topics 设置。

topics

topic 选择器规则数组。每个规则指定用于匹配规则的事件的topic。在发布过程中,FileBeat根据数组中的第一个匹配规则为每个事件设置topic。规则可以包含条件、基于字符串的格式字段和名称映射。如果topics设置缺失或没有规则匹配,则使用 topic 字段。

规则设置:

topic

要使用的 topic 格式字符串。如果此字符串包含字段引用,例如%{[fields.name]},则这些字段必须存在,否则规则失败。

mappings

获取topic返回的值并将其映射到新名称的字典。

default

如果mappings未找到匹配项,则使用的默认字符串值。

when

必须成功才能执行当前规则的条件。这里也支持处理器支持的所有条件

以下示例根据消息字段是否包含指定的字符串来设置主题:

output.kafka:
  hosts: ["localhost:9092"]
  topic: "logs-%{[agent.version]}"
  topics:
    - topic: "critical-%{[agent.version]}"
      when.contains:
        message: "CRITICAL"
    - topic: "error-%{[agent.version]}"
      when.contains:
        message: "ERR"

此配置会生成名为critical-8.5.1error-8.5.1logs-8.5.1的主题。

key

指定Kafka事件key的可选格式化字符串。如果配置,可以使用格式字符串从事件中提取事件key。

有关特定密钥选择的含义,请参见Kafka文档;默认情况下,密钥由Kafka集群选择。

partition

Kafka输出代理事件分区策略。必须是 random, round_robin, or hash之一。默认情况下使用hash分区器。

random.group_events: 在分区程序随机选择新分区之前,设置要发布到同一分区的事件数。默认值为1,即每个事件后随机选择一个新分区。

round_robin.group_events: 在分区程序选择下一个分区之前,设置要发布到同一分区的事件数。默认值为1,表示每个事件后将选择下一个分区。

hash.hash: 用于计算分区哈希值的字段列表。如果未配置字段,则将使用事件key值。

hash.random: 如果无法计算哈希或键值,则随机分发事件。

默认情况下,所有分区都将尝试将事件发布到所有分区。如果某个分区的领导者在beat中无法访问,则输出可能会阻塞。所有分区都支持设置reachable_only覆盖此行为。如果reachable_only设置为true,则事件将仅发布到可用分区。

发布到可用分区的子集可能会增加资源使用,因为事件可能会分布不均匀。

headers

一个header是键值对,同一个key可以包含多个headers。仅支持字符串值。这些headers将包含在每个生成的Kafka消息中。

output.kafka:
  hosts: ["localhost:9092"]
  topic: "logs-%{[agent.version]}"
  headers:
    - key: "some-key"
      value: "some value"
    - key: "another-key"
      value: "another value"

client_id

用于日志记录、调试和审计目的的可配置ClientID。默认值为“beats”。

codec

输出编解码器配置。如果缺少codec部分,事件将被json编码。

有关详细信息,请参阅更改输出编解码器

metadata

Kafka元数据更新设置。元数据确实包含有关用于发布的 brokers、topics、partition 和 active leaders 的信息。

refresh_frequency

元数据刷新间隔。默认为10分钟。

full

获取元数据时使用的策略,当此选项为true时,客户端将为所有可用topics维护全套元数据,如果此选项设置为false,则仅刷新已配置主题的元数据。

默认为false

retry.max

群集处于领导者选举中间时,元数据更新重试的总数。默认值为3。

retry.backoff

领导者选举期间重试之间的等待时间。默认值为 250ms。

max_retries

Filebeat ignores the max_retries setting and retries indefinitely.

backoff.init

网络错误后尝试重新发布到Kafka之前要等待的秒数。等待backoff.init秒后,FileBeat尝试重新发布。如果尝试失败,退避计时器将按指数增加,最高可达backoff.max。成功发布后,退避计时器将重置。默认值为 1s。

backoff.max

在网络错误后尝试重新发布到Kafka之前等待的最大秒数。默认值为60秒。

bulk_max_size

单个Kafka请求中要批量处理的最大事件数。默认值为2048。

bulk_flush_frequency

在发送批量Kafka请求之前等待的持续时间。0是没有延迟的。默认值为0。

timeout

超时前等待Kafka brokers响应的秒数。默认值为30(秒)。

broker_timeout

代理等待所需ACK数的最长持续时间。默认值为10秒。

channel_buffer_size

每个Kafka代理在输出管道中缓冲的消息数量。默认值为256。

keep_alive

活动网络连接的保持活动时间。如果为0,则禁用保持活动时间。默认值为0秒。

compression

设置输出压缩编解码器。必须是 none, snappy, lz4 and gzip. The default is gzip.

Kafka的Azure事件中心的已知问题

针对Kafka的Azure事件中心时,请将compression设置为none,因为不支持提供的编解码器。

compression_level

设置gzip使用的压缩级别。将此值设置为0将禁用压缩。压缩级别必须在1(最佳速度)到9(最佳压缩)的范围内。

提高压缩级别将减少网络使用量,但会增加cpu使用量。

默认值为4。

max_message_bytes

JSON编码消息的最大允许大小。较大的消息将被删除。默认值为1000000(字节)。该值应等于或小于代理的message.max.bytes.

required_acks

代理要求的确认字符可靠性级别。0=无响应,1=等待本地提交,-1=等待所有副本提交。默认值为1。

注意:如果设置为0,则Kafka不返回ACK。错误时消息可能会静默丢失。

ssl

SSL参数的配置选项,如Kafka连接的根CA。Kafka主机密钥库应使用-keyalg RSA参数创建,以确保它使用FileBeat的Kafka库支持的密码。有关详细信息,请参阅SSL

kerberos

此功能处于测试阶段,可能会发生变化。设计和代码不如官方GA功能成熟,并且按原样提供,没有保证。测试功能不受官方GA功能支持SLA的约束。

Kerberos身份验证的配置选项。

有关详细信息,请参阅 Kerberos


Kafka.md
http://blog.lujinkai.cn/运维/ELK/filebeats/配置/output/Kafka/
作者
像方便面一样的男子
发布于
2023年12月5日
许可协议