xk6-kafka是k6的扩展,允许用户使用生产者(及用于调试的消费者)对Apache Kafka进行负载测试。其核心用途是测试基于Apache Kafka设计的系统,通过自动生成消息并发送到Kafka来验证消费者及整个系统的性能和正确性。本Docker镜像为该扩展提供***容器化部署方式,简化测试环境配置。
适用于对Kafka生产者、消费者及依赖Kafka的系统进行负载测试的场景,尤其适合:
从Docker Hub拉取***镜像:
bashdocker pull mostafamoradian/xk6-kafka:latest
通过挂载本地脚本或标准输入传递测试脚本运行负载测试。
bashdocker run --rm -i mostafamoradian/xk6-kafka:latest run - <本地脚本路径/test_json.js
将本地脚本目录挂载到容器中,直接运行指定脚本:
bashdocker run --rm -v $(pwd)/scripts:/scripts mostafamoradian/xk6-kafka:latest run /scripts/test_json.js
测试脚本遵循k6测试生命周期,包含以下核心阶段:
javascript// 导入完整模块 import * as kafka from "k6/x/kafka"; // 或导入特定类和常量 import { Writer, Reader, Connection, SchemaRegistry, SCHEMA_TYPE_STRING } from "k6/x/kafka";
创建Kafka连接、生产者、消费者实例,并初始化主题:
javascript// 创建生产者 const writer = new Writer({ brokers: ["localhost:9092"], // Kafka broker地址列表 topic: "my-test-topic", // 目标主题 compression: "snappy" // 消息压缩方式(可选) }); // 创建消费者 const reader = new Reader({ brokers: ["localhost:9092"], topic: "my-test-topic", groupID: "test-group" // 消费者组ID(可选) }); // 创建连接(用于主题管理) const connection = new Connection({ address: "localhost:9092" }); // 初始化时创建主题(仅首次运行) if (__VU == 0) { connection.createTopic({ topic: "my-test-topic", numPartitions: 3, // 分区数(可选) replicationFactor: 1 // 副本因子(可选) }); }
发送消息、消费消息并验证:
javascriptexport default function () { // 生产消息 writer.produce({ messages: [ { key: schemaRegistry.serialize({ data: "key-1", schemaType: SCHEMA_TYPE_STRING }), value: schemaRegistry.serialize({ data: { id: 1, value: "test" }, schemaType: SCHEMA_TYPE_JSON }) } ] }); // 消费消息并验证 const messages = reader.consume({ limit: 10 }); // 最多消费10条消息 if (messages.length > 0) { console.log(`消费消息: ${JSON.stringify(messages[0].value)}`); // 可添加k6 checks验证消息内容、格式等 } }
关闭连接并清理测试主题:
javascriptexport function teardown() { // 删除测试主题(可选) connection.deleteTopic("my-test-topic"); // 关闭所有连接 writer.close(); reader.close(); connection.close(); }
javascriptconst writer = new Writer({ brokers: ["kafka-broker:9092"], topic: "sasl-test-topic", sasl: { mechanism: "SCRAM-SHA-256", // 认证机制 username: "test-user", password: "test-pass" }, tls: { enable: true // 启用TLS } });
javascriptconst schemaRegistry = new SchemaRegistry({ url: "[***]" }); writer.produce({ messages: [ { key: schemaRegistry.serialize({ data: "user-1", schemaType: "avro", schema: { type: "string" } // 或从Schema Registry获取 }), value: schemaRegistry.serialize({ data: { id: 1, name: "test" }, schemaType: "avro", schema: { type: "record", name: "User", fields: [{ name: "id", type: "int" }, { name: "name", type: "string" }] } }) } ] });
xk6-kafka导出以下核心指标用于监控测试过程:
| 指标名称 | 类型 | 描述 |
|---|---|---|
| kafka_reader_message_count | Counter | 消费消息总数 |
| kafka_writer_message_count | Counter | 生产消息总数 |
| kafka_reader_error_count | Counter | 消费错误总数 |
| kafka_writer_error_count | Counter | 生产错误总数 |
| kafka_reader_lag | Gauge | 消费滞后量(最后消息偏移与当前偏移差) |
| kafka_writer_write_seconds | Trend | 消息写入耗时(趋势) |
| kafka_reader_read_seconds | Trend | 消息读取耗时(趋势) |
| kafka_reader_fetch_bytes | Counter | 读取字节总数 |
| kafka_writer_batch_size | Counter | 批处理消息总数 |
***仓库scripts目录提供多种功能示例,关键脚本包括:
test_json.js:JSON格式消息生产/消费测试test_avro_with_schema_registry.js:使用Schema Registry的Avro消息测试test_sasl_auth.js:SASL认证测试test_consumer_group.js:消费者组多分区消费测试test_topics.js:主题创建/删除/列出操作测试来自真实用户的反馈,见证轩辕镜像的优质服务
免费版仅支持 Docker Hub 加速,不承诺可用性和速度;专业版支持更多镜像源,保证可用性和稳定速度,提供优先客服响应。
免费版仅支持 docker.io;专业版支持 docker.io、gcr.io、ghcr.io、registry.k8s.io、nvcr.io、quay.io、mcr.microsoft.com、docker.elastic.co 等。
当返回 402 Payment Required 错误时,表示流量已耗尽,需要充值流量包以恢复服务。
通常由 Docker 版本过低导致,需要升级到 20.x 或更高版本以支持 V2 协议。
先检查 Docker 版本,版本过低则升级;版本正常则验证镜像信息是否正确。
使用 docker tag 命令为镜像打上新标签,去掉域名前缀,使镜像名称更简洁。
探索更多轩辕镜像的使用方法,找到最适合您系统的配置方式
通过 Docker 登录认证访问私有仓库
在 Linux 系统配置镜像加速服务
在 Docker Desktop 配置镜像加速
Docker Compose 项目配置加速
Kubernetes 集群配置 Containerd
在宝塔面板一键配置镜像加速
Synology 群晖 NAS 配置加速
飞牛 fnOS 系统配置镜像加速
极空间 NAS 系统配置加速服务
爱快 iKuai 路由系统配置加速
绿联 NAS 系统配置镜像加速
QNAP 威联通 NAS 配置加速
Podman 容器引擎配置加速
HPC 科学计算容器配置加速
ghcr、Quay、nvcr 等镜像仓库
无需登录使用专属域名加速
需要其他帮助?请查看我们的 常见问题 或 官方QQ群: 13763429