Initial commit.

This commit is contained in:
Daniele Maglie 2024-01-21 14:20:04 +01:00
commit c38cd3f855
74 changed files with 3111 additions and 0 deletions

6
.gitignore vendored Normal file
View file

@ -0,0 +1,6 @@
__pycache__/
build/
eggs/
.eggs/
*.egg
*.egg-info/

3
README.md Normal file
View file

@ -0,0 +1,3 @@
### Data Model
Mandatory requirements and rules for parameters.

5
pyproject.toml Normal file
View file

@ -0,0 +1,5 @@
[build-system]
requires = [
"setuptools>=54",
]
build-backend = "setuptools.build_meta"

13
setup.cfg Normal file
View file

@ -0,0 +1,13 @@
[metadata]
name = piracyshield_data_model
version = 1.0.0
description = Data Model
[options]
package_dir=
=src
packages = find:
python_requires = >= 3.9
[options.packages.find]
where = src

View file

@ -0,0 +1 @@

View file

@ -0,0 +1 @@

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,45 @@
from piracyshield_data_model.base import BaseModel
class AccountFlagsModel(BaseModel):
flags = {
'change_password': False
}
def __init__(self, flags: dict):
# little hack
self.flags = self.flags
if not all(flag in self.flags.keys() for flag in flags.keys()):
raise AccountFlagsModelUnknownFlagException()
self.flags['change_password'] = self._validate_flag(flags['change_password'])
def _validate_flag(self, value: bool) -> bool | Exception:
"""
Validates the single flag value.
:param value: true/false if the flag is active or not.
:return: the same value.
"""
if isinstance(value, bool):
return value
raise AccountFlagsModelChangePasswordException()
class AccountFlagsModelUnknownFlagException(Exception):
"""
Unknown flag.
"""
pass
class AccountFlagsModelValueException(Exception):
"""
Non valid value.
"""
pass

View file

@ -0,0 +1,241 @@
from piracyshield_data_model.base import BaseModel
from piracyshield_component.validation.validator import Validator
from piracyshield_data_model.account.rule import AccountRule
from piracyshield_data_model.account.role.model import AccountRoleModel
class AccountModel(BaseModel):
"""
Central account data modeling class.
"""
account_id = None
name = None
email = None
password = None
role = None
is_active = True
def __init__(self, account_id: str, name: str, email: str, password: str, confirm_password: str, role: int, is_active: bool):
"""
Validates the parameters.
:param account_id: a valid account identifier.
:param name: name of the account.
:param email: e-mail of the account.
:param password: a valid password.
:param confirm_password: the exact copy of the password.
:param role: an int identifying the account role.
:param is_active: whether the account is active or not.
"""
self.account_id = self._validate_account_id(account_id)
self.name = self._validate_name(name)
self.email = self._validate_email(email)
self.password = self._validate_password(password)
self.confirm_password = self._validate_confirm_password(confirm_password)
self.role = self._validate_role(role)
self.is_active = self._validate_is_active(is_active)
def _validate_account_id(self, value: str) -> str | Exception:
"""
Validates the account identifier.
:param value: a valid string based on the required rules.
:return: the same value.
"""
validator = Validator(value, AccountRule.ACCOUNT_ID)
validator.validate()
if not validator.is_valid():
raise AccountModelAccountIDException(validator.errors)
return value
def _validate_name(self, value: str) -> str | Exception:
"""
Validates the account name.
:param value: a valid string based on the required rules.
:return: the same value.
"""
validator = Validator(value, AccountRule.NAME)
validator.validate()
if not validator.is_valid():
raise AccountModelNameException(validator.errors)
return value
def _validate_email(self, value: str) -> str | Exception:
"""
Validates the account e-mail.
:param value: a valid string based on the required rules.
:return: the same value.
"""
validator = Validator(value, AccountRule.EMAIL)
validator.validate()
if not validator.is_valid():
raise AccountModelEmailException(validator.errors)
return value
def _validate_password(self, value: str) -> str | Exception:
"""
Validates the account password.
:param value: a valid string based on the required rules.
:return: the same value.
"""
validator = Validator(value, AccountRule.PASSWORD)
validator.validate()
if not validator.is_valid():
raise AccountModelPasswordException(validator.errors)
return value
def _validate_confirm_password(self, value: str) -> str | Exception:
"""
Validates the password confirmation.
:param value: a valid string based on the required rules.
:return: the same value.
"""
validator = Validator(value, AccountRule.PASSWORD)
validator.validate()
if not validator.is_valid():
raise AccountModelConfirmPasswordException(validator.errors)
# finally verify if the two passwords match
if value != self.password:
raise AccountModelConfirmPasswordMismatchException(validator.errors)
return value
def _validate_role(self, value: int) -> int | Exception:
"""
Validates the account role identifier.
:param value: a valid integer based on the required rules.
:return: the same value.
"""
validator = Validator(value, AccountRule.ROLE)
validator.validate()
if not validator.is_valid():
raise AccountModelRoleException(validator.errors)
try:
enum = AccountRoleModel(value)
return enum.value
except ValueError:
raise AccountModelRoleException()
def _validate_is_active(self, value: bool) -> bool | Exception:
"""
Validates the account status.
:param value: true/false if active or not.
:return: the same value.
"""
if isinstance(value, bool):
return value
raise AccountModelIsActiveException()
class AccountModelAccountIDException(Exception):
"""
Non valid account identifier.
"""
pass
class AccountModelNameException(Exception):
"""
Non valid name.
"""
pass
class AccountModelEmailException(Exception):
"""
Non valid e-mail address.
"""
pass
class AccountModelPasswordException(Exception):
"""
Non valid password.
"""
pass
class AccountModelConfirmPasswordException(Exception):
"""
Non valid confirm password.
"""
pass
class AccountModelConfirmPasswordMismatchException(Exception):
"""
Confirm password differs from password.
"""
pass
class AccountModelRoleException(Exception):
"""
Non valid role.
"""
pass
class AccountModelIsActiveException(Exception):
"""
Non valid is active value.
"""
pass

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,15 @@
from enum import IntEnum
class AccountRoleModel(IntEnum):
"""
Account types with relative identifier.
"""
GUEST = 100
INTERNAL = 200
REPORTER = 300
PROVIDER = 400

View file

@ -0,0 +1,39 @@
from piracyshield_component.validation.rules.required import Required
from piracyshield_component.validation.rules.string import String
from piracyshield_component.validation.rules.length import Length
from piracyshield_component.validation.rules.email import Email
class AccountRule:
"""
Set of rules for new account generation.
"""
ACCOUNT_ID = [
Required(),
String(),
Length(minimum = 32, maximum = 32)
]
NAME = [
Required(),
String(allowed = ' _-.'),
Length(minimum = 3, maximum = 255)
]
EMAIL = [
Required(),
String(allowed = '@._-'),
Email()
]
PASSWORD = [
Required(),
String(allowed = '-_%~&@'),
Length(minimum = 8, maximum = 32)
]
ROLE = [
Required(),
Length(minimum = 3, maximum = 3)
]

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,77 @@
from piracyshield_data_model.base import BaseModel
from piracyshield_component.validation.validator import Validator
from piracyshield_data_model.authentication.rule import AuthenticationRule
class AuthenticationModel(BaseModel):
"""
Authentication data modeling class.
"""
email = None
password = None
def __init__(self, email: str, password: str):
"""
Validates the parameters.
:param email: e-mail of the account.
:param password: a valid password.
"""
self.email = self._validate_email(email)
self.password = self._validate_password(password)
def _validate_email(self, value: str) -> str | Exception:
"""
Validates the account email.
:param value: a valid string based on the required rules.
:return: the same value.
"""
validator = Validator(value, AuthenticationRule.EMAIL)
validator.validate()
if not validator.is_valid():
raise AuthenticationModelEmailException(validator.errors)
return value
def _validate_password(self, value: str) -> str | Exception:
"""
Validates the account password.
:param value: a valid string based on the required rules.
:return: the same value.
"""
validator = Validator(value, AuthenticationRule.PASSWORD)
validator.validate()
if not validator.is_valid():
raise AuthenticationModelPasswordException(validator.errors)
return value
class AuthenticationModelEmailException(Exception):
"""
Non valid e-mail address.
"""
pass
class AuthenticationModelPasswordException(Exception):
"""
Non valid password.
"""
pass

View file

@ -0,0 +1,22 @@
from piracyshield_component.validation.rules.required import Required
from piracyshield_component.validation.rules.string import String
from piracyshield_component.validation.rules.length import Length
from piracyshield_component.validation.rules.email import Email
class AuthenticationRule:
"""
Set of rules for user authentication.
"""
EMAIL = [
Required(),
String('@._-'),
Email()
]
PASSWORD = [
Required(),
String(allowed = '-_%~&@'),
Length(minimum = 8, maximum = 32)
]

View file

@ -0,0 +1,19 @@
class BaseModel:
"""
Base data modeling class with utilities.
"""
def to_dict(self) -> dict:
"""
Exports the set data into a dictionary.
"""
output = {}
for key, value in self.__dict__.items():
if value is not None:
output[key] = value
return output

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,177 @@
from piracyshield_data_model.base import BaseModel
from piracyshield_component.validation.validator import Validator
from piracyshield_data_model.dda.rule import DDARule
from piracyshield_data_model.account.rule import AccountRule
class DDAModel(BaseModel):
"""
DDA data modeling class.
"""
dda_id = None
description = None
instance = None
account_id = None
is_active = True
def __init__(self, dda_id: str, description: str, instance: str, account_id: str, is_active: bool):
"""
Validates the parameters.
:param dda_id: a valid DDA identifier.
:param description: textual description.
:param instance: DDA instance.
:param account_id: reporter account assigned to this DDA.
:param is_active: enable/disable this item.
"""
self.dda_id = self._validate_dda_id(dda_id)
self.description = self._validate_description(description)
self.instance = self._validate_instance(instance)
self.account_id = self._validate_account_id(account_id)
self.is_active = is_active
def _validate_dda_id(self, value: str) -> str | Exception:
"""
Validates the DDA identifier.
:param value: a valid DDA identifier string.
:return: the same value.
"""
validator = Validator(value, DDARule.DDA_ID)
validator.validate()
if not validator.is_valid():
raise DDAModelDDAIdException(validator.errors)
return value
def _validate_description(self, value: str) -> str | Exception:
"""
Validates description.
:param value: a valid string.
:return: the same value.
"""
if not value or not len(value):
raise DDAModelDescriptionMissingException()
validator = Validator(value, DDARule.DESCRIPTION)
validator.validate()
if not validator.is_valid():
raise DDAModelDescriptionNonValidException(validator.errors)
return value
def _validate_instance(self, value: str) -> str | Exception:
"""
Validates DDA instance.
:param value: the DDA instance.
:return: the same value.
"""
if not value or not len(value):
raise DDAModelInstanceMissingException()
validator = Validator(value, DDARule.INSTANCE)
validator.validate()
if not validator.is_valid():
raise DDAModelInstanceNonValidException(validator.errors)
return value
def _validate_account_id(self, value: str) -> str | Exception:
"""
Validates account identifier.
:param value: the account identifier.
:return: the same value.
"""
if not value or not len(value):
raise DDAModelAccountIdMissingException()
validator = Validator(value, AccountRule.ACCOUNT_ID)
validator.validate()
if not validator.is_valid():
raise DDAModelAccountIdNonValidException(validator.errors)
return value
class DDAModelDDAIdException(Exception):
"""
Non valid DDA identifier.
"""
pass
class DDAModelDescriptionMissingException(Exception):
"""
No description passed.
"""
pass
class DDAModelDescriptionNonValidException(Exception):
"""
Non valid description.
"""
pass
class DDAModelInstanceMissingException(Exception):
"""
No DDA instance passed.
"""
pass
class DDAModelInstanceNonValidException(Exception):
"""
Non valid DDA instance.
"""
pass
class DDAModelAccountIdMissingException(Exception):
"""
Non account identifier passed.
"""
pass
class DDAModelAccountIdNonValidException(Exception):
"""
Non valid account identifier.
"""
pass

View file

@ -0,0 +1,27 @@
from piracyshield_component.validation.rules.required import Required
from piracyshield_component.validation.rules.string import String
from piracyshield_component.validation.rules.length import Length
from piracyshield_component.validation.rules.dda import DDA
class DDARule:
"""
Set of rules for a DDA item.
"""
DDA_ID = [
Required(),
String(),
Length(minimum = 32, maximum = 32)
]
DESCRIPTION = [
Required(),
String(' .,-_@'),
Length(minimum = 3, maximum = 255)
]
INSTANCE = [
Required(),
DDA()
]

View file

@ -0,0 +1 @@

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,87 @@
from piracyshield_data_model.base import BaseModel
from piracyshield_component.validation.validator import Validator
from piracyshield_data_model.forensic.status.model import ForensicArchiveStatusModel
from piracyshield_data_model.ticket.rule import TicketRule
from piracyshield_data_model.forensic.archive.rule import ForensicArchiveRule
class ForensicArchiveModel(BaseModel):
"""
Forensic evidence data modeling class.
This class refers to the physical forensic evidence archive.
"""
ticket_id = None
name = None
status = None
def __init__(self, ticket_id: str, name: str):
"""
Validates the parameters.
:param ticket_id: a valid ticket identifier.
:param archive_name: the name of the archive.
"""
self.ticket_id = self._validate_ticket_id(ticket_id)
self.name = self._validate_name(name)
self.status = ForensicArchiveStatusModel.SCHEDULED.value
def _validate_ticket_id(self, value: str) -> str | Exception:
"""
Validates the ticket identifier.
:param value: a valid ticket identifier string.
:return: the same value.
"""
validator = Validator(value, TicketRule.TICKET_ID)
validator.validate()
if not validator.is_valid():
raise ForensicModelForensicIDException(validator.errors)
return value
def _validate_name(self, value: str) -> str | Exception:
"""
Validates the ticket identifier.
:param archive_name: a valid string containing only the name of the archive.
:return: the same value.
"""
validator = Validator(value, ForensicArchiveRule.NAME)
validator.validate()
if not validator.is_valid():
raise ForensicArchiveModelNameException(validator.errors)
return value
class ForensicArchiveModelTicketIDException(Exception):
"""
Non valid ticket identifier.
"""
pass
class ForensicArchiveModelNameException(Exception):
"""
Non valid archive name.
"""
pass

View file

@ -0,0 +1,15 @@
from piracyshield_component.validation.rules.required import Required
from piracyshield_component.validation.rules.string import String
from piracyshield_component.validation.rules.length import Length
class ForensicArchiveRule:
"""
Set of rules for forensic archives.
"""
NAME = [
Required(),
String(allowed = '.-_'),
Length(minimum = 6, maximum = 128)
]

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,13 @@
class ForensicFormatsModel:
"""
Forensic archive formats.
"""
ZIP = 'zip'
RAR = 'rar'
def get_formats(self):
return [attr for attr in dir(self) if attr.isupper()]

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,98 @@
from piracyshield_data_model.base import BaseModel
from piracyshield_component.validation.validator import Validator
from piracyshield_data_model.forensic.status.model import ForensicArchiveStatusModel
from piracyshield_data_model.forensic.hash.rule import ForensicHashRule
class ForensicHashModel(BaseModel):
"""
Hash data modeling class.
This is an extension to validate forensic evidences hashes.
"""
hash_string = None
hash_type = None
status = None
def __init__(self, hash_string: str, hash_type: str):
"""
Validates the parameters.
:param hash_string: a valid hash string based on its type.
:param hash_type: type of the hash.
"""
self.hash_type = self._validate_type(hash_type)
self.hash_string = self._validate_string(hash_string, self.hash_type)
self.status = ForensicArchiveStatusModel.PENDING.value
def _validate_type(self, hash_type: str) -> str | Exception:
"""
Validates the hash.
:param hash_type: type of the hash.
:return: the same value.
"""
# dirty hack to get the list of rules
hash_types = ForensicHashRule().get_hash_types()
hash_type = hash_type.upper()
if hash_type not in hash_types:
raise ForensicHashModelNotSupportedException()
return hash_type
def _validate_string(self, hash_string: str, hash_type: str) -> str | Exception:
"""
Validates the hash string.
:param hash_string: a valid hash string.
:param hash_type: type of the hash.
:return: the same value.
"""
if not len(hash_string):
raise ForensicHashModelHashMissingException()
validator = Validator(hash_string, getattr(ForensicHashRule, hash_type))
validator.validate()
if not validator.is_valid():
raise ForensicHashModelNonValidException(validator.errors)
return hash_string
class ForensicHashModelStringMissingException(Exception):
"""
Hash string is missing.
"""
pass
class ForensicHashModelNotSupportedException(Exception):
"""
Non supported hash type.
"""
pass
class ForensicHashModelNonValidException(Exception):
"""
Non valid hash string.
"""
pass

View file

@ -0,0 +1,42 @@
from piracyshield_component.validation.rules.required import Required
from piracyshield_component.validation.rules.string import String
from piracyshield_component.validation.rules.length import Length
class ForensicHashRule:
"""
Set of rules for hashes.
"""
SHA256 = [
Required(),
String(),
Length(minimum = 64, maximum = 64)
]
SHA384 = [
Required(),
String(),
Length(minimum = 96, maximum = 96)
]
SHA512 = [
Required(),
String(),
Length(minimum = 128, maximum = 128)
]
BLAKE2B = [
Required(),
String(),
Length(minimum = 128, maximum = 128) # accept only default digest size
]
BLAKE2S = [
Required(),
String(),
Length(minimum = 128, maximum = 128) # accept only default digest size
]
def get_hash_types(self):
return [attr for attr in dir(self) if attr.isupper()]

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,17 @@
from enum import Enum
class ForensicArchiveStatusModel(Enum):
"""
Forensic archive analysis status.
"""
PENDING = 'pending'
SCHEDULED = 'scheduled'
IN_PROGRESS = 'in progress'
APPROVED = 'approved'
REJECTED = 'rejected'

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,19 @@
from piracyshield_data_model.account.model import AccountModel
from piracyshield_data_model.account.role.model import AccountRoleModel
class GuestModel(AccountModel):
"""
Guest account data modeling class.
Currently not fully implemented.
"""
role = AccountRoleModel.GUEST
def __init__(self, account_id: str, name: str, email: str, password: str, confirm_password: str, is_active: bool):
"""
Extends the functionality from a general account.
"""
super().__init__(account_id, name, email, password, confirm_password, self.role, is_active)

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,17 @@
from piracyshield_data_model.account.model import AccountModel
from piracyshield_data_model.account.role.model import AccountRoleModel
class InternalModel(AccountModel):
"""
Internal account data modeling class.
"""
role = AccountRoleModel.INTERNAL
def __init__(self, account_id: str, name: str, email: str, password: str, confirm_password: str, is_active: bool):
"""
Extends the functionality from a general account.
"""
super().__init__(account_id, name, email, password, confirm_password, self.role, is_active)

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,20 @@
from piracyshield_component.validation.rules.required import Required
from piracyshield_component.validation.rules.string import String
from piracyshield_component.validation.rules.length import Length
class LogRule:
"""
Set of rules for log records.
"""
LOG_ID = [
Required(),
String(),
Length(minimum = 32, maximum = 32)
]
MESSAGE = [
Required(),
Length(minimum = 3, maximum = 500)
]

View file

@ -0,0 +1 @@

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,101 @@
from piracyshield_data_model.base import BaseModel
from piracyshield_component.validation.validator import Validator
from piracyshield_data_model.ticket.item.rule import TicketItemRule
from piracyshield_data_model.log.rule import LogRule
class LogTicketItemModel(BaseModel):
"""
Ticket item logging data modeling class.
"""
ticket_item_id = None
message = None
def __init__(self, ticket_item_id: str, message: str):
"""
Validates the parameters.
:param ticket_item_id: a valid ticket item identifier.
:param message: value of the item.
"""
self.ticket_item_id = self._validate_ticket_item_id(ticket_item_id)
self.message = self._validate_message(message)
def _validate_ticket_item_id(self, value: str) -> str | Exception:
"""
Validates the ticket item identifier.
:param value: a valid ticket item identifier string.
:return: the same value.
"""
if not value or not len(value):
raise LogTicketItemModelTicketItemIdentifierMissingException()
validator = Validator(value, TicketItemRule.TICKET_ITEM_ID)
validator.validate()
if not validator.is_valid():
raise LogTicketItemModelTicketItemIdentifierNonValidException(validator.errors)
return value
def _validate_message(self, value: str) -> str | Exception:
"""
Validates the message.
:param value: a valid message string.
:return: the same value.
"""
if not value or not len(value):
raise LogTicketItemModelMessageMissingException()
validator = Validator(value, LogRule.MESSAGE)
validator.validate()
if not validator.is_valid():
raise LogTicketItemModelMessageNonValidException(validator.errors)
return value
class LogTicketItemModelTicketItemIdentifierMissingException(Exception):
"""
Missing ticket item identifier.
"""
pass
class LogTicketItemModelTicketItemIdentifierNonValidException(Exception):
"""
Non valid ticket item identifier.
"""
pass
class LogTicketItemModelMessageMissingException(Exception):
"""
Missing message.
"""
pass
class LogTicketItemModelMessageNonValidException(Exception):
"""
Non valid message.
"""
pass

View file

@ -0,0 +1,95 @@
from piracyshield_data_model.base import BaseModel
from piracyshield_component.validation.validator import Validator
from piracyshield_data_model.ticket.rule import TicketRule
from piracyshield_data_model.log.rule import LogRule
class LogTicketModel(BaseModel):
"""
Ticket logging data modeling class.
"""
ticket_id = None
message = None
def __init__(self, ticket_id: str, message: str):
"""
Validates the parameters.
:param identifier: a valid identifier that represent the service object of the logging.
:param message: value of the item.
"""
self.ticket_id = self._validate_ticket_id(ticket_id)
self.message = self._validate_message(message)
def _validate_ticket_id(self, value: str) -> str | Exception:
"""
Validates the ticket identifier.
:param value: a valid ticket identifier string.
:return: the same value.
"""
validator = Validator(value, TicketRule.TICKET_ID)
validator.validate()
if not validator.is_valid():
raise LogTicketModelTicketIdentifierNonValidException(validator.errors)
return value
def _validate_message(self, value: str) -> str | Exception:
"""
Validates the message.
:param value: a valid message string.
:return: the same value.
"""
validator = Validator(value, LogRule.MESSAGE)
validator.validate()
if not validator.is_valid():
raise LogTicketModelMessageNonValidException(validator.errors)
return value
class LogTicketModelTicketIdentifierMissingException(Exception):
"""
Missing ticket identifier.
"""
pass
class LogTicketModelTicketIdentifierNonValidException(Exception):
"""
Non valid ticket identifier.
"""
pass
class LogTicketModelMessageMissingException(Exception):
"""
Missing message.
"""
pass
class LogTicketModelMessageNonValidException(Exception):
"""
Non valid message.
"""
pass

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,49 @@
from enum import IntEnum
class PermissionModel(IntEnum):
"""
Permission types with relative identifier.
"""
# account permissions
CREATE_ACCOUNT = 101
VIEW_ACCOUNT = 102
EDIT_ACCOUNT = 103
DELETE_ACCOUNT = 104
# ticket permissions
CREATE_TICKET = 201
VIEW_TICKET = 202
EDIT_TICKET = 203
DELETE_TICKET = 204
UPLOAD_TICKET = 205
# whitelist permissions
CREATE_WHITELIST_ITEM = 301
VIEW_WHITELIST_ITEM = 302
EDIT_WHITELIST_ITEM = 303
DELETE_WHITELIST_ITEM = 304
# DDA permissions
CREATE_DDA = 401
VIEW_DDA = 402
EDIT_DDA = 403
DELETE_DDA = 404

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,17 @@
from piracyshield_data_model.account.model import AccountModel
from piracyshield_data_model.account.role.model import AccountRoleModel
class ProviderModel(AccountModel):
"""
Provider account data modeling class.
"""
role = AccountRoleModel.PROVIDER
def __init__(self, account_id: str, name: str, email: str, password: str, confirm_password: str, is_active: bool):
"""
Extends the functionality from a general account.
"""
super().__init__(account_id, name, email, password, confirm_password, self.role, is_active)

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,17 @@
from piracyshield_data_model.account.model import AccountModel
from piracyshield_data_model.account.role.model import AccountRoleModel
class ReporterModel(AccountModel):
"""
Reporter account data modeling class.
"""
role = AccountRoleModel.REPORTER
def __init__(self, account_id: str, name: str, email: str, password: str, confirm_password: str, is_active: bool):
"""
Extends the functionality from a general account.
"""
super().__init__(account_id, name, email, password, confirm_password, self.role, is_active)

View file

@ -0,0 +1 @@

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,227 @@
from piracyshield_data_model.base import BaseModel
from piracyshield_component.validation.validator import Validator
from piracyshield_data_model.ticket.error.rule import TicketErrorRule
from piracyshield_data_model.ticket.genre.model import TicketGenreModel
from piracyshield_data_model.ticket.rule import TicketRule
class TicketErrorModel(BaseModel):
"""
Ticket error data modeling class.
"""
ticket_error_id = None
genre = None
ticket_id = None
fqdn = []
ipv4 = []
ipv6 = []
def __init__(self, ticket_error_id: str, ticket_id: str, fqdn: list, ipv4: list, ipv6: list):
"""
Validates the parameters.
:param ticket_error_id: a valid error ticket identifier.
:param ticket_id: a valid ticket identifier.
:param fqdn: a list of FQDN items.
:param ipv4: a list of IPv4 items.
:param ipv6: a list of IPv6 items.
"""
# FQDN, IPv4 and IPv6 should never be all empty
if not any([bool(fqdn), bool(ipv4), bool(ipv6)]):
raise TicketErrorModelNoDataException
self.ticket_error_id = self._validate_ticket_error_id(ticket_error_id)
# this is an error ticket
self.genre = TicketGenreModel.ERROR.value
self.ticket_id = self._validate_ticket_id(ticket_id)
if fqdn:
self.fqdn = self._validate_fqdn(fqdn)
if ipv4:
self.ipv4 = self._validate_ipv4(ipv4)
if ipv6:
self.ipv6 = self._validate_ipv6(ipv6)
def _validate_ticket_error_id(self, value: str) -> str | Exception:
"""
Validates the error ticket identifier.
:param value: a valid error ticket identifier string.
:return: the same value.
"""
validator = Validator(value, TicketErrorRule.TICKET_ERROR_ID)
validator.validate()
if not validator.is_valid():
raise TicketErrorModelTicketErrorIdentifierException(validator.errors)
return value
def _validate_ticket_id(self, value: str) -> str | Exception:
"""
Validates the ticket identifier.
:param value: a valid ticket identifier string.
:return: the same value.
"""
validator = Validator(value, TicketRule.TICKET_ID)
validator.validate()
if not validator.is_valid():
raise TicketErrorModelTicketIdentifierException(validator.errors)
return value
def _validate_fqdn(self, value: list) -> list | Exception:
"""
Validates the ticket FQDN list.
:param value: a list of FQDNs.
:return: the same value.
"""
if not value or not len(value):
raise TicketErrorModelFQDNMissingException()
for item in value:
validator = Validator(item, TicketRule.FQDN)
validator.validate()
if not validator.is_valid():
raise TicketErrorModelFQDNNonValidException(validator.errors)
return value
def _validate_ipv4(self, value: list) -> list | Exception:
"""
Validates the ticket IPv4 list.
:param value: a list of IPv4s.
:return: the same value.
"""
if not value or not len(value):
raise TicketErrorModelIPv4MissingException()
for item in value:
validator = Validator(item, TicketRule.IPV4)
validator.validate()
if not validator.is_valid():
raise TicketErrorModelIPv4NonValidException(validator.errors)
return value
def _validate_ipv6(self, value: list) -> list | Exception:
"""
Validates the ticket IPv6 list.
:param value: a list of IPv6s.
:return: the same value.
"""
if not value or not len(value):
raise TicketErrorModelIPv6MissingException()
for item in value:
validator = Validator(item, TicketRule.IPV6)
validator.validate()
if not validator.is_valid():
raise TicketErrorModelIPv6NonValidException(validator.errors)
return value
class TicketErrorModelNoDataException(Exception):
"""
No FQDN or IPv4 passed.
"""
pass
class TicketErrorModelTicketErrorIdentifierException(Exception):
"""
Non valid error ticket identifier.
"""
pass
class TicketErrorModelTicketIdentifierException(Exception):
"""
Non valid ticket identifier.
"""
pass
class TicketErrorModelFQDNMissingException(Exception):
"""
No FQDN passed.
"""
pass
class TicketErrorModelFQDNNonValidException(Exception):
"""
Non valid FQDN.
"""
pass
class TicketErrorModelIPv4MissingException(Exception):
"""
No IPv4 passed.
"""
pass
class TicketErrorModelIPv4NonValidException(Exception):
"""
Non valid IPv4.
"""
pass
class TicketErrorModelIPv6MissingException(Exception):
"""
No IPv6 passed.
"""
pass
class TicketErrorModelIPv6NonValidException(Exception):
"""
Non valid IPv6.
"""
pass

View file

@ -0,0 +1,18 @@
from piracyshield_component.validation.rules.required import Required
from piracyshield_component.validation.rules.string import String
from piracyshield_component.validation.rules.length import Length
from piracyshield_component.validation.rules.fqdn import FQDN
from piracyshield_component.validation.rules.ipv4 import IPv4
from piracyshield_component.validation.rules.ipv6 import IPv6
class TicketErrorRule:
"""
Set of rules for new ticket.
"""
TICKET_ERROR_ID = [
Required(),
String(),
Length(minimum = 32, maximum = 32)
]

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,11 @@
from enum import Enum
class TicketGenreModel(Enum):
"""
Ticket genre types.
"""
BLOCKING = 'blocking'
ERROR = 'error'

View file

@ -0,0 +1 @@

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,13 @@
from enum import Enum
class TicketItemGenreModel(Enum):
"""
Ticket item genre types with relative identifier.
"""
FQDN = 'fqdn'
IPV4 = 'ipv4'
IPV6 = 'ipv6'

View file

@ -0,0 +1,298 @@
from piracyshield_data_model.base import BaseModel
from piracyshield_component.validation.validator import Validator
from piracyshield_data_model.ticket.item.rule import TicketItemRule
from piracyshield_data_model.ticket.item.status.model import TicketItemStatusModel
from piracyshield_data_model.ticket.item.genre.model import TicketItemGenreModel
from piracyshield_data_model.ticket.rule import TicketRule
from piracyshield_data_model.account.rule import AccountRule
class TicketItemModel(BaseModel):
"""
Ticket item data modeling class.
"""
ticket_id = None
ticket_item_id = None
provider_id = None
value = None
genre = None
status = None
is_active = True
is_duplicate = False
is_whitelisted = False
is_error = False
settings = None
def __init__(self,
ticket_id: str,
ticket_item_id: str,
provider_id: str,
value: str,
genre: str,
is_active: bool,
is_duplicate: bool,
is_whitelisted: bool,
is_error: bool
):
"""
Validates the parameters.
:param ticket_id: a valid ticket identifier.
:param ticket_item_id: a valid ticket item identifier.
:param value: a valid FQDN or IPv4.
:param genre: a valid ticket item type.
:param provider_id: an account identifier assigned to the ticket item.
"""
self.ticket_id = self._validate_ticket_id(ticket_id)
self.ticket_item_id = self._validate_ticket_item_id(ticket_item_id)
match genre:
case TicketItemGenreModel.FQDN.value:
self.genre = TicketItemGenreModel.FQDN.value
self.value = self._validate_fqdn(value)
case TicketItemGenreModel.IPV4.value:
self.genre = TicketItemGenreModel.IPV4.value
self.value = self._validate_ipv4(value)
case TicketItemGenreModel.IPV6.value:
self.genre = TicketItemGenreModel.IPV6.value
self.value = self._validate_ipv6(value)
case _:
raise TicketItemModelGenreNonValidException()
self.provider_id = self._validate_provider_id(provider_id)
self.status = TicketItemStatusModel.PENDING.value
self.is_active = is_active
self.is_duplicate = is_duplicate
self.is_whitelisted = is_whitelisted
self.is_error = is_error
self.settings = {
'update_max_time': 172800 # 2 days
}
def _validate_ticket_id(self, value: str) -> str | Exception:
"""
Validates the ticket identifier.
:param value: a valid ticket identifier string.
:return: the same value.
"""
validator = Validator(value, TicketRule.TICKET_ID)
validator.validate()
if not validator.is_valid():
raise TicketItemModelTicketIdentifierNonValidException(validator.errors)
return value
def _validate_ticket_item_id(self, value: str) -> str | Exception:
"""
Validates the ticket item identifier.
:param value: a valid ticket item identifier string.
:return: the same value.
"""
validator = Validator(value, TicketItemRule.TICKET_ITEM_ID)
validator.validate()
if not validator.is_valid():
raise TicketItemModelTicketIdentifierNonValidException(validator.errors)
return value
def _validate_fqdn(self, value: str) -> list | Exception:
"""
Validates the FQDN genre.
:param value: a valid FQDN.
:return: the same value.
"""
if not value or not len(value):
raise TicketItemModelFQDNMissingException()
validator = Validator(value, TicketRule.FQDN)
validator.validate()
if not validator.is_valid():
raise TicketItemModelFQDNNonValidException(validator.errors)
return value
def _validate_ipv4(self, value: str) -> list | Exception:
"""
Validates the IPv4 genre.
:param value: a valid IPv4.
:return: the same value.
"""
if not value or not len(value):
raise TicketItemModelIPv4MissingException()
validator = Validator(value, TicketRule.IPV4)
validator.validate()
if not validator.is_valid():
raise TicketItemModelIPv4NonValidException(validator.errors)
return value
def _validate_ipv6(self, value: str) -> list | Exception:
"""
Validates the IPv6 genre.
:param value: a valid IPv6.
:return: the same value.
"""
if not value or not len(value):
raise TicketItemModelIPv6MissingException()
validator = Validator(value, TicketRule.IPV6)
validator.validate()
if not validator.is_valid():
raise TicketItemModelIPv6NonValidException(validator.errors)
return value
def _validate_provider_id(self, value: str) -> str | Exception:
"""
Validates the provider account identifier.
:param value: a valid account identifier.
:return: the same value.
"""
if not value or not len(value):
raise TicketItemModelProviderIdentifierMissingException()
validator = Validator(value, AccountRule.ACCOUNT_ID)
validator.validate()
if not validator.is_valid():
raise TicketItemModelProviderIdentifierException(validator.errors)
return value
class TicketItemModelTicketIdentifierNonValidException(Exception):
"""
Non valid ticket identifier.
"""
pass
class TicketItemModelTicketItemIdentifierNonValidException(Exception):
"""
Non valid ticket item identifier.
"""
pass
class TicketItemModelGenreNonValidException(Exception):
"""
Non valid genre.
"""
pass
class TicketItemModelFQDNMissingException(Exception):
"""
Missing FQDN.
"""
pass
class TicketItemModelFQDNNonValidException(Exception):
"""
Non valid FQDN.
"""
pass
class TicketItemModelIPv4MissingException(Exception):
"""
Missing IPv4.
"""
pass
class TicketItemModelIPv4NonValidException(Exception):
"""
Non valid IPv4.
"""
pass
class TicketItemModelIPv6MissingException(Exception):
"""
Missing IPv6.
"""
pass
class TicketItemModelIPv6NonValidException(Exception):
"""
Non valid IPv6.
"""
pass
class TicketItemModelProviderIdentifierMissingException(Exception):
"""
Missing provider account identifier.
"""
pass
class TicketItemModelProviderIdentifierNonValidException(Exception):
"""
Non valid provider account idenfitier.
"""
pass

View file

@ -0,0 +1,229 @@
from piracyshield_data_model.base import BaseModel
from piracyshield_component.validation.validator import Validator
from piracyshield_component.utils.time import Time, TimeFormatException
from piracyshield_data_model.ticket.item.processed.rule import TicketItemProcessedRule
from piracyshield_data_model.ticket.item.status.model import TicketItemStatusModel
from piracyshield_data_model.ticket.rule import TicketRule
from piracyshield_data_model.account.rule import AccountRule
class TicketItemProcessedModel(BaseModel):
"""
Ticket item processed data modeling class.
"""
provider_id = None
value = None
status = None
timestamp = None
note = None
def __init__(self, provider_id: str, value: str, timestamp: str = None, note: str = None):
"""
Validates the parameters.
:param provider_id: an account identifier assigned to the ticket item.
:param value: a valid FQDN, IPv4 or IPv6.
:param timestamp: an optional ISO8601 date.
:param note: an optional string.
"""
self.provider_id = self._validate_provider_id(provider_id)
self.value = self._validate_value(value)
self.status = TicketItemStatusModel.PROCESSED.value
if timestamp:
self.timestamp = self._validate_timestamp(timestamp)
if note:
self.note = self._validate_note(note)
def _validate_provider_id(self, value: str) -> str | Exception:
"""
Validates the provider account identifier.
:param value: a valid account identifier.
:return: the same value.
"""
if not value or not len(value):
raise TicketItemProcessedModelProviderIdentifierMissingException()
validator = Validator(value, AccountRule.ACCOUNT_ID)
validator.validate()
if not validator.is_valid():
raise TicketItemProcessedModelProviderIdentifierException(validator.errors)
return value
def _validate_value(self, value: str) -> str | Exception:
"""
Validates the ticket item value.
:param value: a valid FQDN, IPv4 or IPv6.
:return: the same value.
"""
if not value or not len(value):
raise TicketItemProcessedModelValueMissingException()
validators = [
self._validate_fqdn,
self._validate_ipv4,
self._validate_ipv6
]
is_valid = False
for validator in validators:
if validator(value):
is_valid = True
if not is_valid:
raise TicketItemProcessedModelValueNonValidException()
return value
def _validate_fqdn(self, value: str) -> list | Exception:
"""
Validates the FQDN genre.
:param value: a valid FQDN.
:return: the same value.
"""
validator = Validator(value, TicketRule.FQDN)
validator.validate()
if not validator.is_valid():
return False
return value
def _validate_ipv4(self, value: str) -> list | Exception:
"""
Validates the IPv4 genre.
:param value: a valid IPv4.
:return: the same value.
"""
validator = Validator(value, TicketRule.IPV4)
validator.validate()
if not validator.is_valid():
return False
return value
def _validate_ipv6(self, value: str) -> list | Exception:
"""
Validates the IPv6 genre.
:param value: a valid IPv6.
:return: the same value.
"""
validator = Validator(value, TicketRule.IPV6)
validator.validate()
if not validator.is_valid():
return False
return value
def _validate_timestamp(self, value: str) -> list | Exception:
"""
Validates the timestamp against the ISO8601 format.
:param value: a valid ISO8601 date.
:return: the same value.
"""
try:
Time.is_valid_iso8601(value)
return value
except TimeFormatException:
raise TicketItemProcessedModelTimestampNonValidException()
def _validate_note(self, value: str) -> str | Exception:
"""
Validates the note text.
:param value: a string.
:return: the same value.
"""
validator = Validator(value, TicketItemProcessedRule.NOTE)
validator.validate()
if not validator.is_valid():
raise TicketItemProcessedModelNoteNonValidException(validator.errors)
return value
class TicketItemProcessedModelProviderIdentifierMissingException(Exception):
"""
Missing provider account identifier.
"""
pass
class TicketItemProcessedModelProviderIdentifierNonValidException(Exception):
"""
Non valid provider account idenfitier.
"""
pass
class TicketItemProcessedModelValueMissingException(Exception):
"""
Missing ticket item value.
"""
pass
class TicketItemProcessedModelValueNonValidException(Exception):
"""
Non valid ticket item value.
"""
pass
class TicketItemProcessedModelTimestampNonValidException(Exception):
"""
Non valid timestamp.
"""
pass
class TicketItemProcessedModelNoteNonValidException(Exception):
"""
Non valid note.
"""
pass

View file

@ -0,0 +1,15 @@
from piracyshield_component.validation.rules.required import Required
from piracyshield_component.validation.rules.string import String
from piracyshield_component.validation.rules.length import Length
class TicketItemProcessedRule:
"""
Set of rules for processed ticket item status.
"""
NOTE = [
Required(),
String(' .,-&/$€@"'),
Length(minimum = 3, maximum = 512)
]

View file

@ -0,0 +1,17 @@
from piracyshield_component.validation.rules.required import Required
from piracyshield_component.validation.rules.string import String
from piracyshield_component.validation.rules.length import Length
from piracyshield_component.validation.rules.fqdn import FQDN
from piracyshield_component.validation.rules.ipv4 import IPv4
class TicketItemRule:
"""
Set of rules for new ticket item.
"""
TICKET_ITEM_ID = [
Required(),
String(),
Length(minimum = 32, maximum = 32)
]

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,15 @@
from enum import Enum
class TicketItemStatusModel(Enum):
"""
Ticket item status types with relative identifier.
"""
PENDING = 'pending'
PROCESSED = 'processed'
UNPROCESSED = 'unprocessed'
UNBLOCKED = 'unblocked'

View file

@ -0,0 +1,271 @@
from piracyshield_data_model.base import BaseModel
from piracyshield_component.validation.validator import Validator
from piracyshield_component.utils.time import Time, TimeFormatException
from piracyshield_data_model.ticket.item.unprocessed.reason.model import TicketItemUnprocessedReasonModel
from piracyshield_data_model.ticket.item.unprocessed.rule import TicketItemUnprocessedRule
from piracyshield_data_model.ticket.item.status.model import TicketItemStatusModel
from piracyshield_data_model.ticket.rule import TicketRule
from piracyshield_data_model.account.rule import AccountRule
class TicketItemUnprocessedModel(BaseModel):
"""
Ticket item unprocessed data modeling class.
"""
provider_id = None
value = None
status = None
reason = None
timestamp = None
note = None
def __init__(self, provider_id: str, value: str, reason: str, timestamp: str = None, note: str = None):
"""
Validates the parameters.
:param provider_id: an account identifier assigned to the ticket item.
:param value: a valid FQDN, IPv4 or IPv6.
:param reason: a predefined reason for the unprocessed status.
:param timestamp: an optional ISO8601 date.
:param note: an optional string.
"""
self.provider_id = self._validate_provider_id(provider_id)
self.value = self._validate_value(value)
self.status = TicketItemStatusModel.UNPROCESSED.value
self.reason = self._validate_reason(reason)
if timestamp:
self.timestamp = self._validate_timestamp(timestamp)
if note:
self.note = self._validate_note(note)
def _validate_provider_id(self, value: str) -> str | Exception:
"""
Validates the provider account identifier.
:param value: a valid account identifier.
:return: the same value.
"""
if not value or not len(value):
raise TicketItemUnprocessedModelProviderIdentifierMissingException()
validator = Validator(value, AccountRule.ACCOUNT_ID)
validator.validate()
if not validator.is_valid():
raise TicketItemUnprocessedModelProviderIdentifierException(validator.errors)
return value
def _validate_value(self, value: str) -> str | Exception:
"""
Validates the ticket item value.
:param value: a valid FQDN, IPv4 or IPv6.
:return: the same value.
"""
if not value or not len(value):
raise TicketItemUnprocessedModelValueMissingException()
validators = [
self._validate_fqdn,
self._validate_ipv4,
self._validate_ipv6
]
is_valid = False
for validator in validators:
if validator(value):
is_valid = True
if not is_valid:
raise TicketItemUnprocessedModelValueNonValidException()
return value
def _validate_fqdn(self, value: str) -> list | Exception:
"""
Validates the FQDN genre.
:param value: a valid FQDN.
:return: the same value.
"""
validator = Validator(value, TicketRule.FQDN)
validator.validate()
if not validator.is_valid():
return False
return value
def _validate_ipv4(self, value: str) -> list | Exception:
"""
Validates the IPv4 genre.
:param value: a valid IPv4.
:return: the same value.
"""
validator = Validator(value, TicketRule.IPV4)
validator.validate()
if not validator.is_valid():
return False
return value
def _validate_ipv6(self, value: str) -> list | Exception:
"""
Validates the IPv6 genre.
:param value: a valid IPv6.
:return: the same value.
"""
validator = Validator(value, TicketRule.IPV6)
validator.validate()
if not validator.is_valid():
return False
return value
def _validate_reason(self, value: str) -> int | Exception:
"""
Validates the reason.
:param value: a valid predefined reason.
:return: the same value.
"""
if not value or not len(value):
raise TicketItemUnprocessedModelReasonMissingException()
try:
enum = TicketItemUnprocessedReasonModel(value)
return enum.value
except ValueError:
raise TicketItemUnprocessedModelReasonNonValidException()
def _validate_timestamp(self, value: str) -> list | Exception:
"""
Validates the timestamp against the ISO8601 format.
:param value: a valid ISO8601 date.
:return: the same value.
"""
try:
Time.is_valid_iso8601(value)
return value
except TimeFormatException:
raise TicketItemUnprocessedModelTimestampNonValidException()
def _validate_note(self, value: str) -> str | Exception:
"""
Validates the note text.
:param value: a string.
:return: the same value.
"""
validator = Validator(value, TicketItemUnprocessedRule.NOTE)
validator.validate()
if not validator.is_valid():
raise TicketItemUnprocessedModelNoteNonValidException(validator.errors)
return value
class TicketItemUnprocessedModelProviderIdentifierMissingException(Exception):
"""
Missing provider account identifier.
"""
pass
class TicketItemUnprocessedModelProviderIdentifierNonValidException(Exception):
"""
Non valid provider account idenfitier.
"""
pass
class TicketItemUnprocessedModelValueMissingException(Exception):
"""
Missing ticket item value.
"""
pass
class TicketItemUnprocessedModelValueNonValidException(Exception):
"""
Non valid ticket item value.
"""
pass
class TicketItemUnprocessedModelReasonMissingException(Exception):
"""
Missing reason.
"""
pass
class TicketItemUnprocessedModelReasonNonValidException(Exception):
"""
Non valid reason.
"""
pass
class TicketItemUnprocessedModelTimestampNonValidException(Exception):
"""
Non valid timestamp.
"""
pass
class TicketItemUnprocessedModelNoteNonValidException(Exception):
"""
Non valid note.
"""
pass

View file

@ -0,0 +1,13 @@
from enum import Enum
class TicketItemUnprocessedReasonModel(Enum):
"""
Ticket item unprocessed status reason.
"""
UNPROCESSED_ALREADY_BLOCKED = 'ALREADY_BLOCKED'
UNPROCESSED_UNDEFINED = 'UNDEFINED'
UNPROCESSED_UNKNOWN = 'UNKNOWN'

View file

@ -0,0 +1,15 @@
from piracyshield_component.validation.rules.required import Required
from piracyshield_component.validation.rules.string import String
from piracyshield_component.validation.rules.length import Length
class TicketItemUnprocessedRule:
"""
Set of rules for unprocessed ticket item status.
"""
NOTE = [
Required(),
String(' .,-&/$€@"'),
Length(minimum = 3, maximum = 512)
]

View file

@ -0,0 +1,315 @@
from piracyshield_data_model.base import BaseModel
from piracyshield_component.validation.validator import Validator
from piracyshield_data_model.ticket.status.model import TicketStatusModel
from piracyshield_data_model.ticket.genre.model import TicketGenreModel
from piracyshield_data_model.account.rule import AccountRule
from piracyshield_data_model.dda.rule import DDARule
from piracyshield_data_model.ticket.rule import TicketRule
class TicketModel(BaseModel):
"""
Ticket data modeling class.
"""
ticket_id = None
genre = None
description = None
fqdn = []
ipv4 = []
ipv6 = []
assigned_to = []
status = None
settings = None
def __init__(self, ticket_id: str, dda_id: str, fqdn: list, ipv4: list, ipv6: list, assigned_to: list, description: str = None):
"""
Validates the parameters.
:param ticket_id: a valid ticket identifier.
:param dda_id: a valid DDA identifier.
:param fqdn: a list of FQDN items.
:param ipv4: a list of IPv4 items.
:param ipv6: a list of IPv6 items.
:param assigned_to: a list of account identifiers assigned to the ticket.
:param description: a generic, non mandatory, description of the ticket.
"""
# FQDN, IPv4 and IPv6 should never be all empty
if not any([bool(fqdn), bool(ipv4), bool(ipv6)]):
raise TicketModelNoDataException
self.ticket_id = self._validate_ticket_id(ticket_id)
self.dda_id = self._validate_dda_id(dda_id)
if description:
self.description = self._validate_description(description)
if fqdn:
self.fqdn = self._validate_fqdn(fqdn)
if ipv4:
self.ipv4 = self._validate_ipv4(ipv4)
if ipv6:
self.ipv6 = self._validate_ipv6(ipv6)
self.assigned_to = self._validate_assigned_to(assigned_to) if assigned_to else None
# ticket initial status
self.status = TicketStatusModel.CREATED.value
# this is a blocking ticket
self.genre = TicketGenreModel.BLOCKING.value
self.settings = {
'autoclose_time': 1875, # 31.25 minutes in seconds, considering 75 seconds of preamble
'revoke_time': 75,
'report_error_time': 86400 # 1 day
}
def _validate_ticket_id(self, value: str) -> str | Exception:
"""
Validates the ticket identifier.
:param value: a valid ticket identifier string.
:return: the same value.
"""
validator = Validator(value, TicketRule.TICKET_ID)
validator.validate()
if not validator.is_valid():
raise TicketModelTicketIdException(validator.errors)
return value
def _validate_dda_id(self, value: str) -> str | Exception:
"""
Validates the DDA identifier.
:param value: a valid DDA identifier string.
:return: the same value.
"""
if not value or not len(value):
raise TicketModelDDAIdMissingException()
validator = Validator(value, DDARule.DDA_ID)
validator.validate()
if not validator.is_valid():
raise TicketModelDDAIdNonValidException(validator.errors)
return value
def _validate_description(self, value: str) -> str | Exception:
"""
Validates the ticket description.
:param value: a valid string.
:return: the same value.
"""
validator = Validator(value, TicketRule.DESCRIPTION)
validator.validate()
if not validator.is_valid():
raise TicketModelDescriptionException(validator.errors)
return value
def _validate_fqdn(self, value: list) -> list | Exception:
"""
Validates the ticket FQDN list.
:param value: a list of FQDNs.
:return: the same value.
"""
if not value or not len(value):
raise TicketModelFQDNMissingException()
for item in value:
validator = Validator(item, TicketRule.FQDN)
validator.validate()
if not validator.is_valid():
raise TicketModelFQDNNonValidException(validator.errors)
return value
def _validate_ipv4(self, value: list) -> list | Exception:
"""
Validates the ticket IPv4 list.
:param value: a list of IPv4s.
:return: the same value.
"""
if not value or not len(value):
raise TicketModelIPv4MissingException()
for item in value:
validator = Validator(item, TicketRule.IPV4)
validator.validate()
if not validator.is_valid():
raise TicketModelIPv4NonValidException(validator.errors)
return value
def _validate_ipv6(self, value: list) -> list | Exception:
"""
Validates the ticket IPv6 list.
:param value: a list of IPv6s.
:return: the same value.
"""
if not value or not len(value):
raise TicketModelIPv6MissingException()
for item in value:
validator = Validator(item, TicketRule.IPV6)
validator.validate()
if not validator.is_valid():
raise TicketModelIPv6NonValidException(validator.errors)
return value
def _validate_assigned_to(self, value: list) -> list | Exception:
"""
Validates the ticket assigned accounts list.
:param value: a list of account identifiers.
:return: the same value.
"""
if not value or not len(value):
validator = Validator(item, AccountRule.ACCOUNT_ID)
validator.validate()
if not validator.is_valid():
raise TicketModelAssignedToNonValidException(validator.errors)
return value
class TicketModelNoDataException(Exception):
"""
No FQDN or IPv4 passed.
"""
pass
class TicketModelTicketIdException(Exception):
"""
Non valid ticket identifier.
"""
pass
class TicketModelDDAIdMissingException(Exception):
"""
Missing DDA identifier.
"""
pass
class TicketModelDDAIdNonValidException(Exception):
"""
Non valid DDA identifier.
"""
pass
class TicketModelDescriptionException(Exception):
"""
Non valid description.
"""
pass
class TicketModelFQDNMissingException(Exception):
"""
No FQDN passed.
"""
pass
class TicketModelFQDNNonValidException(Exception):
"""
Non valid FQDN.
"""
pass
class TicketModelIPv4MissingException(Exception):
"""
No IPv4 passed.
"""
pass
class TicketModelIPv4NonValidException(Exception):
"""
Non valid IPv4.
"""
pass
class TicketModelIPv6MissingException(Exception):
"""
No IPv6 passed.
"""
pass
class TicketModelIPv6NonValidException(Exception):
"""
Non valid IPv6.
"""
pass
class TicketModelAssignedToNonValidException(Exception):
"""
Non valid account id passed.
"""
pass

View file

@ -0,0 +1,39 @@
from piracyshield_component.validation.rules.required import Required
from piracyshield_component.validation.rules.string import String
from piracyshield_component.validation.rules.length import Length
from piracyshield_component.validation.rules.fqdn import FQDN
from piracyshield_component.validation.rules.ipv4 import IPv4
from piracyshield_component.validation.rules.ipv6 import IPv6
class TicketRule:
"""
Set of rules for new ticket.
"""
TICKET_ID = [
Required(),
String(),
Length(minimum = 32, maximum = 32)
]
DESCRIPTION = [
Required(),
String(' .,-_@'),
Length(minimum = 3, maximum = 255)
]
FQDN = [
Required(),
FQDN()
]
IPV4 = [
Required(),
IPv4()
]
IPV6 = [
Required(),
IPv6()
]

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,13 @@
from enum import Enum
class TicketStatusModel(Enum):
"""
Ticket status types with relative identifier.
"""
CREATED = 'created'
OPEN = 'open'
CLOSED = 'closed'

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,243 @@
from piracyshield_data_model.base import BaseModel
from piracyshield_component.validation.validator import Validator
from piracyshield_data_model.ticket.item.genre.model import TicketItemGenreModel
from piracyshield_data_model.whitelist.rule import WhitelistRule
from piracyshield_data_model.ticket.rule import TicketRule
class WhitelistModel(BaseModel):
"""
Whitelist data modeling class.
"""
genre = None
value = None
registrar = None
as_code = None
is_active = True
def __init__(self, genre: str, value: str, is_active: bool, registrar: str = None, as_code: str = None):
"""
Validates the parameters.
:param genre: FQDN, IPv4 or IPv6 type.
:param value: value of the item.
:param is_active: if the item is already active or not.
:param registrar: registrar of the FQDN item.
:param as_code: AS code of the IPv4 or IPv6 item.
"""
match genre:
case TicketItemGenreModel.FQDN.value:
self.value = self._validate_fqdn(value)
self.registrar = self._validate_registrar(registrar)
case TicketItemGenreModel.IPV4.value:
self.value = self._validate_ipv4(value)
self.as_code = self._validate_as_code(as_code)
case TicketItemGenreModel.IPV6.value:
self.value = self._validate_ipv6(value)
self.as_code = self._validate_as_code(as_code)
case _:
raise WhitelistModelGenreException()
self.genre = genre
self.is_active = is_active
def _validate_fqdn(self, value: str) -> str | Exception:
"""
Validates the FQDN.
:param value: a valid FQDN string.
:return: the same value.
"""
if not value or not len(value):
raise WhitelistModelFQDNMissingException()
validator = Validator(value, TicketRule.FQDN)
validator.validate()
if not validator.is_valid():
raise WhitelistModelFQDNNonValidException(validator.errors)
return value
def _validate_ipv4(self, value: str) -> str | Exception:
"""
Validates the IPv4.
:param value: a valid IPv4 string.
:return: the same value.
"""
if not value or not len(value):
raise WhitelistModelIPv4MissingException()
validator = Validator(value, TicketRule.IPV4)
validator.validate()
if not validator.is_valid():
raise WhitelistModelIPv4NonValidException(validator.errors)
return value
def _validate_ipv6(self, value: str) -> str | Exception:
"""
Validates the IPv6.
:param value: a valid IPv6 string.
:return: the same value.
"""
if not value or not len(value):
raise WhitelistModelIPv6MissingException()
validator = Validator(value, TicketRule.IPV6)
validator.validate()
if not validator.is_valid():
raise WhitelistModelIPv6NonValidException(validator.errors)
return value
def _validate_registrar(self, value: str) -> str | Exception:
"""
Validates the registrar of a FQDN item.
:param value: a valid string.
:return: the same value.
"""
if not value or not len(value):
raise WhitelistModelRegistrarMissingException()
validator = Validator(value, WhitelistRule.REGISTRAR)
validator.validate()
if not validator.is_valid():
raise WhitelistModelRegistrarNonValidException(validator.errors)
return value
def _validate_as_code(self, value: str) -> str | Exception:
"""
Validates the AS code of an IPv4 or IPv6 item.
:param value: a valid AS code.
:return: the same value.
"""
if not value or not len(value):
raise WhitelistModelASCodeMissingException()
validator = Validator(value, WhitelistRule.AS_CODE)
validator.validate()
if not validator.is_valid():
raise WhitelistModelASCodeNonValidException(validator.errors)
return value
class WhitelistModelRegistrarMissingException(Exception):
"""
Missing registrar.
"""
pass
class WhitelistModelASCodeMissingException(Exception):
"""
Missing AS code.
"""
pass
class WhitelistModelGenreException(Exception):
"""
Non valid genre.
"""
pass
class WhitelistModelFQDNMissingException(Exception):
"""
No FQDN passed.
"""
pass
class WhitelistModelFQDNNonValidException(Exception):
"""
Non valid FQDN.
"""
pass
class WhitelistModelIPv4MissingException(Exception):
"""
No IPv4 passed.
"""
pass
class WhitelistModelIPv4NonValidException(Exception):
"""
Non valid IPv4.
"""
pass
class WhitelistModelIPv6MissingException(Exception):
"""
No IPv6 passed.
"""
pass
class WhitelistModelIPv6NonValidException(Exception):
"""
Non valid IPv6.
"""
pass
class WhitelistModelRegistrarNonValidException(Exception):
"""
Non valid registrar.
"""
pass
class WhitelistModelASCodeNonValidException(Exception):
"""
Non valid AS code.
"""
pass

View file

@ -0,0 +1,33 @@
from piracyshield_component.validation.rules.required import Required
from piracyshield_component.validation.rules.string import String
from piracyshield_component.validation.rules.length import Length
from piracyshield_component.validation.rules.as_code import ASCode
from piracyshield_component.validation.rules.cidr_syntax_ipv4 import CIDRSyntaxIPv4
from piracyshield_component.validation.rules.cidr_syntax_ipv6 import CIDRSyntaxIPv6
class WhitelistRule:
"""
Set of rules for new whitelist items.
"""
REGISTRAR = [
Required(),
String(allowed = ' -'),
Length(minimum = 3, maximum = 255)
]
AS_CODE = [
Required(),
ASCode()
]
CIDR_IPV4 = [
Required(),
CIDRSyntaxIPv4()
]
CIDR_IPV6 = [
Required(),
CIDRSyntaxIPv6()
]