Files
task-team/collab-worker.py
Claude CLI Agent 9d075455e3 Initial: Fastify API + DB schema + collab setup
- Fastify API on :3000 (tasks, groups, auth, connectors)
- PostgreSQL schema: users, tasks, task_groups, goals, connectors
- 8 default task groups
- JWT auth (register/login/me)
- API Bridge framework with webhooks
- CLAUDE.md + polling scripts
- Collab worker integration

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 09:57:56 +00:00

138 lines
4.3 KiB
Python

#!/usr/bin/env python3
"""
Task Team Collab Worker — HTTP-based agent that polls mngmt for tasks,
executes them with Claude CLI, reports results back.
"""
import os
import sys
import time
import json
import subprocess
import logging
import signal
from datetime import datetime
import requests
# Configuration -- every setting below can be overridden via the environment.
MNGMT_URL = os.getenv("COLLAB_URL", "https://mngmt.it-enterprise.pro")
TOKEN = os.getenv("COLLAB_TOKEN", "")  # main() refuses to start when empty
HOSTNAME = os.getenv("HOSTNAME", "unknown")  # node name sent to the server
CLAUDE_BIN = os.getenv("CLAUDE_BIN", "/usr/bin/claude")
WORK_DIR = os.getenv("WORK_DIR", "/opt/task-team")
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "30"))  # seconds between polls

# Base endpoint and the headers attached to every request.
API = f"{MNGMT_URL}/api/v2/collab"
HEADERS = {
    "Authorization": f"Bearer {TOKEN}",
    "Content-Type": "application/json",
}

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("collab-worker")

# Cleared by the signal handler to request a graceful shutdown of main().
running = True
def stop(sig, frame):
    """Signal handler: ask the polling loop to shut down gracefully.

    Both arguments are supplied by the ``signal`` machinery and are ignored;
    the handler only clears the module-level ``running`` flag.
    """
    global running
    running = False
# Route both termination signals to the same graceful-shutdown handler.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, stop)
def heartbeat():
    """Announce this node to the collab server.

    Returns:
        True when the server answers HTTP 200; False on any other status or
        when the request raises (the error is logged as a warning).
    """
    payload = {"node": HOSTNAME, "server": "taskteam"}
    try:
        response = requests.post(
            f"{API}/heartbeat", headers=HEADERS, json=payload, timeout=10
        )
    except Exception as exc:
        log.warning(f"Heartbeat failed: {exc}")
        return False
    return response.status_code == 200
def get_tasks():
    """Fetch the tasks this node is allowed to work on.

    Requests ``{API}/tasks`` and keeps only the entries of the response's
    ``"data"`` list whose ``status`` is ``"open"`` and whose ``assignee`` is
    either this node's ``HOSTNAME`` or ``None``.

    Returns:
        A list of task dicts. Empty when nothing matches, when the server
        answers with a non-200 status, or when the request/parsing raises
        (both failure cases are logged as warnings).
    """
    try:
        r = requests.get(f"{API}/tasks", headers=HEADERS, timeout=10)
        if r.status_code != 200:
            # Previously silent: a rejected token or a server error was
            # indistinguishable from "no work available".
            log.warning(f"Get tasks failed: HTTP {r.status_code}")
            return []
        data = r.json()
        return [
            t for t in data.get("data", [])
            if t.get("status") == "open"
            and (t.get("assignee") == HOSTNAME or t.get("assignee") is None)
        ]
    except Exception as e:
        log.warning(f"Get tasks failed: {e}")
        return []
def claim_task(task_id):
    """Try to claim a task for this node.

    Args:
        task_id: Identifier interpolated into the claim URL.

    Returns:
        True when the server answers HTTP 200, otherwise False (including
        when the request raises).
    """
    try:
        r = requests.post(f"{API}/tasks/{task_id}/claim", headers=HEADERS, timeout=10)
        return r.status_code == 200
    except Exception as e:
        # Was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt and left no trace of why the claim failed.
        log.warning(f"Claim failed: {e}")
        return False
def submit_result(task_id, result, status="completed"):
    """Post the outcome of a task back to the collab server.

    Args:
        task_id: Identifier interpolated into the submit URL.
        result: Text sent as the ``"result"`` field.
        status: Value sent as the ``"status"`` field.

    Returns:
        True when the server answers HTTP 200; False on any other status or
        when the request raises (the error is logged as a warning).
    """
    body = {"result": result, "status": status}
    try:
        response = requests.post(
            f"{API}/tasks/{task_id}/submit", headers=HEADERS, json=body, timeout=30
        )
    except Exception as exc:
        log.warning(f"Submit failed: {exc}")
        return False
    return response.status_code == 200
def execute_claude(prompt, work_dir=WORK_DIR, max_turns=10, timeout=600):
    """Run the Claude CLI on *prompt* and capture its output.

    Args:
        prompt: Text passed to the CLI via ``-p``.
        work_dir: Directory the CLI process is started in.
        max_turns: Value forwarded to ``--max-turns``.
        timeout: Seconds to wait for the process before giving up. Defaults
            to 600, the limit that used to be hard-coded.

    Returns:
        A ``(output, ok)`` tuple. ``output`` is at most the last 10000
        characters of stdout, followed by the last 2000 characters of stderr
        when the CLI exits non-zero and wrote to stderr. ``ok`` is True only
        for exit status 0. A timeout or a failure to launch the process is
        reported as ``"[ERROR] ..."`` text with ``ok`` False.
    """
    # NOTE(review): the flag name suggests permission prompts are bypassed,
    # and the prompt is built from remotely fetched task data (see
    # process_task) -- confirm the task source is trusted.
    cmd = [CLAUDE_BIN, "--dangerously-skip-permissions",
           "-p", prompt, "--max-turns", str(max_turns)]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True,
                                timeout=timeout, cwd=work_dir, errors='replace')
    except subprocess.TimeoutExpired:
        return f"[ERROR] Claude CLI timed out after {timeout}s", False
    except Exception as e:
        return f"[ERROR] {e}", False

    # A negative slice already returns the whole string when it is shorter,
    # so no explicit length check is needed.
    output = result.stdout[-10000:]
    if result.returncode != 0 and result.stderr:
        output += f"\n[STDERR]: {result.stderr[-2000:]}"
    return output, result.returncode == 0
def process_task(task):
    """Claim one task, run it through the Claude CLI and submit the outcome.

    Args:
        task: Task dict; ``"id"`` is required, ``"title"`` and
            ``"description"`` default to empty strings.

    Returns:
        None. If the claim is refused the task is skipped with a warning.

    Raises:
        KeyError: If *task* has no ``"id"`` key.
    """
    # Local import: the file-level import only brings in ``datetime``.
    from datetime import timezone

    task_id = task["id"]
    title = task.get("title", "")
    description = task.get("description", "")
    log.info(f"Processing task #{task_id}: {title}")

    if not claim_task(task_id):
        log.warning(f"Failed to claim task #{task_id}")
        return

    prompt = f"TASK: {title}\n\nDESCRIPTION:\n{description}\n\nExecute this task and report results."
    output, success = execute_claude(prompt)

    # Aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12.
    # The formatted text is unchanged.
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    feedback = f"{ts}\n💻 {HOSTNAME}\n{'🟢' if success else '🔴'} {'Done' if success else 'Failed'}\n\n{output}"
    outcome = "completed" if success else "failed"

    # The submit result used to be ignored, so a lost report was still
    # logged as if it had reached the server.
    if submit_result(task_id, feedback, outcome):
        log.info(f"Task #{task_id} {outcome}")
    else:
        log.warning(f"Task #{task_id} {outcome} but result submission failed")
def main():
    """Run the worker loop until a shutdown signal clears ``running``.

    Each cycle sends a heartbeat, fetches the eligible tasks, processes them
    one by one and then waits ``POLL_INTERVAL`` seconds. Exits with status 1
    when ``COLLAB_TOKEN`` is not configured.
    """
    log.info(f"Collab Worker started: {HOSTNAME}, polling every {POLL_INTERVAL}s")
    if not TOKEN:
        log.error("COLLAB_TOKEN not set!")
        sys.exit(1)

    while running:
        heartbeat()
        for task in get_tasks():
            if not running:
                break
            try:
                process_task(task)
            except Exception:
                # One malformed task (e.g. a payload without "id") must not
                # take the whole worker down; log it and move on.
                log.exception("Unhandled error while processing task")

        # Sleep in one-second slices so a shutdown signal is noticed quickly.
        for _ in range(POLL_INTERVAL):
            if not running:
                break
            time.sleep(1)
# Script entry point: start the worker only when executed directly.
if __name__ == "__main__":
    main()