Refactor account and permission handling: Simplified account filtering logic in AccountModelViewSet and removed redundant admin/system user checks from permissions. Enhanced user access methods to streamline site access verification and improved error handling for account context requirements. Updated throttling logic to eliminate unnecessary system account bypass conditions.

This commit is contained in:
IGNY8 VPS (Salman)
2025-12-08 05:23:06 +00:00
parent f0066b6e7d
commit 6dcbc651dd
12 changed files with 483 additions and 324 deletions

View File

@@ -19,34 +19,21 @@ class AccountModelViewSet(viewsets.ModelViewSet):
# Filter by account if model has account field
if hasattr(queryset.model, 'account'):
user = getattr(self.request, 'user', None)
# ADMIN/DEV/SYSTEM ACCOUNT OVERRIDE: Skip account filtering for:
# - Admins and developers (by role)
# - Users in system accounts (aws-admin, default-account)
if user and hasattr(user, 'is_authenticated') and user.is_authenticated:
try:
# Check if user has admin/developer privileges
is_admin_or_dev = (hasattr(user, 'is_admin_or_developer') and user.is_admin_or_developer()) if user else False
is_system_user = (hasattr(user, 'is_system_account_user') and user.is_system_account_user()) if user else False
if is_admin_or_dev or is_system_user:
# Skip account filtering - allow all accounts
pass
account = getattr(self.request, 'account', None)
if not account and hasattr(self.request, 'user') and self.request.user and hasattr(self.request.user, 'is_authenticated') and self.request.user.is_authenticated:
user_account = getattr(self.request.user, 'account', None)
if user_account:
account = user_account
if account:
queryset = queryset.filter(account=account)
else:
# Get account from request (set by middleware)
account = getattr(self.request, 'account', None)
if account:
queryset = queryset.filter(account=account)
elif hasattr(self.request, 'user') and self.request.user and hasattr(self.request.user, 'is_authenticated') and self.request.user.is_authenticated:
# Fallback to user's account
try:
user_account = getattr(self.request.user, 'account', None)
if user_account:
queryset = queryset.filter(account=user_account)
except (AttributeError, Exception):
# If account access fails (e.g., column mismatch), skip account filtering
pass
except (AttributeError, TypeError) as e:
# No account context -> block access
return queryset.none()
except (AttributeError, TypeError):
# If there's an error accessing user attributes, return empty queryset
return queryset.none()
else:
@@ -61,11 +48,11 @@ class AccountModelViewSet(viewsets.ModelViewSet):
try:
account = getattr(self.request.user, 'account', None)
except (AttributeError, Exception):
# If account access fails (e.g., column mismatch), set to None
account = None
# If model has account field, set it
if account and hasattr(serializer.Meta.model, 'account'):
if hasattr(serializer.Meta.model, 'account'):
if not account:
raise PermissionDenied("Account context is required to create this object.")
serializer.save(account=account)
else:
serializer.save()
@@ -253,24 +240,16 @@ class SiteSectorModelViewSet(AccountModelViewSet):
# Check if user is authenticated and is a proper User instance (not AnonymousUser)
if user and hasattr(user, 'is_authenticated') and user.is_authenticated and hasattr(user, 'get_accessible_sites'):
try:
# ADMIN/DEV/SYSTEM ACCOUNT OVERRIDE: Developers, admins, and system account users
# can see all data regardless of site/sector
if (hasattr(user, 'is_admin_or_developer') and user.is_admin_or_developer()) or \
(hasattr(user, 'is_system_account_user') and user.is_system_account_user()):
# Skip site/sector filtering for admins, developers, and system account users
# But still respect optional query params if provided
pass
# Get user's accessible sites
accessible_sites = user.get_accessible_sites()
# If no accessible sites, return empty queryset
if not accessible_sites.exists():
queryset = queryset.none()
else:
# Get user's accessible sites
accessible_sites = user.get_accessible_sites()
# If no accessible sites, return empty queryset (unless admin/developer/system account)
if not accessible_sites.exists():
queryset = queryset.none()
else:
# Filter by accessible sites
queryset = queryset.filter(site__in=accessible_sites)
except (AttributeError, TypeError) as e:
# Filter by accessible sites
queryset = queryset.filter(site__in=accessible_sites)
except (AttributeError, TypeError):
# If there's an error accessing user attributes, return empty queryset
queryset = queryset.none()
else:
@@ -295,21 +274,14 @@ class SiteSectorModelViewSet(AccountModelViewSet):
# Convert site_id to int if it's a string
site_id_int = int(site_id) if site_id else None
if site_id_int:
# ADMIN/DEV/SYSTEM ACCOUNT OVERRIDE: Admins, developers, and system account users
# can filter by any site, others must verify access
if user and hasattr(user, 'is_authenticated') and user.is_authenticated and hasattr(user, 'get_accessible_sites'):
try:
if (hasattr(user, 'is_admin_or_developer') and user.is_admin_or_developer()) or \
(hasattr(user, 'is_system_account_user') and user.is_system_account_user()):
# Admin/Developer/System Account User can filter by any site
accessible_sites = user.get_accessible_sites()
if accessible_sites.filter(id=site_id_int).exists():
queryset = queryset.filter(site_id=site_id_int)
else:
accessible_sites = user.get_accessible_sites()
if accessible_sites.filter(id=site_id_int).exists():
queryset = queryset.filter(site_id=site_id_int)
else:
queryset = queryset.none() # Site not accessible
except (AttributeError, TypeError) as e:
queryset = queryset.none() # Site not accessible
except (AttributeError, TypeError):
# If there's an error accessing user attributes, return empty queryset
queryset = queryset.none()
else:
@@ -369,14 +341,10 @@ class SiteSectorModelViewSet(AccountModelViewSet):
if user and hasattr(user, 'is_authenticated') and user.is_authenticated and site:
try:
# ADMIN/DEV/SYSTEM ACCOUNT OVERRIDE: Admins, developers, and system account users
# can create in any site, others must verify access
if not ((hasattr(user, 'is_admin_or_developer') and user.is_admin_or_developer()) or
(hasattr(user, 'is_system_account_user') and user.is_system_account_user())):
if hasattr(user, 'get_accessible_sites'):
accessible_sites = user.get_accessible_sites()
if not accessible_sites.filter(id=site.id).exists():
raise PermissionDenied("You do not have access to this site")
if hasattr(user, 'get_accessible_sites'):
accessible_sites = user.get_accessible_sites()
if not accessible_sites.filter(id=site.id).exists():
raise PermissionDenied("You do not have access to this site")
# Verify sector belongs to site
if sector and hasattr(sector, 'site') and sector.site != site: