- Fastify API on :3000 (tasks, groups, auth, connectors)
- PostgreSQL schema: users, tasks, task_groups, goals, connectors
- 8 default task groups
- JWT auth (register/login/me)
- API Bridge framework with webhooks
- CLAUDE.md + polling scripts
- Collab worker integration

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
138 lines
4.3 KiB
Python
138 lines
4.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
Task Team Collab Worker — HTTP-based agent that polls mngmt for tasks,
executes them with the Claude CLI, and reports the results back.
"""
|
|
import os
|
|
import sys
|
|
import time
|
|
import json
|
|
import subprocess
|
|
import logging
|
|
import signal
|
|
from datetime import datetime
|
|
|
|
import requests
|
|
|
|
# Configuration — every value can be overridden through the environment.
MNGMT_URL = os.getenv("COLLAB_URL", "https://mngmt.it-enterprise.pro")
TOKEN = os.getenv("COLLAB_TOKEN", "")
HOSTNAME = os.getenv("HOSTNAME", "unknown")
CLAUDE_BIN = os.getenv("CLAUDE_BIN", "/usr/bin/claude")
WORK_DIR = os.getenv("WORK_DIR", "/opt/task-team")
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "30"))  # seconds between polls

# Base endpoint and the headers sent with every API request.
API = MNGMT_URL + "/api/v2/collab"
HEADERS = {
    "Authorization": "Bearer " + TOKEN,
    "Content-Type": "application/json",
}

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("collab-worker")
|
|
|
|
# Shutdown flag: stays True until a termination signal arrives.
running = True


def stop(sig, frame):
    """Signal handler that requests shutdown by clearing ``running``.

    Both arguments are supplied by the ``signal`` machinery and are unused.
    """
    global running
    running = False


# Route both termination signals to the same handler.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, stop)
|
|
|
|
|
|
def heartbeat():
    """Post a heartbeat for this node to the collab API.

    Returns:
        True when the server answers HTTP 200; False for any other status
        or when the request raises (the error is logged as a warning).
    """
    payload = {"node": HOSTNAME, "server": "taskteam"}
    try:
        response = requests.post(
            f"{API}/heartbeat", headers=HEADERS, json=payload, timeout=10
        )
    except Exception as exc:
        log.warning(f"Heartbeat failed: {exc}")
        return False
    return response.status_code == 200
|
|
|
|
|
|
def get_tasks():
    """Fetch the tasks this worker may pick up.

    Returns:
        The entries of the response's ``"data"`` list whose status is
        ``"open"`` and whose assignee is either this HOSTNAME or None.
        An empty list is returned for a non-200 response or when anything
        in the request/parsing raises (the error is logged as a warning).
    """
    try:
        response = requests.get(f"{API}/tasks", headers=HEADERS, timeout=10)
        if response.status_code != 200:
            return []

        claimable = []
        for item in response.json().get("data", []):
            if item.get("status") != "open":
                continue
            owner = item.get("assignee")
            # Unassigned tasks are fair game; assigned ones only if they are ours.
            if owner is None or owner == HOSTNAME:
                claimable.append(item)
        return claimable
    except Exception as exc:
        log.warning(f"Get tasks failed: {exc}")
        return []
|
|
|
|
|
|
def claim_task(task_id):
    """Try to claim a task for this worker.

    Args:
        task_id: Identifier of the task; interpolated into the claim URL.

    Returns:
        True when the server answers HTTP 200; False for any other status
        or when the request raises (the error is logged as a warning).
    """
    try:
        r = requests.post(f"{API}/tasks/{task_id}/claim", headers=HEADERS, timeout=10)
        return r.status_code == 200
    except Exception as e:
        # Catch Exception rather than using a bare ``except:`` so that
        # SystemExit/KeyboardInterrupt still propagate, and record why the
        # claim failed instead of dropping the error silently.
        log.warning(f"Claim failed for task #{task_id}: {e}")
        return False
|
|
|
|
|
|
def submit_result(task_id, result, status="completed"):
    """Send the outcome of a task back to the collab API.

    Args:
        task_id: Identifier of the task; interpolated into the submit URL.
        result: Text stored under the ``"result"`` key of the JSON body.
        status: Value stored under the ``"status"`` key of the JSON body.

    Returns:
        True when the server answers HTTP 200; False for any other status
        or when the request raises (the error is logged as a warning).
    """
    body = {"result": result, "status": status}
    try:
        reply = requests.post(
            f"{API}/tasks/{task_id}/submit",
            headers=HEADERS,
            json=body,
            timeout=30,
        )
    except Exception as exc:
        log.warning(f"Submit failed: {exc}")
        return False
    return reply.status_code == 200
|
|
|
|
|
|
def execute_claude(prompt, work_dir=WORK_DIR, max_turns=10, timeout=600):
    """Run the Claude CLI on *prompt* and collect its output.

    Args:
        prompt: Text passed to the CLI via ``-p``.
        work_dir: Directory the CLI process is started in.
        max_turns: Value forwarded to ``--max-turns``.
        timeout: Seconds to wait for the CLI before giving up.

    Returns:
        A ``(output, success)`` tuple. *output* is the last 10000 characters
        of stdout, followed by the last 2000 characters of stderr when the
        exit code is non-zero and stderr is not empty. *success* is True only
        for exit code 0. A timeout or any other failure to run the CLI
        yields an ``[ERROR] ...`` message and False.
    """
    # NOTE(review): the flag below presumably disables the CLI's permission
    # prompts, and callers in this file build the prompt from task text
    # fetched over the API — confirm that only trusted parties can create tasks.
    cmd = [CLAUDE_BIN, "--dangerously-skip-permissions",
           "-p", prompt, "--max-turns", str(max_turns)]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True,
                                timeout=timeout, cwd=work_dir, errors="replace")
    except subprocess.TimeoutExpired:
        return f"[ERROR] Claude CLI timed out after {timeout}s", False
    except Exception as e:
        return f"[ERROR] {str(e)}", False

    # A negative slice already returns the whole string when it is shorter
    # than the limit, so no explicit length check is needed.
    output = result.stdout[-10000:]
    if result.returncode != 0 and result.stderr:
        output += f"\n[STDERR]: {result.stderr[-2000:]}"
    return output, result.returncode == 0
|
|
|
|
|
|
def process_task(task):
    """Claim one task, run it through the Claude CLI and report the outcome.

    Args:
        task: Task dict; must contain ``"id"`` and may contain ``"title"``
            and ``"description"``.

    Returns:
        None. Returns early, after logging a warning, when the task cannot
        be claimed.

    Raises:
        KeyError: If *task* has no ``"id"`` key.
    """
    # Local import: the module only imports the ``datetime`` class itself.
    from datetime import timezone

    task_id = task["id"]
    # ``or ""`` also covers keys that are present but null, which would
    # otherwise show up as the literal text "None" in the prompt.
    title = task.get("title") or ""
    description = task.get("description") or ""
    log.info(f"Processing task #{task_id}: {title}")

    if not claim_task(task_id):
        log.warning(f"Failed to claim task #{task_id}")
        return

    prompt = f"TASK: {title}\n\nDESCRIPTION:\n{description}\n\nExecute this task and report results."
    output, success = execute_claude(prompt)

    # Timezone-aware replacement for the deprecated datetime.utcnow();
    # the rendered timestamp is unchanged.
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    outcome = "completed" if success else "failed"
    feedback = f"⏰ {ts}\n💻 {HOSTNAME}\n{'🟢' if success else '🔴'} {'Done' if success else 'Failed'}\n\n{output}"

    # Only report the task as finished when the server accepted the result;
    # otherwise the log would claim success for a report that was lost.
    if submit_result(task_id, feedback, outcome):
        log.info(f"Task #{task_id} {outcome}")
    else:
        log.warning(f"Task #{task_id} {outcome}, but the result could not be submitted")
|
|
|
|
|
|
def main():
    """Run the worker loop until the ``running`` flag is cleared.

    Each cycle sends a heartbeat, fetches the available tasks, processes
    them one by one and then waits POLL_INTERVAL seconds. Exits with
    status 1 when COLLAB_TOKEN is not set.
    """
    log.info(f"Collab Worker started: {HOSTNAME}, polling every {POLL_INTERVAL}s")
    if not TOKEN:
        log.error("COLLAB_TOKEN not set!")
        sys.exit(1)

    while running:
        heartbeat()
        for task in get_tasks():
            if not running:
                break
            try:
                process_task(task)
            except Exception:
                # A single malformed or failing task must not take the whole
                # long-running worker down; log it and move on to the next.
                log.exception("Unhandled error while processing task: %r", task)

        # Sleep in one-second slices so a stop signal is honoured promptly.
        for _ in range(POLL_INTERVAL):
            if not running:
                break
            time.sleep(1)


if __name__ == "__main__":
    main()
|