ZubblClient¶
The main entry point for the Zubbl SDK.
Initialize¶
from zubbl import ZubblClient
zubbl = ZubblClient(
api_key="zubbl_YOUR_KEY",
base_url="https://api.zubbl.tech", # default
timeout=30, # request timeout in seconds
max_retries=3, # retry on transient failures
rl_mode="cloud", # "local" | "cloud" | "hybrid"
)
Core Methods¶
start_trajectory(task_description, metadata=None) → str¶
Start recording a new trajectory. Returns the trajectory ID.
tid = zubbl.start_trajectory(
task_description="Summarize quarterly report",
metadata={"source": "automated"},
)
record_step(action, tool_name=None, tool_input=None, tool_output=None, reasoning=None, tokens_used=0, latency_ms=0)¶
Record a step in the current trajectory.
zubbl.record_step(
action="read_file",
tool_name="doc_reader",
tool_input={"file": "report.pdf"},
tool_output="Revenue: $2.3M",
tokens_used=150,
latency_ms=800,
)
end_trajectory(status, actual_output=None, expected_output=None, agent_confidence=None) → Trajectory¶
End and auto-ingest the current trajectory. Status must be "success", "failure", "partial", or "timeout".
query(task_description, context=None) → Policy | None¶
Get policy recommendations before running a task.
policy = zubbl.query(task_description="code review")
if policy:
print(policy.task_type) # str
print(policy.confidence_score) # float
print(policy.recommended_actions) # list
print(policy.is_quarantined) # bool
feedback(trajectory_id, rating, comment=None) → dict¶
Submit feedback on a trajectory. Rating is -1.0 to 1.0.
get_usage() → UsageStats¶
Get usage statistics.
usage = zubbl.get_usage()
print(usage.trajectories_ingested) # int
print(usage.credits_remaining) # int
print(usage.task_success_rate) # float (0.0-1.0)
print(usage.queries_made) # int
print(usage.rl_cycles_run) # int
wrap(agent, auto_recover=None) → WrappedAgent¶
Wrap any callable agent for automatic trajectory recording and recovery.
get_failure_stats() → dict¶
Get failure statistics and recovery rates.
stats = zubbl.get_failure_stats()
print(stats["total_failures_seen"])
print(stats["total_auto_recovered"])
print(stats["recovery_rate"])
validate(trajectory_id, expected_output, actual_output=None, agent_confidence=0.0) → dict¶
Validate a trajectory's output against expected results.
result = zubbl.validate(
trajectory_id="traj_abc123",
expected_output="Summary of Q4 earnings",
actual_output="Q4 revenue grew 15%",
agent_confidence=0.9,
)
Cloud Features (Pro Plans)¶
find_similar_trajectories(query_text, top_k=5, status_filter=None) → list¶
Find similar past trajectories using vector search.
results = zubbl.find_similar_trajectories(
query_text="fix authentication bug",
top_k=5,
status_filter="success",
)
for r in results:
print(r.trajectory_id, r.score)
get_graph_recommendations(task_type, min_confidence=0.5, limit=5) → list¶
Get skill graph recommendations.
recs = zubbl.get_graph_recommendations(task_type="code_review")
for r in recs:
print(r.pattern, r.confidence, r.description)
get_skill_gaps(agent_id, task_type) → list¶
Analyze skill gaps for an agent.
gaps = zubbl.get_skill_gaps(agent_id="my-agent", task_type="security_audit")
for g in gaps:
print(g.skill, g.importance)
get_training_stats() → TrainingStats | None¶
Get training pipeline status.
stats = zubbl.get_training_stats()
if stats:
print(stats.model_name)
print(stats.training_runs)
print(stats.trajectories_buffered)
trigger_training() → dict | None¶
Manually trigger a training run.
get_services_health() → ServiceHealth¶
Check which cloud services are available.
get_streams_info() → dict | None¶
Get streaming queue status.
Async Methods¶
All core methods have async variants: