from __future__ import annotations import io import json import unittest import zipfile from unittest.mock import patch from app.automation_service import import_workflows_zip, safe_zip_name, workflow_export_payload def workflow_payload(workflow_key: str, name: str = "测试工作流") -> dict: return { "schema_version": "workflow/v1", "workflow_key": workflow_key, "name": name, "description": "用于 ZIP 导入测试", "variables": {}, "settings": {}, "nodes": [], "edges": [], } def zip_bytes(items: dict[str, object]) -> bytes: buffer = io.BytesIO() with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive: for name, payload in items.items(): content = payload if isinstance(payload, str) else json.dumps(payload, ensure_ascii=False) archive.writestr(name, content) return buffer.getvalue() class WorkflowZipTest(unittest.TestCase): def test_import_zip_skips_duplicate_key_and_keeps_processing(self) -> None: content = zip_bytes( { "workflows/new.workflow.json": workflow_payload("zip-new"), "workflows/existing.workflow.json": workflow_payload("zip-existing"), "workflows/broken.workflow.json": "{bad json", "manifest.json": {"schema_version": "workflow-zip/v1"}, } ) with ( patch("app.automation_service.workflow_key_exists", side_effect=lambda key: key == "zip-existing"), patch("app.automation_service.save_workflow", return_value={"id": 99, "workflow_key": "zip-new", "name": "测试工作流"}), ): result = import_workflows_zip(content) self.assertEqual(result["created_count"], 1) self.assertEqual(result["skipped_count"], 1) self.assertEqual(result["failed_count"], 1) self.assertEqual(result["skipped"][0]["workflow_key"], "zip-existing") def test_export_payload_removes_database_fields(self) -> None: payload = workflow_payload("zip-export") exported = workflow_export_payload({**payload, "id": 1, "created_at": "x", "updated_at": "y"}) self.assertEqual(exported, payload) self.assertEqual(safe_zip_name("a/b:c*中文"), "a-b-c") if __name__ == "__main__": unittest.main()