跳转到主要内容
Arcentra 同时提供三种互补的接口:
  • gRPC:控制平面与 Agent 之间的主要契约,也是推荐的服务间集成方式。
  • HTTP/JSON:面向用户的接口(流水线、运行、身份、设置等),由 UI 与外部 自动化使用。
  • WebSocket:日志与状态的推送通道。

gRPC 服务

Proto 定义位于 api/ 目录,使用 Buf 管理。共五个服务模块(路径形如 */v1):
服务模块职责
Agentagent/v1Agent 生命周期、StepRun 派发、控制通道
Gatewaygateway/v1数据面日志与事件接入
Pipelinepipeline/v1Pipeline 增删改查、触发与运行管理
StepRunsteprun/v1StepRun 增删改查、重试、产物列表
Streamstream/v1状态与事件的服务端流

Agent 服务要点

  • HeartbeatRegisterUnregister:Agent 生命周期。
  • FetchStepRun:Agent 主动拉取与自身标签匹配的 StepRun。
  • ReportStepRunStatusCancelStepRun:双向状态流。
  • UpdateLabels:在线动态更新标签,无需重启。
  • Connect:Agent 与 Gateway 之间的双向控制通道。

Gateway 服务要点

  • PushLogs:高吞吐、批量、容忍丢失的日志流。
  • PushEvents:可靠、幂等、可重试且支持部分接受的事件流。

Pipeline 服务要点

CRUD 加运行控制:CreatePipelineUpdatePipelineGetPipelineListPipelinesDeletePipelineTriggerPipelineStopPipelineGetPipelineRunListPipelineRuns。触发方式覆盖 manual、cron/调度、 event/Webhook。

StepRun 服务要点

CRUD 加执行控制:CreateStepRunGetStepRunListStepRunsUpdateStepRunDeleteStepRunCancelStepRunRetryStepRunListStepRunArtifacts。Step 支持插件动作、重试策略、产物收集、按标签路由 以及 when 条件表达式。

Stream 服务要点

  • StreamStepRunStatusStreamJobStatusStreamPipelineStatus:状态实时 推送。
  • AgentChannel:Agent 与 Server 双向通道。
  • StreamAgentStatusStreamEvents:Agent 在线情况与系统事件。
事件类型遵循 arcentra.<对象>.<生命周期> 模式,例如 arcentra.step.startedarcentra.pipeline.failed

生成 gRPC 客户端

使用 Buf 生成代码:
make buf
# 或者直接在 api 目录下使用 buf
cd api && buf generate

HTTP API

HTTP API 由控制平面提供,默认监听 :8080。所有接口需要 Bearer Token:
Authorization: Bearer <token>
响应使用统一封装:成功包含 codemsg、可选 detailtimestamp; 错误使用 errMsgpath 替代 detail /api/v1/pipelines/... 是最常用的 HTTP 端点,详细请求/响应字段见 流水线 其他常见的 HTTP 模块:
  • 身份与设置:用户、角色、租户管理。
  • 项目与上传:项目增删改查与资源上传。
  • Agent:Agent 列表与标签查询。
完整路由请以 internal/control 中的注册情况为准。

WebSocket 网关

WebSocket 入口为 GET /api/v1/ws(WebSocket 握手使用 HTTP GET)。

请求格式

{
  "channel": "channel_log | channel_status",
  "action": "subscribe | unsubscribe",
  "params": {
    "pipelineId": "string",
    "jobId": "string",
    "stepRunId": "string"
  }
}
  • channel 必填:channel_log / channel_status
  • action 默认 subscribe
  • params 必填,pipelineIdjobIdstepRunId 都需要传入。

响应格式

{
  "channel": "channel_log | channel_status",
  "type": "string",
  "params": { "pipelineId": "...", "jobId": "...", "stepRunId": "..." },
  "data": {},
  "error": "string",
  "message": "string"
}

日志通道(channel_log

typedata
subscribednull
log_chunkLog[],每批最多 200 条
history_donenull
log单条 Log
unsubscribednull
errornull(错误信息在 error 字段)
Log 字段:
字段类型必填说明
step_run_idstringStepRun 唯一标识
timestampint64Unix 时间戳(秒)
line_numberint32行号,从 1 开始
levelstringinfowarnerror
contentstring日志内容
streamstringstdout / stderr
plugin_namestring插件名
agent_idstring上报的 Agent

状态通道(channel_status

  • status_snapshot:来自数据库(t_step_run)的初始快照。
  • status:StepRun 仍在运行(状态 1/2/3)时的 Kafka 实时事件。
  • unsubscribed:取消订阅响应。
  • error:错误。
StepRun 已结束时只返回快照,不再推送 Kafka 事件。

快照示例

{
  "stepRunId": "...",
  "pipelineId": "...",
  "jobId": "...",
  "status": 3,
  "startTime": "...",
  "endTime": null,
  "duration": 1234
}

实时事件示例

{
  "pipelineId": "...",
  "jobId": "...",
  "stepRunId": "...",
  "eventType": "arcentra.step.started",
  "status": "started",
  "subject": "pipeline:xxx:job:yyy:step:zzz",
  "raw": { "type": "...", "data": {}, "subject": "..." }
}

错误示例

{
  "channel": "channel_log",
  "type": "error",
  "params": { "pipelineId": "...", "jobId": "...", "stepRunId": "..." },
  "error": "pipelineId, jobId and stepRunId are required"
}
Last modified on April 26, 2026