Skip to content

ZubblClient

The main entry point for the Zubbl SDK.

Initialize

from zubbl import ZubblClient

zubbl = ZubblClient(
    api_key="zubbl_YOUR_KEY",
    base_url="https://api.zubbl.tech",  # default
    timeout=30,                          # request timeout in seconds
    max_retries=3,                       # retry on transient failures
    rl_mode="cloud",                     # "local" | "cloud" | "hybrid"
)

Core Methods

start_trajectory(task_description, metadata=None) → str

Start recording a new trajectory. Returns the trajectory ID.

tid = zubbl.start_trajectory(
    task_description="Summarize quarterly report",
    metadata={"source": "automated"},
)

record_step(action, tool_name=None, tool_input=None, tool_output=None, reasoning=None, tokens_used=0, latency_ms=0)

Record a step in the current trajectory.

zubbl.record_step(
    action="read_file",
    tool_name="doc_reader",
    tool_input={"file": "report.pdf"},
    tool_output="Revenue: $2.3M",
    tokens_used=150,
    latency_ms=800,
)

end_trajectory(status, actual_output=None, expected_output=None, agent_confidence=None) → Trajectory

End the current trajectory and automatically ingest it. The status argument must be one of "success", "failure", "partial", or "timeout".

traj = zubbl.end_trajectory(status="success")

query(task_description, context=None) → Policy | None

Get policy recommendations before running a task.

policy = zubbl.query(task_description="code review")
if policy:
    print(policy.task_type)           # str
    print(policy.confidence_score)    # float
    print(policy.recommended_actions) # list
    print(policy.is_quarantined)      # bool

feedback(trajectory_id, rating, comment=None) → dict

Submit feedback on a trajectory. The rating must be between -1.0 and 1.0.

zubbl.feedback(
    trajectory_id="traj_abc123",
    rating=0.85,
    comment="Accurate and fast",
)

get_usage() → UsageStats

Get usage statistics.

usage = zubbl.get_usage()
print(usage.trajectories_ingested)  # int
print(usage.credits_remaining)      # int
print(usage.task_success_rate)      # float (0.0-1.0)
print(usage.queries_made)           # int
print(usage.rl_cycles_run)          # int

wrap(agent, auto_recover=None) → WrappedAgent

Wrap any callable agent for automatic trajectory recording and recovery.

smart_agent = zubbl.wrap(my_agent)
result = smart_agent.run("Your task")

get_failure_stats() → dict

Get failure statistics and recovery rates.

stats = zubbl.get_failure_stats()
print(stats["total_failures_seen"])
print(stats["total_auto_recovered"])
print(stats["recovery_rate"])

validate(trajectory_id, expected_output, actual_output=None, agent_confidence=0.0) → dict

Validate a trajectory's output against expected results.

result = zubbl.validate(
    trajectory_id="traj_abc123",
    expected_output="Summary of Q4 earnings",
    actual_output="Q4 revenue grew 15%",
    agent_confidence=0.9,
)

Cloud Features (Pro Plans)

find_similar_trajectories(query_text, top_k=5, status_filter=None) → list

Find similar past trajectories using vector search.

results = zubbl.find_similar_trajectories(
    query_text="fix authentication bug",
    top_k=5,
    status_filter="success",
)
for r in results:
    print(r.trajectory_id, r.score)

get_graph_recommendations(task_type, min_confidence=0.5, limit=5) → list

Get skill graph recommendations.

recs = zubbl.get_graph_recommendations(task_type="code_review")
for r in recs:
    print(r.pattern, r.confidence, r.description)

get_skill_gaps(agent_id, task_type) → list

Analyze skill gaps for an agent.

gaps = zubbl.get_skill_gaps(agent_id="my-agent", task_type="security_audit")
for g in gaps:
    print(g.skill, g.importance)

get_training_stats() → TrainingStats | None

Get training pipeline status.

stats = zubbl.get_training_stats()
if stats:
    print(stats.model_name)
    print(stats.training_runs)
    print(stats.trajectories_buffered)

trigger_training() → dict | None

Manually trigger a training run.

zubbl.trigger_training()

get_services_health() → ServiceHealth

Check which cloud services are available.

health = zubbl.get_services_health()
print(health.status)

get_streams_info() → dict | None

Get streaming queue status.

info = zubbl.get_streams_info()

Async Methods

All core methods have async variants, named with an _async suffix; the cloud methods shown below have them as well:

# Async versions
await zubbl.ingest_async(trajectory)
policy = await zubbl.query_async(task_description="...")
results = await zubbl.find_similar_trajectories_async(query_text="...")
recs = await zubbl.get_graph_recommendations_async(task_type="...")