[WEB-4873]: Add webhook log cleanup task and update Celery schedule (#7772)

This commit is contained in:
Nikhil 2025-09-18 20:09:01 +05:30 committed by GitHub
parent 68d72daa90
commit 69c688b017
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 74 additions and 5 deletions

View File

@ -21,13 +21,14 @@ from plane.db.models import (
PageVersion,
APIActivityLog,
IssueDescriptionVersion,
WebhookLog,
)
from plane.settings.mongo import MongoConnection
from plane.utils.exception_logger import log_exception
logger = logging.getLogger("plane.worker")
BATCH_SIZE = 1000
BATCH_SIZE = 500
def get_mongo_collection(collection_name: str) -> Optional[Collection]:
@ -247,6 +248,27 @@ def transform_issue_description_version(record: Dict) -> Dict:
}
def transform_webhook_log(record: Dict):
"""Transfer webhook logs to a new destination."""
return {
"id": str(record["id"]),
"created_at": str(record["created_at"]) if record.get("created_at") else None,
"workspace_id": str(record["workspace_id"]),
"webhook": str(record["webhook"]),
# Request
"event_type": str(record["event_type"]),
"request_method": str(record["request_method"]),
"request_headers": str(record["request_headers"]),
"request_body": str(record["request_body"]),
# Response
"response_status": str(record["response_status"]),
"response_body": str(record["response_body"]),
"response_headers": str(record["response_headers"]),
# retry count
"retry_count": str(record["retry_count"]),
}
# Queryset functions for each cleanup task
def get_api_logs_queryset():
"""Get API logs older than cutoff days."""
@ -374,7 +396,34 @@ def get_issue_description_versions_queryset():
)
# Celery tasks - now much simpler!
def get_webhook_logs_queryset():
"""Get email logs older than cutoff days."""
cutoff_days = int(os.environ.get("HARD_DELETE_AFTER_DAYS", 30))
cutoff_time = timezone.now() - timedelta(days=cutoff_days)
logger.info(f"Webhook logs cutoff time: {cutoff_time}")
return (
WebhookLog.all_objects.filter(created_at__lte=cutoff_time)
.values(
"id",
"created_at",
"workspace_id",
"webhook",
"event_type",
# Request
"request_method",
"request_headers",
"request_body",
# Response
"response_status",
"response_body",
"response_headers",
"retry_count",
)
.iterator(chunk_size=BATCH_SIZE)
)
@shared_task
def delete_api_logs():
"""Delete old API activity logs."""
@ -421,3 +470,15 @@ def delete_issue_description_versions():
task_name="Issue Description Version",
collection_name="issue_description_versions",
)
@shared_task
def delete_webhook_logs():
"""Delete old webhook logs"""
process_cleanup_task(
queryset_func=get_webhook_logs_queryset,
transform_func=transform_webhook_log,
model=WebhookLog,
task_name="Webhook Log",
collection_name="webhook_logs",
)

View File

@ -55,15 +55,23 @@ app.conf.beat_schedule = {
},
"check-every-day-to-delete-email-notification-logs": {
"task": "plane.bgtasks.cleanup_task.delete_email_notification_logs",
"schedule": crontab(hour=3, minute=0), # UTC 03:00
"schedule": crontab(hour=2, minute=45), # UTC 02:45
},
"check-every-day-to-delete-page-versions": {
"task": "plane.bgtasks.cleanup_task.delete_page_versions",
"schedule": crontab(hour=3, minute=30), # UTC 03:30
"schedule": crontab(hour=3, minute=0), # UTC 03:00
},
"check-every-day-to-delete-issue-description-versions": {
"task": "plane.bgtasks.cleanup_task.delete_issue_description_versions",
"schedule": crontab(hour=4, minute=0), # UTC 04:00
"schedule": crontab(hour=3, minute=15), # UTC 03:15
},
"check-every-day-to-delete-webhook-logs": {
"task": "plane.bgtasks.cleanup_task.delete_webhook_logs",
"schedule": crontab(hour=3, minute=30), # UTC 03:30
},
"check-every-day-to-delete-exporter-history": {
"task": "plane.bgtasks.exporter_expired_task.delete_old_s3_link",
"schedule": crontab(hour=3, minute=45), # UTC 03:45
},
}