Files
task-team/odoo_modules/task_team_connector/models/project_task.py
Claude CLI Agent 6c93ae04d0 Fix login redirect — access result.data.token/user
Login/register pages now correctly unwrap API response {data: {token, user}}.
Cleaned up leftover apiFetch code.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 10:30:05 +00:00

249 lines
8.8 KiB
Python

# Task Team Connector — project.task extension
import logging
from datetime import datetime
from odoo import api, fields, models
_logger = logging.getLogger(__name__)
# Fields whose changes should trigger an outbound sync
_SYNC_FIELDS = frozenset({
'name', 'description', 'stage_id', 'date_deadline',
'priority', 'user_ids', 'project_id', 'tag_ids',
})
class ProjectTask(models.Model):
_inherit = 'project.task'
tt_task_id = fields.Char(
string='Task Team ID',
readonly=True,
copy=False,
index=True,
help='UUID of the corresponding record in Task Team',
)
tt_sync_enabled = fields.Boolean(
string='Sync with Task Team',
default=True,
)
tt_last_sync_at = fields.Datetime(
string='Last Synced',
readonly=True,
copy=False,
)
tt_sync_error = fields.Char(
string='Sync Error',
readonly=True,
copy=False,
)
# ------------------------------------------------------------------
# ORM overrides
# ------------------------------------------------------------------
@api.model_create_multi
def create(self, vals_list):
tasks = super().create(vals_list)
if not self.env.context.get('tt_no_sync'):
for task in tasks:
if task.tt_sync_enabled:
task._tt_push()
return tasks
def write(self, vals):
stage_changing = 'stage_id' in vals
old_stages = {t.id: t.stage_id for t in self} if stage_changing else {}
result = super().write(vals)
if self.env.context.get('tt_no_sync'):
return result
for task in self:
if not task.tt_sync_enabled:
continue
# Fire completion webhook when task enters a closed stage
if stage_changing:
old = old_stages.get(task.id)
new = task.stage_id
was_open = not old or not getattr(old, 'is_closed', False)
is_now_closed = new and getattr(new, 'is_closed', False)
if was_open and is_now_closed:
task._tt_send_completion_webhook()
# Push all tracked field changes
if _SYNC_FIELDS.intersection(vals):
task._tt_push()
return result
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _tt_api_client(self):
from ..services.api_client import TaskTeamApiClient
ICP = self.env['ir.config_parameter'].sudo()
return TaskTeamApiClient(
base_url=ICP.get_param('task_team_connector.api_url', 'https://api.hasdo.info/api/v1'),
api_key=ICP.get_param('task_team_connector.api_key', ''),
)
def _tt_build_payload(self):
"""Serialize this task for the Task Team API."""
stage = self.stage_id
is_closed = getattr(stage, 'is_closed', False) if stage else False
if is_closed:
status = 'done'
elif stage and getattr(stage, 'sequence', 0) > 1:
status = 'in_progress'
else:
status = 'pending'
priority_map = {'0': 'medium', '1': 'high'}
return {
'title': self.name,
'description': self.description or '',
'status': status,
'priority': priority_map.get(self.priority, 'medium'),
'due_at': self.date_deadline.isoformat() if self.date_deadline else None,
'external_id': str(self.id),
'external_source': 'odoo',
}
def _tt_push(self):
"""Create or update this task in Task Team (best-effort)."""
self.ensure_one()
try:
client = self._tt_api_client()
payload = self._tt_build_payload()
tt_user_email = self.env.user.email
if self.tt_task_id:
client.update_task(self.tt_task_id, payload, tt_user_email)
else:
result = client.create_task(payload, tt_user_email)
new_id = (result or {}).get('data', {}).get('id')
if new_id:
self.sudo().with_context(tt_no_sync=True).write({
'tt_task_id': new_id,
'tt_last_sync_at': datetime.utcnow(),
'tt_sync_error': False,
})
self.sudo().with_context(tt_no_sync=True).write({
'tt_last_sync_at': datetime.utcnow(),
'tt_sync_error': False,
})
self.env['task.team.sync.log'].sudo().create({
'task_id': self.id,
'tt_task_id': self.tt_task_id,
'direction': 'outbound',
'status': 'success',
'payload': str(payload),
})
except Exception as exc:
_logger.error('Task Team push failed for task %s: %s', self.id, exc)
self.sudo().with_context(tt_no_sync=True).write({'tt_sync_error': str(exc)[:255]})
self.env['task.team.sync.log'].sudo().create({
'task_id': self.id,
'direction': 'outbound',
'status': 'error',
'error_message': str(exc),
})
def _tt_send_completion_webhook(self):
"""Notify Task Team that this task was completed."""
self.ensure_one()
try:
client = self._tt_api_client()
client.send_webhook('task_completed', {
'odoo_task_id': self.id,
'tt_task_id': self.tt_task_id,
'task_name': self.name,
'completed_at': datetime.utcnow().isoformat() + 'Z',
'completed_by_email': self.env.user.email,
'project_name': self.project_id.name if self.project_id else None,
})
except Exception as exc:
_logger.error('Completion webhook failed for task %s: %s', self.id, exc)
# ------------------------------------------------------------------
# Public actions
# ------------------------------------------------------------------
def action_tt_sync_now(self):
"""Button: sync selected tasks immediately."""
for task in self:
task._tt_push()
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': 'Task Team',
'message': f'{len(self)} task(s) pushed to Task Team.',
'sticky': False,
'type': 'success',
},
}
# ------------------------------------------------------------------
# Inbound (webhook → Odoo)
# ------------------------------------------------------------------
@api.model
def tt_upsert_from_webhook(self, data):
"""Create or update an Odoo task from a Task Team webhook payload."""
tt_task_id = data.get('id')
existing = self.search([('tt_task_id', '=', tt_task_id)], limit=1) if tt_task_id else self.browse()
ICP = self.env['ir.config_parameter'].sudo()
project_param = ICP.get_param('task_team_connector.default_project_id')
project_id = int(project_param) if project_param else None
vals = {
'name': data.get('title') or 'Task from Task Team',
'description': data.get('description', ''),
'tt_task_id': tt_task_id,
'tt_sync_enabled': True,
}
if project_id:
vals['project_id'] = project_id
if data.get('due_at'):
try:
vals['date_deadline'] = datetime.fromisoformat(
data['due_at'].replace('Z', '+00:00')
)
except Exception:
pass
if existing:
existing.with_context(tt_no_sync=True).write(vals)
task = existing
else:
task = self.with_context(tt_no_sync=True).create(vals)
self.env['task.team.sync.log'].sudo().create({
'task_id': task.id,
'tt_task_id': tt_task_id,
'direction': 'inbound',
'status': 'success',
'payload': str(data),
})
return task
# ------------------------------------------------------------------
# Cron
# ------------------------------------------------------------------
@api.model
def cron_tt_sync_all(self):
"""Scheduled action: push all sync-enabled tasks to Task Team."""
ICP = self.env['ir.config_parameter'].sudo()
if ICP.get_param('task_team_connector.sync_enabled', 'True') != 'True':
return
tasks = self.search([('tt_sync_enabled', '=', True)])
_logger.info('Task Team cron: syncing %d tasks', len(tasks))
for task in tasks:
task._tt_push()