Initial: Fastify API + DB schema + collab setup
- Fastify API on :3000 (tasks, groups, auth, connectors) - PostgreSQL schema: users, tasks, task_groups, goals, connectors - 8 default task groups - JWT auth (register/login/me) - API Bridge framework with webhooks - CLAUDE.md + polling scripts - Collab worker integration Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
137
collab-worker.py
Normal file
137
collab-worker.py
Normal file
@@ -0,0 +1,137 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Task Team Collab Worker — HTTP-based agent that polls mngmt for tasks,
|
||||
executes them with Claude CLI, reports results back.
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import json
|
||||
import subprocess
|
||||
import logging
|
||||
import signal
|
||||
from datetime import datetime
|
||||
|
||||
import requests
|
||||
|
||||
# Config
|
||||
MNGMT_URL = os.environ.get("COLLAB_URL", "https://mngmt.it-enterprise.pro")
|
||||
TOKEN = os.environ.get("COLLAB_TOKEN", "")
|
||||
HOSTNAME = os.environ.get("HOSTNAME", "unknown")
|
||||
CLAUDE_BIN = os.environ.get("CLAUDE_BIN", "/usr/bin/claude")
|
||||
WORK_DIR = os.environ.get("WORK_DIR", "/opt/task-team")
|
||||
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30"))
|
||||
|
||||
API = f"{MNGMT_URL}/api/v2/collab"
|
||||
HEADERS = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
|
||||
log = logging.getLogger("collab-worker")
|
||||
|
||||
running = True
|
||||
def stop(sig, frame):
|
||||
global running
|
||||
running = False
|
||||
signal.signal(signal.SIGTERM, stop)
|
||||
signal.signal(signal.SIGINT, stop)
|
||||
|
||||
|
||||
def heartbeat():
|
||||
try:
|
||||
r = requests.post(f"{API}/heartbeat", headers=HEADERS,
|
||||
json={"node": HOSTNAME, "server": "taskteam"}, timeout=10)
|
||||
return r.status_code == 200
|
||||
except Exception as e:
|
||||
log.warning(f"Heartbeat failed: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def get_tasks():
|
||||
try:
|
||||
r = requests.get(f"{API}/tasks", headers=HEADERS, timeout=10)
|
||||
if r.status_code == 200:
|
||||
data = r.json()
|
||||
return [t for t in data.get("data", []) if t.get("status") == "open" and
|
||||
(t.get("assignee") == HOSTNAME or t.get("assignee") is None)]
|
||||
return []
|
||||
except Exception as e:
|
||||
log.warning(f"Get tasks failed: {e}")
|
||||
return []
|
||||
|
||||
|
||||
def claim_task(task_id):
|
||||
try:
|
||||
r = requests.post(f"{API}/tasks/{task_id}/claim", headers=HEADERS, timeout=10)
|
||||
return r.status_code == 200
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
def submit_result(task_id, result, status="completed"):
|
||||
try:
|
||||
r = requests.post(f"{API}/tasks/{task_id}/submit", headers=HEADERS,
|
||||
json={"result": result, "status": status}, timeout=30)
|
||||
return r.status_code == 200
|
||||
except Exception as e:
|
||||
log.warning(f"Submit failed: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def execute_claude(prompt, work_dir=WORK_DIR, max_turns=10):
|
||||
"""Execute Claude CLI and return output."""
|
||||
cmd = [CLAUDE_BIN, "--dangerously-skip-permissions",
|
||||
"-p", prompt, "--max-turns", str(max_turns)]
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True,
|
||||
timeout=600, cwd=work_dir, errors='replace')
|
||||
output = result.stdout[-10000:] if len(result.stdout) > 10000 else result.stdout
|
||||
if result.returncode != 0 and result.stderr:
|
||||
output += f"\n[STDERR]: {result.stderr[-2000:]}"
|
||||
return output, result.returncode == 0
|
||||
except subprocess.TimeoutExpired:
|
||||
return "[ERROR] Claude CLI timed out after 600s", False
|
||||
except Exception as e:
|
||||
return f"[ERROR] {str(e)}", False
|
||||
|
||||
|
||||
def process_task(task):
|
||||
task_id = task["id"]
|
||||
title = task.get("title", "")
|
||||
description = task.get("description", "")
|
||||
log.info(f"Processing task #{task_id}: {title}")
|
||||
|
||||
if not claim_task(task_id):
|
||||
log.warning(f"Failed to claim task #{task_id}")
|
||||
return
|
||||
|
||||
prompt = f"TASK: {title}\n\nDESCRIPTION:\n{description}\n\nExecute this task and report results."
|
||||
output, success = execute_claude(prompt)
|
||||
|
||||
ts = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC")
|
||||
feedback = f"⏰ {ts}\n💻 {HOSTNAME}\n{'🟢' if success else '🔴'} {'Done' if success else 'Failed'}\n\n{output}"
|
||||
|
||||
submit_result(task_id, feedback, "completed" if success else "failed")
|
||||
log.info(f"Task #{task_id} {'completed' if success else 'failed'}")
|
||||
|
||||
|
||||
def main():
|
||||
log.info(f"Collab Worker started: {HOSTNAME}, polling every {POLL_INTERVAL}s")
|
||||
if not TOKEN:
|
||||
log.error("COLLAB_TOKEN not set!")
|
||||
sys.exit(1)
|
||||
|
||||
while running:
|
||||
heartbeat()
|
||||
tasks = get_tasks()
|
||||
for task in tasks:
|
||||
if not running:
|
||||
break
|
||||
process_task(task)
|
||||
for _ in range(POLL_INTERVAL):
|
||||
if not running:
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user