#!/usr/bin/env python3 """ Task Team Collab Worker — HTTP-based agent that polls mngmt for tasks, executes them with Claude CLI, reports results back. """ import os import sys import time import json import subprocess import logging import signal from datetime import datetime import requests # Config MNGMT_URL = os.environ.get("COLLAB_URL", "https://mngmt.it-enterprise.pro") TOKEN = os.environ.get("COLLAB_TOKEN", "") HOSTNAME = os.environ.get("HOSTNAME", "unknown") CLAUDE_BIN = os.environ.get("CLAUDE_BIN", "/usr/bin/claude") WORK_DIR = os.environ.get("WORK_DIR", "/opt/task-team") POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30")) API = f"{MNGMT_URL}/api/v2/collab" HEADERS = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"} logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") log = logging.getLogger("collab-worker") running = True def stop(sig, frame): global running running = False signal.signal(signal.SIGTERM, stop) signal.signal(signal.SIGINT, stop) def heartbeat(): try: r = requests.post(f"{API}/heartbeat", headers=HEADERS, json={"node": HOSTNAME, "server": "taskteam"}, timeout=10) return r.status_code == 200 except Exception as e: log.warning(f"Heartbeat failed: {e}") return False def get_tasks(): try: r = requests.get(f"{API}/tasks", headers=HEADERS, timeout=10) if r.status_code == 200: data = r.json() return [t for t in data.get("data", []) if t.get("status") == "open" and (t.get("assignee") == HOSTNAME or t.get("assignee") is None)] return [] except Exception as e: log.warning(f"Get tasks failed: {e}") return [] def claim_task(task_id): try: r = requests.post(f"{API}/tasks/{task_id}/claim", headers=HEADERS, timeout=10) return r.status_code == 200 except: return False def submit_result(task_id, result, status="completed"): try: r = requests.post(f"{API}/tasks/{task_id}/submit", headers=HEADERS, json={"result": result, "status": status}, timeout=30) return r.status_code == 200 except Exception as e: log.warning(f"Submit failed: {e}") return False def execute_claude(prompt, work_dir=WORK_DIR, max_turns=10): """Execute Claude CLI and return output.""" cmd = [CLAUDE_BIN, "--dangerously-skip-permissions", "-p", prompt, "--max-turns", str(max_turns)] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=600, cwd=work_dir, errors='replace') output = result.stdout[-10000:] if len(result.stdout) > 10000 else result.stdout if result.returncode != 0 and result.stderr: output += f"\n[STDERR]: {result.stderr[-2000:]}" return output, result.returncode == 0 except subprocess.TimeoutExpired: return "[ERROR] Claude CLI timed out after 600s", False except Exception as e: return f"[ERROR] {str(e)}", False def process_task(task): task_id = task["id"] title = task.get("title", "") description = task.get("description", "") log.info(f"Processing task #{task_id}: {title}") if not claim_task(task_id): log.warning(f"Failed to claim task #{task_id}") return prompt = f"TASK: {title}\n\nDESCRIPTION:\n{description}\n\nExecute this task and report results." output, success = execute_claude(prompt) ts = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC") feedback = f"ā° {ts}\nšŸ’» {HOSTNAME}\n{'🟢' if success else 'šŸ”“'} {'Done' if success else 'Failed'}\n\n{output}" submit_result(task_id, feedback, "completed" if success else "failed") log.info(f"Task #{task_id} {'completed' if success else 'failed'}") def main(): log.info(f"Collab Worker started: {HOSTNAME}, polling every {POLL_INTERVAL}s") if not TOKEN: log.error("COLLAB_TOKEN not set!") sys.exit(1) while running: heartbeat() tasks = get_tasks() for task in tasks: if not running: break process_task(task) for _ in range(POLL_INTERVAL): if not running: break time.sleep(1) if __name__ == "__main__": main()