diff --git a/apps/api/plane/bgtasks/api_logs_task.py b/apps/api/plane/bgtasks/api_logs_task.py
deleted file mode 100644
index 038b939d5..000000000
--- a/apps/api/plane/bgtasks/api_logs_task.py
+++ /dev/null
@@ -1,15 +0,0 @@
-from django.utils import timezone
-from datetime import timedelta
-from plane.db.models import APIActivityLog
-from celery import shared_task
-
-
-@shared_task
-def delete_api_logs():
-    # Get the logs older than 30 days to delete
-    logs_to_delete = APIActivityLog.objects.filter(
-        created_at__lte=timezone.now() - timedelta(days=30)
-    )
-
-    # Delete the logs
-    logs_to_delete._raw_delete(logs_to_delete.db)
diff --git a/apps/api/plane/bgtasks/cleanup_task.py b/apps/api/plane/bgtasks/cleanup_task.py
new file mode 100644
index 000000000..4c86eaea5
--- /dev/null
+++ b/apps/api/plane/bgtasks/cleanup_task.py
@@ -0,0 +1,423 @@
+# Python imports
+from datetime import timedelta
+import logging
+from typing import List, Dict, Any, Callable, Optional
+import os
+
+# Django imports
+from django.utils import timezone
+from django.db.models import F, Window, Subquery
+from django.db.models.functions import RowNumber
+
+# Third party imports
+from celery import shared_task
+from pymongo.errors import BulkWriteError
+from pymongo.collection import Collection
+from pymongo.operations import InsertOne
+
+# Module imports
+from plane.db.models import (
+    EmailNotificationLog,
+    PageVersion,
+    APIActivityLog,
+    IssueDescriptionVersion,
+)
+from plane.settings.mongo import MongoConnection
+from plane.utils.exception_logger import log_exception
+
+
+logger = logging.getLogger("plane.worker")
+BATCH_SIZE = 1000
+
+
+def get_mongo_collection(collection_name: str) -> Optional[Collection]:
+    """Get the MongoDB collection if available, otherwise return None."""
+    if not MongoConnection.is_configured():
+        logger.info("MongoDB not configured")
+        return None
+
+    try:
+        mongo_collection = MongoConnection.get_collection(collection_name)
+        logger.info(f"MongoDB collection '{collection_name}' connected successfully")
+        return mongo_collection
+    except Exception as e:
+        logger.error(f"Failed to get MongoDB collection: {str(e)}")
+        log_exception(e)
+        return None
+
+
+def flush_to_mongo_and_delete(
+    mongo_collection: Optional[Collection],
+    buffer: List[Dict[str, Any]],
+    ids_to_delete: List[int],
+    model,
+    mongo_available: bool,
+) -> None:
+    """
+    Inserts a batch of records into MongoDB and deletes the corresponding rows from PostgreSQL.
+    """
+    if not buffer:
+        logger.debug("No records to flush - buffer is empty")
+        return
+
+    logger.info(
+        f"Starting batch flush: {len(buffer)} records, {len(ids_to_delete)} IDs to delete"
+    )
+
+    mongo_archival_failed = False
+
+    # Try to insert into MongoDB if available
+    if mongo_collection and mongo_available:
+        try:
+            mongo_collection.bulk_write([InsertOne(doc) for doc in buffer])
+        except BulkWriteError as bwe:
+            logger.error(f"MongoDB bulk write error: {str(bwe)}")
+            log_exception(bwe)
+            mongo_archival_failed = True
+
+    # If MongoDB is available and archival failed, log the error and return
+    if mongo_available and mongo_archival_failed:
+        logger.error(f"MongoDB archival failed for {len(buffer)} records")
+        return
+
+    # Delete from PostgreSQL - delete() returns (count, {model: count})
+    delete_result = model.all_objects.filter(id__in=ids_to_delete).delete()
+    deleted_count = (
+        delete_result[0] if delete_result and isinstance(delete_result, tuple) else 0
+    )
+    logger.info(f"Batch flush completed: {deleted_count} records deleted")
+
+
+def process_cleanup_task(
+    queryset_func: Callable,
+    transform_func: Callable[[Dict], Dict],
+    model,
+    task_name: str,
+    collection_name: str,
+):
+    """
+    Generic function to process cleanup tasks.
+
+    Args:
+        queryset_func: Function that returns the queryset to process
+        transform_func: Function to transform each record for MongoDB
+        model: Django model class
+        task_name: Name of the task for logging
+        collection_name: MongoDB collection name
+    """
+    logger.info(f"Starting {task_name} cleanup task")
+
+    # Get MongoDB collection
+    mongo_collection = get_mongo_collection(collection_name)
+    mongo_available = mongo_collection is not None
+
+    # Get queryset
+    queryset = queryset_func()
+
+    # Process records in batches
+    buffer: List[Dict[str, Any]] = []
+    ids_to_delete: List[int] = []
+    total_processed = 0
+    total_batches = 0
+
+    for record in queryset:
+        # Transform record for MongoDB
+        buffer.append(transform_func(record))
+        ids_to_delete.append(record["id"])
+
+        # Flush batch when it reaches BATCH_SIZE
+        if len(buffer) >= BATCH_SIZE:
+            total_batches += 1
+            flush_to_mongo_and_delete(
+                mongo_collection=mongo_collection,
+                buffer=buffer,
+                ids_to_delete=ids_to_delete,
+                model=model,
+                mongo_available=mongo_available,
+            )
+            total_processed += len(buffer)
+            buffer.clear()
+            ids_to_delete.clear()
+
+    # Process final batch if any records remain
+    if buffer:
+        total_batches += 1
+        flush_to_mongo_and_delete(
+            mongo_collection=mongo_collection,
+            buffer=buffer,
+            ids_to_delete=ids_to_delete,
+            model=model,
+            mongo_available=mongo_available,
+        )
+        total_processed += len(buffer)
+
+    logger.info(
+        f"{task_name} cleanup task completed",
+        extra={
+            "total_records_processed": total_processed,
+            "total_batches": total_batches,
+            "mongo_available": mongo_available,
+            "collection_name": collection_name,
+        },
+    )
+
+
+# Transform functions for each model
+def transform_api_log(record: Dict) -> Dict:
+    """Transform API activity log record."""
+    return {
+        "id": record["id"],
+        "created_at": str(record["created_at"]) if record.get("created_at") else None,
+        "token_identifier": record["token_identifier"],
+        "path": record["path"],
+        "method": record["method"],
+        "query_params": record.get("query_params"),
+        "headers": record.get("headers"),
+        "body": record.get("body"),
+        "response_code": record["response_code"],
+        "response_body": record["response_body"],
+        "ip_address": record["ip_address"],
+        "user_agent": record["user_agent"],
+        "created_by_id": record["created_by_id"],
+    }
+
+
+def transform_email_log(record: Dict) -> Dict:
+    """Transform email notification log record."""
+    return {
+        "id": record["id"],
+        "created_at": str(record["created_at"]) if record.get("created_at") else None,
+        "receiver_id": record["receiver_id"],
+        "triggered_by_id": record["triggered_by_id"],
+        "entity_identifier": record["entity_identifier"],
+        "entity_name": record["entity_name"],
+        "data": record["data"],
+        "processed_at": (
+            str(record["processed_at"]) if record.get("processed_at") else None
+        ),
+        "sent_at": str(record["sent_at"]) if record.get("sent_at") else None,
+        "entity": record["entity"],
+        "old_value": record["old_value"],
+        "new_value": record["new_value"],
+        "created_by_id": record["created_by_id"],
+    }
+
+
+def transform_page_version(record: Dict) -> Dict:
+    """Transform page version record."""
+    return {
+        "id": record["id"],
+        "created_at": str(record["created_at"]) if record.get("created_at") else None,
+        "page_id": record["page_id"],
+        "workspace_id": record["workspace_id"],
+        "owned_by_id": record["owned_by_id"],
+        "description_html": record["description_html"],
+        "description_binary": record["description_binary"],
+        "description_stripped": record["description_stripped"],
+        "description_json": record["description_json"],
+        "sub_pages_data": record["sub_pages_data"],
+        "created_by_id": record["created_by_id"],
+        "updated_by_id": record["updated_by_id"],
+        "deleted_at": str(record["deleted_at"]) if record.get("deleted_at") else None,
+        "last_saved_at": (
+            str(record["last_saved_at"]) if record.get("last_saved_at") else None
+        ),
+    }
+
+
+def transform_issue_description_version(record: Dict) -> Dict:
+    """Transform issue description version record."""
+    return {
+        "id": record["id"],
+        "created_at": str(record["created_at"]) if record.get("created_at") else None,
+        "issue_id": record["issue_id"],
+        "workspace_id": record["workspace_id"],
+        "project_id": record["project_id"],
+        "created_by_id": record["created_by_id"],
+        "updated_by_id": record["updated_by_id"],
+        "owned_by_id": record["owned_by_id"],
+        "last_saved_at": (
+            str(record["last_saved_at"]) if record.get("last_saved_at") else None
+        ),
+        "description_binary": record["description_binary"],
+        "description_html": record["description_html"],
+        "description_stripped": record["description_stripped"],
+        "description_json": record["description_json"],
+        "deleted_at": str(record["deleted_at"]) if record.get("deleted_at") else None,
+    }
+
+
+# Queryset functions for each cleanup task
+def get_api_logs_queryset():
+    """Get API logs older than cutoff days."""
+    cutoff_days = int(os.environ.get("HARD_DELETE_AFTER_DAYS", 30))
+    cutoff_time = timezone.now() - timedelta(days=cutoff_days)
+    logger.info(f"API logs cutoff time: {cutoff_time}")
+
+    return (
+        APIActivityLog.all_objects.filter(created_at__lte=cutoff_time)
+        .values(
+            "id",
+            "created_at",
+            "token_identifier",
+            "path",
+            "method",
+            "query_params",
+            "headers",
+            "body",
+            "response_code",
+            "response_body",
+            "ip_address",
+            "user_agent",
+            "created_by_id",
+        )
+        .iterator(chunk_size=BATCH_SIZE)
+    )
+
+
+def get_email_logs_queryset():
+    """Get email logs older than cutoff days."""
+    cutoff_days = int(os.environ.get("HARD_DELETE_AFTER_DAYS", 30))
+    cutoff_time = timezone.now() - timedelta(days=cutoff_days)
+    logger.info(f"Email logs cutoff time: {cutoff_time}")
+
+    return (
+        EmailNotificationLog.all_objects.filter(sent_at__lte=cutoff_time)
+        .values(
+            "id",
+            "created_at",
+            "receiver_id",
+            "triggered_by_id",
+            "entity_identifier",
+            "entity_name",
+            "data",
+            "processed_at",
+            "sent_at",
+            "entity",
+            "old_value",
+            "new_value",
+            "created_by_id",
+        )
+        .iterator(chunk_size=BATCH_SIZE)
+    )
+
+
+def get_page_versions_queryset():
+    """Get page versions beyond the maximum allowed (20 per page)."""
+    subq = (
+        PageVersion.all_objects.annotate(
+            row_num=Window(
+                expression=RowNumber(),
+                partition_by=[F("page_id")],
+                order_by=F("created_at").desc(),
+            )
+        )
+        .filter(row_num__gt=20)
+        .values("id")
+    )
+
+    return (
+        PageVersion.all_objects.filter(id__in=Subquery(subq))
+        .values(
+            "id",
+            "created_at",
+            "page_id",
+            "workspace_id",
+            "owned_by_id",
+            "description_html",
+            "description_binary",
+            "description_stripped",
+            "description_json",
+            "sub_pages_data",
+            "created_by_id",
+            "updated_by_id",
+            "deleted_at",
+            "last_saved_at",
+        )
+        .iterator(chunk_size=BATCH_SIZE)
+    )
+
+
+def get_issue_description_versions_queryset():
+    """Get issue description versions beyond the maximum allowed (20 per issue)."""
+    subq = (
+        IssueDescriptionVersion.all_objects.annotate(
+            row_num=Window(
+                expression=RowNumber(),
+                partition_by=[F("issue_id")],
+                order_by=F("created_at").desc(),
+            )
+        )
+        .filter(row_num__gt=20)
+        .values("id")
+    )
+
+    return (
+        IssueDescriptionVersion.all_objects.filter(id__in=Subquery(subq))
+        .values(
+            "id",
+            "created_at",
+            "issue_id",
+            "workspace_id",
+            "project_id",
+            "created_by_id",
+            "updated_by_id",
+            "owned_by_id",
+            "last_saved_at",
+            "description_binary",
+            "description_html",
+            "description_stripped",
+            "description_json",
+            "deleted_at",
+        )
+        .iterator(chunk_size=BATCH_SIZE)
+    )
+
+
+# Celery tasks
+@shared_task
+def delete_api_logs():
+    """Delete old API activity logs."""
+    process_cleanup_task(
+        queryset_func=get_api_logs_queryset,
+        transform_func=transform_api_log,
+        model=APIActivityLog,
+        task_name="API Activity Log",
+        collection_name="api_activity_logs",
+    )
+
+
+@shared_task
+def delete_email_notification_logs():
+    """Delete old email notification logs."""
+    process_cleanup_task(
+        queryset_func=get_email_logs_queryset,
+        transform_func=transform_email_log,
+        model=EmailNotificationLog,
+        task_name="Email Notification Log",
+        collection_name="email_notification_logs",
+    )
+
+
+@shared_task
+def delete_page_versions():
+    """Delete excess page versions."""
+    process_cleanup_task(
+        queryset_func=get_page_versions_queryset,
+        transform_func=transform_page_version,
+        model=PageVersion,
+        task_name="Page Version",
+        collection_name="page_versions",
+    )
+
+
+@shared_task
+def delete_issue_description_versions():
+    """Delete excess issue description versions."""
+    process_cleanup_task(
+        queryset_func=get_issue_description_versions_queryset,
+        transform_func=transform_issue_description_version,
+        model=IssueDescriptionVersion,
+        task_name="Issue Description Version",
+        collection_name="issue_description_versions",
+    )
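
# A minimal sketch of how a further cleanup task could be wired on top of
# process_cleanup_task() above. The WebhookLog model, its fields, and the
# collection name are hypothetical, purely illustrative and not part of this
# change; the calling pattern mirrors the four real tasks in the file.
from datetime import timedelta

from celery import shared_task
from django.utils import timezone

from plane.bgtasks.cleanup_task import BATCH_SIZE, process_cleanup_task


def transform_webhook_log(record):
    # Stringify datetimes so the document is BSON-serializable, as the
    # transform functions in this diff do.
    return {
        "id": record["id"],
        "created_at": str(record["created_at"]) if record.get("created_at") else None,
        "payload": record.get("payload"),
    }


def get_webhook_logs_queryset():
    cutoff_time = timezone.now() - timedelta(days=30)
    return (
        WebhookLog.all_objects.filter(created_at__lte=cutoff_time)  # hypothetical model
        .values("id", "created_at", "payload")
        .iterator(chunk_size=BATCH_SIZE)
    )


@shared_task
def delete_webhook_logs():
    process_cleanup_task(
        queryset_func=get_webhook_logs_queryset,
        transform_func=transform_webhook_log,
        model=WebhookLog,
        task_name="Webhook Log",
        collection_name="webhook_logs",
    )
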
+ """ + if not buffer: + logger.debug("No records to flush - buffer is empty") + return + + logger.info( + f"Starting batch flush: {len(buffer)} records, {len(ids_to_delete)} IDs to delete" + ) + + mongo_archival_failed = False + + # Try to insert into MongoDB if available + if mongo_collection and mongo_available: + try: + mongo_collection.bulk_write([InsertOne(doc) for doc in buffer]) + except BulkWriteError as bwe: + logger.error(f"MongoDB bulk write error: {str(bwe)}") + log_exception(bwe) + mongo_archival_failed = True + + # If MongoDB is available and archival failed, log the error and return + if mongo_available and mongo_archival_failed: + logger.error(f"MongoDB archival failed for {len(buffer)} records") + return + + # Delete from PostgreSQL - delete() returns (count, {model: count}) + delete_result = model.all_objects.filter(id__in=ids_to_delete).delete() + deleted_count = ( + delete_result[0] if delete_result and isinstance(delete_result, tuple) else 0 + ) + logger.info(f"Batch flush completed: {deleted_count} records deleted") + + +def process_cleanup_task( + queryset_func: Callable, + transform_func: Callable[[Dict], Dict], + model, + task_name: str, + collection_name: str, +): + """ + Generic function to process cleanup tasks. + + Args: + queryset_func: Function that returns the queryset to process + transform_func: Function to transform each record for MongoDB + model: Django model class + task_name: Name of the task for logging + collection_name: MongoDB collection name + """ + logger.info(f"Starting {task_name} cleanup task") + + # Get MongoDB collection + mongo_collection = get_mongo_collection(collection_name) + mongo_available = mongo_collection is not None + + # Get queryset + queryset = queryset_func() + + # Process records in batches + buffer: List[Dict[str, Any]] = [] + ids_to_delete: List[int] = [] + total_processed = 0 + total_batches = 0 + + for record in queryset: + # Transform record for MongoDB + buffer.append(transform_func(record)) + ids_to_delete.append(record["id"]) + + # Flush batch when it reaches BATCH_SIZE + if len(buffer) >= BATCH_SIZE: + total_batches += 1 + flush_to_mongo_and_delete( + mongo_collection=mongo_collection, + buffer=buffer, + ids_to_delete=ids_to_delete, + model=model, + mongo_available=mongo_available, + ) + total_processed += len(buffer) + buffer.clear() + ids_to_delete.clear() + + # Process final batch if any records remain + if buffer: + total_batches += 1 + flush_to_mongo_and_delete( + mongo_collection=mongo_collection, + buffer=buffer, + ids_to_delete=ids_to_delete, + model=model, + mongo_available=mongo_available, + ) + total_processed += len(buffer) + + logger.info( + f"{task_name} cleanup task completed", + extra={ + "total_records_processed": total_processed, + "total_batches": total_batches, + "mongo_available": mongo_available, + "collection_name": collection_name, + }, + ) + + +# Transform functions for each model +def transform_api_log(record: Dict) -> Dict: + """Transform API activity log record.""" + return { + "id": record["id"], + "created_at": str(record["created_at"]) if record.get("created_at") else None, + "token_identifier": record["token_identifier"], + "path": record["path"], + "method": record["method"], + "query_params": record.get("query_params"), + "headers": record.get("headers"), + "body": record.get("body"), + "response_code": record["response_code"], + "response_body": record["response_body"], + "ip_address": record["ip_address"], + "user_agent": record["user_agent"], + "created_by_id": 
record["created_by_id"], + } + + +def transform_email_log(record: Dict) -> Dict: + """Transform email notification log record.""" + return { + "id": record["id"], + "created_at": str(record["created_at"]) if record.get("created_at") else None, + "receiver_id": record["receiver_id"], + "triggered_by_id": record["triggered_by_id"], + "entity_identifier": record["entity_identifier"], + "entity_name": record["entity_name"], + "data": record["data"], + "processed_at": ( + str(record["processed_at"]) if record.get("processed_at") else None + ), + "sent_at": str(record["sent_at"]) if record.get("sent_at") else None, + "entity": record["entity"], + "old_value": record["old_value"], + "new_value": record["new_value"], + "created_by_id": record["created_by_id"], + } + + +def transform_page_version(record: Dict) -> Dict: + """Transform page version record.""" + return { + "id": record["id"], + "created_at": str(record["created_at"]) if record.get("created_at") else None, + "page_id": record["page_id"], + "workspace_id": record["workspace_id"], + "owned_by_id": record["owned_by_id"], + "description_html": record["description_html"], + "description_binary": record["description_binary"], + "description_stripped": record["description_stripped"], + "description_json": record["description_json"], + "sub_pages_data": record["sub_pages_data"], + "created_by_id": record["created_by_id"], + "updated_by_id": record["updated_by_id"], + "deleted_at": str(record["deleted_at"]) if record.get("deleted_at") else None, + "last_saved_at": ( + str(record["last_saved_at"]) if record.get("last_saved_at") else None + ), + } + + +def transform_issue_description_version(record: Dict) -> Dict: + """Transform issue description version record.""" + return { + "id": record["id"], + "created_at": str(record["created_at"]) if record.get("created_at") else None, + "issue_id": record["issue_id"], + "workspace_id": record["workspace_id"], + "project_id": record["project_id"], + "created_by_id": record["created_by_id"], + "updated_by_id": record["updated_by_id"], + "owned_by_id": record["owned_by_id"], + "last_saved_at": ( + str(record["last_saved_at"]) if record.get("last_saved_at") else None + ), + "description_binary": record["description_binary"], + "description_html": record["description_html"], + "description_stripped": record["description_stripped"], + "description_json": record["description_json"], + "deleted_at": str(record["deleted_at"]) if record.get("deleted_at") else None, + } + + +# Queryset functions for each cleanup task +def get_api_logs_queryset(): + """Get API logs older than cutoff days.""" + cutoff_days = int(os.environ.get("HARD_DELETE_AFTER_DAYS", 30)) + cutoff_time = timezone.now() - timedelta(days=cutoff_days) + logger.info(f"API logs cutoff time: {cutoff_time}") + + return ( + APIActivityLog.all_objects.filter(created_at__lte=cutoff_time) + .values( + "id", + "created_at", + "token_identifier", + "path", + "method", + "query_params", + "headers", + "body", + "response_code", + "response_body", + "ip_address", + "user_agent", + "created_by_id", + ) + .iterator(chunk_size=BATCH_SIZE) + ) + + +def get_email_logs_queryset(): + """Get email logs older than cutoff days.""" + cutoff_days = int(os.environ.get("HARD_DELETE_AFTER_DAYS", 30)) + cutoff_time = timezone.now() - timedelta(days=cutoff_days) + logger.info(f"Email logs cutoff time: {cutoff_time}") + + return ( + EmailNotificationLog.all_objects.filter(sent_at__lte=cutoff_time) + .values( + "id", + "created_at", + "receiver_id", + "triggered_by_id", + 
"entity_identifier", + "entity_name", + "data", + "processed_at", + "sent_at", + "entity", + "old_value", + "new_value", + "created_by_id", + ) + .iterator(chunk_size=BATCH_SIZE) + ) + + +def get_page_versions_queryset(): + """Get page versions beyond the maximum allowed (20 per page).""" + subq = ( + PageVersion.all_objects.annotate( + row_num=Window( + expression=RowNumber(), + partition_by=[F("page_id")], + order_by=F("created_at").desc(), + ) + ) + .filter(row_num__gt=20) + .values("id") + ) + + return ( + PageVersion.all_objects.filter(id__in=Subquery(subq)) + .values( + "id", + "created_at", + "page_id", + "workspace_id", + "owned_by_id", + "description_html", + "description_binary", + "description_stripped", + "description_json", + "sub_pages_data", + "created_by_id", + "updated_by_id", + "deleted_at", + "last_saved_at", + ) + .iterator(chunk_size=BATCH_SIZE) + ) + + +def get_issue_description_versions_queryset(): + """Get issue description versions beyond the maximum allowed (20 per issue).""" + subq = ( + IssueDescriptionVersion.all_objects.annotate( + row_num=Window( + expression=RowNumber(), + partition_by=[F("issue_id")], + order_by=F("created_at").desc(), + ) + ) + .filter(row_num__gt=20) + .values("id") + ) + + return ( + IssueDescriptionVersion.all_objects.filter(id__in=Subquery(subq)) + .values( + "id", + "created_at", + "issue_id", + "workspace_id", + "project_id", + "created_by_id", + "updated_by_id", + "owned_by_id", + "last_saved_at", + "description_binary", + "description_html", + "description_stripped", + "description_json", + "deleted_at", + ) + .iterator(chunk_size=BATCH_SIZE) + ) + + +# Celery tasks - now much simpler! +@shared_task +def delete_api_logs(): + """Delete old API activity logs.""" + process_cleanup_task( + queryset_func=get_api_logs_queryset, + transform_func=transform_api_log, + model=APIActivityLog, + task_name="API Activity Log", + collection_name="api_activity_logs", + ) + + +@shared_task +def delete_email_notification_logs(): + """Delete old email notification logs.""" + process_cleanup_task( + queryset_func=get_email_logs_queryset, + transform_func=transform_email_log, + model=EmailNotificationLog, + task_name="Email Notification Log", + collection_name="email_notification_logs", + ) + + +@shared_task +def delete_page_versions(): + """Delete excess page versions.""" + process_cleanup_task( + queryset_func=get_page_versions_queryset, + transform_func=transform_page_version, + model=PageVersion, + task_name="Page Version", + collection_name="page_versions", + ) + + +@shared_task +def delete_issue_description_versions(): + """Delete excess issue description versions.""" + process_cleanup_task( + queryset_func=get_issue_description_versions_queryset, + transform_func=transform_issue_description_version, + model=IssueDescriptionVersion, + task_name="Issue Description Version", + collection_name="issue_description_versions", + ) diff --git a/apps/api/plane/celery.py b/apps/api/plane/celery.py index 0ffa4689b..2eeac358c 100644 --- a/apps/api/plane/celery.py +++ b/apps/api/plane/celery.py @@ -50,9 +50,21 @@ app.conf.beat_schedule = { "schedule": crontab(hour=2, minute=0), # UTC 02:00 }, "check-every-day-to-delete-api-logs": { - "task": "plane.bgtasks.api_logs_task.delete_api_logs", + "task": "plane.bgtasks.cleanup_task.delete_api_logs", "schedule": crontab(hour=2, minute=30), # UTC 02:30 }, + "check-every-day-to-delete-email-notification-logs": { + "task": "plane.bgtasks.cleanup_task.delete_email_notification_logs", + "schedule": crontab(hour=3, 
diff --git a/apps/api/plane/settings/mongo.py b/apps/api/plane/settings/mongo.py
new file mode 100644
index 000000000..e64cc7ff4
--- /dev/null
+++ b/apps/api/plane/settings/mongo.py
@@ -0,0 +1,121 @@
+# Django imports
+from django.conf import settings
+import logging
+
+# Third party imports
+from pymongo import MongoClient
+from pymongo.database import Database
+from pymongo.collection import Collection
+from typing import Optional, TypeVar, Type
+
+
+T = TypeVar("T", bound="MongoConnection")
+
+# Set up logger
+logger = logging.getLogger("plane.mongo")
+
+
+class MongoConnection:
+    """
+    A singleton class that manages MongoDB connections.
+
+    This class ensures only one MongoDB connection is maintained throughout the application.
+    It provides methods to access the MongoDB client, database, and collections.
+
+    Attributes:
+        _instance (Optional[MongoConnection]): The singleton instance of this class
+        _client (Optional[MongoClient]): The MongoDB client instance
+        _db (Optional[Database]): The MongoDB database instance
+    """
+
+    _instance: Optional["MongoConnection"] = None
+    _client: Optional[MongoClient] = None
+    _db: Optional[Database] = None
+
+    def __new__(cls: Type[T]) -> T:
+        """
+        Creates a new instance of MongoConnection if one doesn't exist.
+
+        Returns:
+            MongoConnection: The singleton instance
+        """
+        if cls._instance is None:
+            cls._instance = super(MongoConnection, cls).__new__(cls)
+            try:
+                mongo_url = getattr(settings, "MONGO_DB_URL", None)
+                mongo_db_database = getattr(settings, "MONGO_DB_DATABASE", None)
+
+                if not mongo_url or not mongo_db_database:
+                    logger.warning(
+                        "MongoDB connection parameters not configured. MongoDB functionality will be disabled."
+                    )
+                    return cls._instance
+
+                cls._client = MongoClient(mongo_url)
+                cls._db = cls._client[mongo_db_database]
+
+                # Test the connection
+                cls._client.server_info()
+                logger.info("MongoDB connection established successfully")
+            except Exception as e:
+                logger.warning(
+                    f"Failed to initialize MongoDB connection: {str(e)}. MongoDB functionality will be disabled."
+                )
+        return cls._instance
+
+    @classmethod
+    def get_client(cls) -> Optional[MongoClient]:
+        """
+        Returns the MongoDB client instance.
+
+        Returns:
+            Optional[MongoClient]: The MongoDB client instance or None if not configured
+        """
+        if cls._client is None:
+            cls._instance = cls()
+        return cls._client
+
+    @classmethod
+    def get_db(cls) -> Optional[Database]:
+        """
+        Returns the MongoDB database instance.
+
+        Returns:
+            Optional[Database]: The MongoDB database instance or None if not configured
+        """
+        if cls._db is None:
+            cls._instance = cls()
+        return cls._db
+
+    @classmethod
+    def get_collection(cls, collection_name: str) -> Optional[Collection]:
+        """
+        Returns a MongoDB collection by name.
+
+        Args:
+            collection_name (str): The name of the collection to retrieve
+
+        Returns:
+            Optional[Collection]: The MongoDB collection instance or None if not configured
+        """
+        try:
+            db = cls.get_db()
+            if db is None:
+                logger.warning(
+                    f"Cannot access collection '{collection_name}': MongoDB not configured"
+                )
+                return None
+            return db[collection_name]
+        except Exception as e:
+            logger.warning(f"Failed to access collection '{collection_name}': {str(e)}")
+            return None
+
+    @classmethod
+    def is_configured(cls) -> bool:
+        """
+        Check if MongoDB is properly configured and connected.
+
+        Returns:
+            bool: True if MongoDB is configured and connected, False otherwise
+        """
+        return cls._client is not None and cls._db is not None
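
# A minimal sketch of consuming the singleton from application code; the
# collection name matches the one used by the API-log cleanup task above.
from plane.settings.mongo import MongoConnection

if MongoConnection.is_configured():
    logs = MongoConnection.get_collection("api_activity_logs")
    if logs is not None:
        newest = logs.find_one(sort=[("created_at", -1)])  # most recent archived log
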
diff --git a/apps/api/plane/settings/production.py b/apps/api/plane/settings/production.py
index abd95d006..4f4e99bdb 100644
--- a/apps/api/plane/settings/production.py
+++ b/apps/api/plane/settings/production.py
@@ -83,5 +83,10 @@ LOGGING = {
             "handlers": ["console"],
             "propagate": False,
         },
+        "plane.mongo": {
+            "level": "INFO",
+            "handlers": ["console"],
+            "propagate": False,
+        },
     },
 }
diff --git a/apps/api/requirements/base.txt b/apps/api/requirements/base.txt
index 78e9efed3..69f4ec837 100644
--- a/apps/api/requirements/base.txt
+++ b/apps/api/requirements/base.txt
@@ -66,4 +66,6 @@ opentelemetry-sdk==1.28.1
 opentelemetry-instrumentation-django==0.49b1
 opentelemetry-exporter-otlp==1.28.1
 # OpenAPI Specification
-drf-spectacular==0.28.0
\ No newline at end of file
+drf-spectacular==0.28.0
+# mongo
+pymongo==4.6.3
\ No newline at end of file
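
# Note that mongo.py reads MONGO_DB_URL and MONGO_DB_DATABASE via
# getattr(settings, ...), so both must be defined in Django settings for the
# archival path to activate; otherwise the cleanup tasks delete without
# archiving. A sketch of sourcing them from the environment (the env-var
# names are an assumption; this diff does not show where the settings are
# declared):
import os

MONGO_DB_URL = os.environ.get("MONGO_DB_URL")
MONGO_DB_DATABASE = os.environ.get("MONGO_DB_DATABASE")
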