Apache Beam 是一个统一的数据处理模型,用于定义批处理和流处理数据并行处理管道,同时提供一套特定语言的 SDK 用于构建管道,以及在分布式处理后端执行管道的运行器(Runner)。支持的后端包括 Apache Apex、Apache Flink、Apache Spark、Google Cloud Dataflow 和 Hazelcast Jet 等。其核心目标是简化跨平台、跨场景的数据处理逻辑开发,实现"一次编写,多处运行"的能力。
基于 Google Dataflow 模型演进而来,统一批处理和流处理的编程范式,支持无界数据流(流处理)和有界数据流(批处理)的统一表达,简化复杂数据处理逻辑的开发。
提供多种编程语言的 SDK,满足不同技术栈需求:
支持在多种分布式处理引擎上执行管道,无需修改业务逻辑:
核心概念包括:
场景:使用现有 SDK 编写数据处理管道,并在指定运行器上执行。
适用范围:数据工程师、数据分析师需快速开发批流处理任务,无需关注底层执行引擎细节。例如:日志数据实时清洗、用户行为分析、ETL 流程构建等。
场景:为特定用户群体开发 Beam SDK(如 Scala、R、图形化界面等)。
适用范围:语言专家或框架开发者,需扩展 Beam 的语言生态,满足特定技术栈需求。
场景:为分布式处理环境开发 PipelineRunner,支持基于 Beam 模型编写的程序。
适用范围:分布式计算框架开发者,需将 Beam 生态集成至自有执行引擎,扩展框架的数据处理能力。
Apache Beam ***未提供统一的 Docker 镜像,建议基于 SDK 语言构建自定义镜像。以下为 Java SDK 示例:
Dockerfile(Java SDK 示例)
dockerfileFROM maven:3.8.5-openjdk-11 AS builder WORKDIR /app COPY pom.xml . # 缓存依赖 RUN mvn dependency:go-offline COPY src ./src # 构建 Beam 管道应用 RUN mvn package -DskipTests FROM openjdk:11-jre-slim WORKDIR /app COPY --from=builder /app/target/*.jar app.jar # 运行 DirectRunner(本地调试) ENTRYPOINT ["java", "-jar", "app.jar", "--runner=DirectRunner"]
构建并运行镜像
bash# 构建镜像 docker build -t beam-java-app:latest . # 运行(使用 DirectRunner 本地执行) docker run --rm beam-java-app:latest
若需提交至 Flink 集群执行,需在运行时指定 Flink 集群地址:
bashdocker run --rm \ -e FLINK_MASTER=flink-jobmanager:8081 \ beam-java-app:latest \ --runner=FlinkRunner \ --flink-master=${FLINK_MASTER} \ --streaming=true # 若为流处理任务
yamlversion: "3.8" services: flink-jobmanager: image: flink:1.17-scala_2.12 ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager flink-taskmanager: image: flink:1.17-scala_2.12 depends_on: - flink-jobmanager command: taskmanager environment: - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager beam-app: build: . depends_on: - flink-jobmanager environment: - FLINK_MASTER=flink-jobmanager:8081 command: > java -jar app.jar --runner=FlinkRunner --flink-master=${FLINK_MASTER} --jobName=beam-flink-demo
| 参数名 | 说明 | 示例值 |
|---|---|---|
--runner | 指定运行器类型 | DirectRunner/FlinkRunner |
--jobName | 任务名称 | beam-wordcount-demo |
--inputFile | 输入文件路径(批处理) | /data/input.txt |
--output | 输出路径 | /data/output |
FlinkRunner
| 参数名 | 说明 | 示例值 |
|---|---|---|
--flink-master | Flink 集群 JobManager 地址 | flink-jobmanager:8081 |
--streaming | 是否启用流处理模式 | true/false |
--parallelism | 任务并行度 | 4 |
SparkRunner
| 参数名 | 说明 | 示例值 |
|---|---|---|
--spark-master | Spark 集群 Master 地址 | spark://spark-master:7077 |
--spark-submit | Spark 提交命令路径 | /opt/spark/bin/spark-submit |
DataflowRunner
| 参数名 | 说明 | 示例值 |
|---|---|---|
--project | GCP 项目 ID | my-gcp-project |
--region | 区域 | us-central1 |
--tempLocation | GCS 临时文件路径 | gs://my-bucket/temp |
| 环境变量名 | 说明 | 示例值 |
|---|---|---|
BEAM_HOME | Beam 安装路径(可选) | /opt/apache-beam |
FLINK_CONF_DIR | Flink 配置文件目录 | /etc/flink |
SPARK_HOME | Spark 安装路径 | /opt/spark |
javaimport org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TypeDescriptors; import java.util.Arrays; public class WordCount { public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline p = Pipeline.create(options); p.apply(TextIO.read().from(options.as(WordCountOptions.class).getInputFile())) .apply(FlatMapElements.into(TypeDescriptors.strings()) .via(line -> Arrays.asList(line.split(" ")))) .apply(Count.perElement()) .apply(MapElements.into(TypeDescriptors.strings()) .via(wordCount -> wordCount.getKey() + ": " + wordCount.getValue())) .apply(TextIO.write().to(options.as(WordCountOptions.class).getOutput())); p.run().waitUntilFinish(); } public interface WordCountOptions extends PipelineOptions { String getInputFile(); void setInputFile(String value); String getOutput(); void setOutput(String value); } }
bashdocker run --rm \ -v $(pwd)/input.txt:/data/input.txt \ -v $(pwd)/output:/data/output \ beam-java-app:latest \ --runner=DirectRunner \ --inputFile=/data/input.txt \ --output=/data/output/result
来自真实用户的反馈,见证轩辕镜像的优质服务
免费版仅支持 Docker Hub 加速,不承诺可用性和速度;专业版支持更多镜像源,保证可用性和稳定速度,提供优先客服响应。
免费版仅支持 docker.io;专业版支持 docker.io、gcr.io、ghcr.io、registry.k8s.io、nvcr.io、quay.io、mcr.microsoft.com、docker.elastic.co 等。
当返回 402 Payment Required 错误时,表示流量已耗尽,需要充值流量包以恢复服务。
通常由 Docker 版本过低导致,需要升级到 20.x 或更高版本以支持 V2 协议。
先检查 Docker 版本,版本过低则升级;版本正常则验证镜像信息是否正确。
使用 docker tag 命令为镜像打上新标签,去掉域名前缀,使镜像名称更简洁。
探索更多轩辕镜像的使用方法,找到最适合您系统的配置方式
通过 Docker 登录认证访问私有仓库
在 Linux 系统配置镜像加速服务
在 Docker Desktop 配置镜像加速
Docker Compose 项目配置加速
Kubernetes 集群配置 Containerd
在宝塔面板一键配置镜像加速
Synology 群晖 NAS 配置加速
飞牛 fnOS 系统配置镜像加速
极空间 NAS 系统配置加速服务
爱快 iKuai 路由系统配置加速
绿联 NAS 系统配置镜像加速
QNAP 威联通 NAS 配置加速
Podman 容器引擎配置加速
HPC 科学计算容器配置加速
ghcr、Quay、nvcr 等镜像仓库
无需登录使用专属域名加速
需要其他帮助?请查看我们的 常见问题 或 官方QQ群: 13763429