# Task Team Connector — project.task extension import logging from datetime import datetime from odoo import api, fields, models _logger = logging.getLogger(__name__) # Fields whose changes should trigger an outbound sync _SYNC_FIELDS = frozenset({ 'name', 'description', 'stage_id', 'date_deadline', 'priority', 'user_ids', 'project_id', 'tag_ids', }) class ProjectTask(models.Model): _inherit = 'project.task' tt_task_id = fields.Char( string='Task Team ID', readonly=True, copy=False, index=True, help='UUID of the corresponding record in Task Team', ) tt_sync_enabled = fields.Boolean( string='Sync with Task Team', default=True, ) tt_last_sync_at = fields.Datetime( string='Last Synced', readonly=True, copy=False, ) tt_sync_error = fields.Char( string='Sync Error', readonly=True, copy=False, ) # ------------------------------------------------------------------ # ORM overrides # ------------------------------------------------------------------ @api.model_create_multi def create(self, vals_list): tasks = super().create(vals_list) if not self.env.context.get('tt_no_sync'): for task in tasks: if task.tt_sync_enabled: task._tt_push() return tasks def write(self, vals): stage_changing = 'stage_id' in vals old_stages = {t.id: t.stage_id for t in self} if stage_changing else {} result = super().write(vals) if self.env.context.get('tt_no_sync'): return result for task in self: if not task.tt_sync_enabled: continue # Fire completion webhook when task enters a closed stage if stage_changing: old = old_stages.get(task.id) new = task.stage_id was_open = not old or not getattr(old, 'is_closed', False) is_now_closed = new and getattr(new, 'is_closed', False) if was_open and is_now_closed: task._tt_send_completion_webhook() # Push all tracked field changes if _SYNC_FIELDS.intersection(vals): task._tt_push() return result # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _tt_api_client(self): from ..services.api_client import TaskTeamApiClient ICP = self.env['ir.config_parameter'].sudo() return TaskTeamApiClient( base_url=ICP.get_param('task_team_connector.api_url', 'https://api.hasdo.info/api/v1'), api_key=ICP.get_param('task_team_connector.api_key', ''), ) def _tt_build_payload(self): """Serialize this task for the Task Team API.""" stage = self.stage_id is_closed = getattr(stage, 'is_closed', False) if stage else False if is_closed: status = 'done' elif stage and getattr(stage, 'sequence', 0) > 1: status = 'in_progress' else: status = 'pending' priority_map = {'0': 'medium', '1': 'high'} return { 'title': self.name, 'description': self.description or '', 'status': status, 'priority': priority_map.get(self.priority, 'medium'), 'due_at': self.date_deadline.isoformat() if self.date_deadline else None, 'external_id': str(self.id), 'external_source': 'odoo', } def _tt_push(self): """Create or update this task in Task Team (best-effort).""" self.ensure_one() try: client = self._tt_api_client() payload = self._tt_build_payload() tt_user_email = self.env.user.email if self.tt_task_id: client.update_task(self.tt_task_id, payload, tt_user_email) else: result = client.create_task(payload, tt_user_email) new_id = (result or {}).get('data', {}).get('id') if new_id: self.sudo().with_context(tt_no_sync=True).write({ 'tt_task_id': new_id, 'tt_last_sync_at': datetime.utcnow(), 'tt_sync_error': False, }) self.sudo().with_context(tt_no_sync=True).write({ 'tt_last_sync_at': datetime.utcnow(), 'tt_sync_error': False, }) self.env['task.team.sync.log'].sudo().create({ 'task_id': self.id, 'tt_task_id': self.tt_task_id, 'direction': 'outbound', 'status': 'success', 'payload': str(payload), }) except Exception as exc: _logger.error('Task Team push failed for task %s: %s', self.id, exc) self.sudo().with_context(tt_no_sync=True).write({'tt_sync_error': str(exc)[:255]}) self.env['task.team.sync.log'].sudo().create({ 'task_id': self.id, 'direction': 'outbound', 'status': 'error', 'error_message': str(exc), }) def _tt_send_completion_webhook(self): """Notify Task Team that this task was completed.""" self.ensure_one() try: client = self._tt_api_client() client.send_webhook('task_completed', { 'odoo_task_id': self.id, 'tt_task_id': self.tt_task_id, 'task_name': self.name, 'completed_at': datetime.utcnow().isoformat() + 'Z', 'completed_by_email': self.env.user.email, 'project_name': self.project_id.name if self.project_id else None, }) except Exception as exc: _logger.error('Completion webhook failed for task %s: %s', self.id, exc) # ------------------------------------------------------------------ # Public actions # ------------------------------------------------------------------ def action_tt_sync_now(self): """Button: sync selected tasks immediately.""" for task in self: task._tt_push() return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': 'Task Team', 'message': f'{len(self)} task(s) pushed to Task Team.', 'sticky': False, 'type': 'success', }, } # ------------------------------------------------------------------ # Inbound (webhook → Odoo) # ------------------------------------------------------------------ @api.model def tt_upsert_from_webhook(self, data): """Create or update an Odoo task from a Task Team webhook payload.""" tt_task_id = data.get('id') existing = self.search([('tt_task_id', '=', tt_task_id)], limit=1) if tt_task_id else self.browse() ICP = self.env['ir.config_parameter'].sudo() project_param = ICP.get_param('task_team_connector.default_project_id') project_id = int(project_param) if project_param else None vals = { 'name': data.get('title') or 'Task from Task Team', 'description': data.get('description', ''), 'tt_task_id': tt_task_id, 'tt_sync_enabled': True, } if project_id: vals['project_id'] = project_id if data.get('due_at'): try: vals['date_deadline'] = datetime.fromisoformat( data['due_at'].replace('Z', '+00:00') ) except Exception: pass if existing: existing.with_context(tt_no_sync=True).write(vals) task = existing else: task = self.with_context(tt_no_sync=True).create(vals) self.env['task.team.sync.log'].sudo().create({ 'task_id': task.id, 'tt_task_id': tt_task_id, 'direction': 'inbound', 'status': 'success', 'payload': str(data), }) return task # ------------------------------------------------------------------ # Cron # ------------------------------------------------------------------ @api.model def cron_tt_sync_all(self): """Scheduled action: push all sync-enabled tasks to Task Team.""" ICP = self.env['ir.config_parameter'].sudo() if ICP.get_param('task_team_connector.sync_enabled', 'True') != 'True': return tasks = self.search([('tt_sync_enabled', '=', True)]) _logger.info('Task Team cron: syncing %d tasks', len(tasks)) for task in tasks: task._tt_push()