Arcentra 同时提供三种互补的接口:
- gRPC:控制平面与 Agent 之间的主要契约,也是推荐的服务间集成方式。
- HTTP/JSON:面向用户的接口(流水线、运行、身份、设置等),由 UI 与外部
自动化使用。
- WebSocket:日志与状态的推送通道。
gRPC 服务
Proto 定义位于
api/ 目录,使用
Buf 管理。共五个服务模块(路径形如 */v1):
| 服务 | 模块 | 职责 |
|---|
| Agent | agent/v1 | Agent 生命周期、StepRun 派发、控制通道 |
| Gateway | gateway/v1 | 数据面日志与事件接入 |
| Pipeline | pipeline/v1 | Pipeline 增删改查、触发与运行管理 |
| StepRun | steprun/v1 | StepRun 增删改查、重试、产物列表 |
| Stream | stream/v1 | 状态与事件的服务端流 |
Agent 服务要点
Heartbeat、Register、Unregister:Agent 生命周期。
FetchStepRun:Agent 主动拉取与自身标签匹配的 StepRun。
ReportStepRunStatus、CancelStepRun:双向状态流。
UpdateLabels:在线动态更新标签,无需重启。
Connect:Agent 与 Gateway 之间的双向控制通道。
Gateway 服务要点
PushLogs:高吞吐、批量、容忍丢失的日志流。
PushEvents:可靠、幂等、可重试且支持部分接受的事件流。
Pipeline 服务要点
CRUD 加运行控制:CreatePipeline、UpdatePipeline、GetPipeline、
ListPipelines、DeletePipeline、TriggerPipeline、StopPipeline、
GetPipelineRun、ListPipelineRuns。触发方式覆盖 manual、cron/调度、
event/Webhook。
StepRun 服务要点
CRUD 加执行控制:CreateStepRun、GetStepRun、ListStepRuns、
UpdateStepRun、DeleteStepRun、CancelStepRun、RetryStepRun、
ListStepRunArtifacts。Step 支持插件动作、重试策略、产物收集、按标签路由
以及 when 条件表达式。
Stream 服务要点
StreamStepRunStatus、StreamJobStatus、StreamPipelineStatus:状态实时
推送。
AgentChannel:Agent 与 Server 双向通道。
StreamAgentStatus、StreamEvents:Agent 在线情况与系统事件。
事件类型遵循 arcentra.<对象>.<生命周期> 模式,例如
arcentra.step.started、arcentra.pipeline.failed。
生成 gRPC 客户端
使用 Buf 生成代码:
make buf
# 或者直接在 api 目录下使用 buf
cd api && buf generate
HTTP API
HTTP API 由控制平面提供,默认监听 :8080。所有接口需要 Bearer Token:
Authorization: Bearer <token>
响应使用统一封装:成功包含 code、msg、可选 detail 与 timestamp;
错误使用 errMsg 与 path 替代 detail。
/api/v1/pipelines/... 是最常用的 HTTP 端点,详细请求/响应字段见
流水线。
其他常见的 HTTP 模块:
- 身份与设置:用户、角色、租户管理。
- 项目与上传:项目增删改查与资源上传。
- Agent:Agent 列表与标签查询。
完整路由请以
internal/control
中的注册情况为准。
WebSocket 网关
WebSocket 入口为 GET /api/v1/ws(WebSocket 握手使用 HTTP GET)。
请求格式
{
"channel": "channel_log | channel_status",
"action": "subscribe | unsubscribe",
"params": {
"pipelineId": "string",
"jobId": "string",
"stepRunId": "string"
}
}
channel 必填:channel_log / channel_status。
action 默认 subscribe。
params 必填,pipelineId、jobId、stepRunId 都需要传入。
响应格式
{
"channel": "channel_log | channel_status",
"type": "string",
"params": { "pipelineId": "...", "jobId": "...", "stepRunId": "..." },
"data": {},
"error": "string",
"message": "string"
}
日志通道(channel_log)
type | data |
|---|
subscribed | null |
log_chunk | Log[],每批最多 200 条 |
history_done | null |
log | 单条 Log |
unsubscribed | null |
error | null(错误信息在 error 字段) |
Log 字段:
| 字段 | 类型 | 必填 | 说明 |
|---|
step_run_id | string | 是 | StepRun 唯一标识 |
timestamp | int64 | 是 | Unix 时间戳(秒) |
line_number | int32 | 是 | 行号,从 1 开始 |
level | string | 否 | info、warn、error 等 |
content | string | 否 | 日志内容 |
stream | string | 否 | stdout / stderr |
plugin_name | string | 否 | 插件名 |
agent_id | string | 否 | 上报的 Agent |
状态通道(channel_status)
status_snapshot:来自数据库(t_step_run)的初始快照。
status:StepRun 仍在运行(状态 1/2/3)时的 Kafka 实时事件。
unsubscribed:取消订阅响应。
error:错误。
StepRun 已结束时只返回快照,不再推送 Kafka 事件。
快照示例
{
"stepRunId": "...",
"pipelineId": "...",
"jobId": "...",
"status": 3,
"startTime": "...",
"endTime": null,
"duration": 1234
}
实时事件示例
{
"pipelineId": "...",
"jobId": "...",
"stepRunId": "...",
"eventType": "arcentra.step.started",
"status": "started",
"subject": "pipeline:xxx:job:yyy:step:zzz",
"raw": { "type": "...", "data": {}, "subject": "..." }
}
错误示例
{
"channel": "channel_log",
"type": "error",
"params": { "pipelineId": "...", "jobId": "...", "stepRunId": "..." },
"error": "pipelineId, jobId and stepRunId are required"
}
Last modified on April 26, 2026