writing.sid_group
writing.sid_group
See the examples under https://github.com/salesforce/policy_sentry/tree/master/examples/library-usage/writing.
readthedocs is not building this for whatever reason. Disabling temporarily
sid_group indicates that this is a collection of policy-related data organized by their SIDs
SidGroup
This class is critical to the creation of least privilege policies.
It uses the SIDs as namespaces. The namespaces follow this format:
{Servicename}{Accesslevel}{Resourcetypename}
So, a resulting statement's SID might look like 'S3ListBucket'
If a condition key is supplied (like s3:RequestJob
), the SID string will be significantly longer.
It will resemble this format:
{Servicename}{Accesslevel}{Resourcetypename}{Conditionkeystring}{Conditiontypestring}{Conditionkeyvalue}
For example: EC2 write actions on the security-group resource, using the following condition map:
"Condition": {
"StringEquals": {"ec2:ResourceTag/Owner": "${aws:username}"}
}
The resulting SID would be:
Ec2WriteSecuritygroupResourcetagownerStringequalsAwsusername
Or, for actions that support wildcard ARNs only, an example could be:
Ec2WriteMultResourcetagownerStringequalsAwsusername
add_action_without_resource_constraint(self, action, sid_namespace='MultMultNone')
This handles the cases where certain actions do not handle resource constraints - either by AWS, or for flexibility when adding dependent actions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action |
|
The single action to add to the SID namespace. For instance, s3:ListAllMyBuckets |
required |
sid_namespace |
|
MultMultNone by default. Other valid option is "SkipResourceConstraints" |
'MultMultNone' |
Source code in policy_sentry/writing/sid_group.py
def add_action_without_resource_constraint(
self, action, sid_namespace="MultMultNone"
):
"""
This handles the cases where certain actions do not handle resource constraints - either by AWS, or for
flexibility when adding dependent actions.
Arguments:
action: The single action to add to the SID namespace. For instance, s3:ListAllMyBuckets
sid_namespace: MultMultNone by default. Other valid option is "SkipResourceConstraints"
"""
if sid_namespace == "SkipResourceConstraints":
temp_sid_dict = {
"arn": ["*"],
"service": "Skip",
"access_level": "ResourceConstraints",
"arn_format": "*",
"actions": [action],
}
elif sid_namespace == "MultMultNone":
temp_sid_dict = {
"arn": ["*"],
"service": "Mult",
"access_level": "Mult",
"arn_format": "*",
"actions": [action],
}
else:
raise Exception(
"Please specify the sid_namespace as either 'SkipResourceConstraints' or "
"'MultMultNone'."
)
if isinstance(action, str):
if sid_namespace in self.sids.keys():
if action not in self.sids[sid_namespace]["actions"]:
self.sids[sid_namespace]["actions"].append(action)
else:
self.sids[sid_namespace] = temp_sid_dict
else:
raise Exception("Please provide the action as a string, not a list.")
return self.sids
add_by_arn_and_access_level(self, arn_list, access_level, conditions_block=None)
This adds the user-supplied ARN(s), service prefixes, access levels, and condition keys (if applicable) given by the user. It derives the list of IAM actions based on the user's requested ARNs and access levels.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
arn_list |
|
Just a list of resource ARNs. |
required |
access_level |
|
"Read", "List", "Tagging", "Write", or "Permissions management" |
required |
conditions_block |
|
Optionally, a condition block with one or more conditions |
None |
Source code in policy_sentry/writing/sid_group.py
def add_by_arn_and_access_level(
self, arn_list, access_level, conditions_block=None
):
"""
This adds the user-supplied ARN(s), service prefixes, access levels, and condition keys (if applicable) given
by the user. It derives the list of IAM actions based on the user's requested ARNs and access levels.
Arguments:
arn_list: Just a list of resource ARNs.
access_level: "Read", "List", "Tagging", "Write", or "Permissions management"
conditions_block: Optionally, a condition block with one or more conditions
"""
for arn in arn_list:
service_prefix = get_service_from_arn(arn)
service_action_data = get_action_data(service_prefix, "*")
for service_prefix in service_action_data:
for row in service_action_data[service_prefix]:
if (
does_arn_match(arn, row["resource_arn_format"])
and row["access_level"] == access_level
):
raw_arn_format = row["resource_arn_format"]
resource_type_name = get_resource_type_name_with_raw_arn(
raw_arn_format
)
sid_namespace = create_policy_sid_namespace(
service_prefix, access_level, resource_type_name
)
actions = get_actions_with_arn_type_and_access_level(
service_prefix, resource_type_name, access_level
)
# Make supplied actions lowercase
# supplied_actions = [x.lower() for x in actions]
supplied_actions = actions.copy()
dependent_actions = get_dependent_actions(supplied_actions)
# List comprehension to get all dependent actions that are not in the supplied actions.
dependent_actions = [
x for x in dependent_actions if x not in supplied_actions
]
if len(dependent_actions) > 0:
for dep_action in dependent_actions:
self.add_action_without_resource_constraint(dep_action)
# self.add_action_without_resource_constraint(
# str.lower(dep_action)
# )
temp_sid_dict = {
"arn": [arn],
"service": service_prefix,
"access_level": access_level,
"arn_format": raw_arn_format,
"actions": actions,
"conditions": [], # TODO: Add conditions
}
if sid_namespace in self.sids.keys():
# If the ARN already exists there, skip it.
if arn not in self.sids[sid_namespace]["arn"]:
self.sids[sid_namespace]["arn"].append(arn)
# If it did not exist before at all, create it.
else:
self.sids[sid_namespace] = temp_sid_dict
add_by_list_of_actions(self, supplied_actions)
Takes a list of actions, queries the database for corresponding arns, adds them to the object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
supplied_actions |
|
A list of supplied actions |
required |
Source code in policy_sentry/writing/sid_group.py
def add_by_list_of_actions(self, supplied_actions):
"""
Takes a list of actions, queries the database for corresponding arns, adds them to the object.
Arguments:
supplied_actions: A list of supplied actions
"""
# actions_list = get_dependent_actions(supplied_actions)
dependent_actions = get_dependent_actions(supplied_actions)
dependent_actions = [x for x in dependent_actions if x not in supplied_actions]
logger.debug("Adding by list of actions")
logger.debug(f"Supplied actions: {str(supplied_actions)}")
logger.debug(f"Dependent actions: {str(dependent_actions)}")
arns_matching_supplied_actions = []
# arns_matching_supplied_actions is a list of dicts.
# It must do this rather than dictionaries because there will be duplicate
# values by nature of how the entries in the IAM database are structured.
# I'll provide the example values here to improve readability.
for action in supplied_actions:
service_name, action_name = action.split(":")
action_data = get_action_data(service_name, action_name)
for row in action_data[service_name]:
if row["resource_arn_format"] not in arns_matching_supplied_actions:
arns_matching_supplied_actions.append(
{
"resource_arn_format": row["resource_arn_format"],
"access_level": row["access_level"],
"action": row["action"],
}
)
# Identify the actions that do not support resource constraints
# If that's the case, add it to the wildcard namespace. Otherwise, don't add it.
actions_without_resource_constraints = []
for item in arns_matching_supplied_actions:
if item["resource_arn_format"] != "*":
self.add_by_arn_and_access_level(
[item["resource_arn_format"]], item["access_level"]
)
else:
actions_without_resource_constraints.append(item["action"])
# If there are any dependent actions, we need to add them without resource constraints.
# Otherwise, we get into issues where the amount of extra SIDs will balloon.
# Also, the user has no way of knowing what those dependent actions are beforehand.
# TODO: This is, in fact, a great opportunity to introduce conditions. But we aren't there yet.
if len(dependent_actions) > 0:
for dep_action in dependent_actions:
self.add_action_without_resource_constraint(dep_action)
# self.add_action_without_resource_constraint(str.lower(dep_action))
# Now, because add_by_arn_and_access_level() adds all actions under an access level, we have to
# remove all actions that do not match the supplied_actions. This is done in-place.
logger.debug(
"Purging actions that do not match the requested actions and dependent actions"
)
logger.debug(f"Supplied actions: {str(supplied_actions)}")
logger.debug(f"Dependent actions: {str(dependent_actions)}")
self.remove_actions_not_matching_these(supplied_actions + dependent_actions)
for action in actions_without_resource_constraints:
logger.debug(
f"Deliberately adding the action {action} without resource constraints"
)
self.add_action_without_resource_constraint(action)
logger.debug(
"Removing actions that are in the wildcard arn (Resources = '*') as well as other statements that have "
"resource constraints "
)
self.remove_actions_duplicated_in_wildcard_arn()
logger.debug("Getting the rendered policy")
rendered_policy = self.get_rendered_policy()
return rendered_policy
add_exclude_actions(self, exclude_actions)
To exclude actions from the output
Source code in policy_sentry/writing/sid_group.py
def add_exclude_actions(self, exclude_actions):
"""To exclude actions from the output"""
if exclude_actions:
expanded_actions = determine_actions_to_expand(exclude_actions)
self.exclude_actions = [x.lower() for x in expanded_actions]
else:
self.exclude_actions = []
add_requested_service_wide(self, service_prefixes, access_level)
When a user requests all wildcard-only actions available under a service at a specific access level
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_prefixes |
|
A list of service prefixes |
required |
access_level |
|
The requested access level |
required |
Source code in policy_sentry/writing/sid_group.py
def add_requested_service_wide(self, service_prefixes, access_level):
"""
When a user requests all wildcard-only actions available under a service at a specific access level
Arguments:
service_prefixes: A list of service prefixes
access_level: The requested access level
"""
if access_level == "Read":
self.wildcard_only_service_read = service_prefixes
elif access_level == "Write":
self.wildcard_only_service_write = service_prefixes
elif access_level == "List":
self.wildcard_only_service_list = service_prefixes
elif access_level == "Tagging":
self.wildcard_only_service_tagging = service_prefixes
elif access_level == "Permissions management":
self.wildcard_only_service_permissions_management = service_prefixes
add_skip_resource_constraints(self, skip_resource_constraints_actions)
To override resource constraint requirements - i.e., instead of restricting s3:PutObject
to a path and
allowing s3:PutObject
to *
resources, put s3:GetObject
here.
Source code in policy_sentry/writing/sid_group.py
def add_skip_resource_constraints(self, skip_resource_constraints_actions):
"""
To override resource constraint requirements - i.e., instead of restricting `s3:PutObject` to a path and
allowing `s3:PutObject` to `*` resources, put `s3:GetObject` here.
"""
if isinstance(skip_resource_constraints_actions, list):
self.skip_resource_constraints.extend(skip_resource_constraints_actions)
elif isinstance(skip_resource_constraints_actions, str):
self.skip_resource_constraints.append([skip_resource_constraints_actions])
else:
raise Exception("Please provide 'skip_resource_constraints' as a list of IAM actions.")
add_wildcard_only_actions(self, provided_wildcard_actions)
Given a list of IAM actions, add individual IAM Actions that do not support resource constraints to the MultMultNone SID
Parameters:
Name | Type | Description | Default |
---|---|---|---|
provided_wildcard_actions |
|
list actions provided by the user. |
required |
Source code in policy_sentry/writing/sid_group.py
def add_wildcard_only_actions(self, provided_wildcard_actions):
"""
Given a list of IAM actions, add individual IAM Actions that do not support resource constraints to the MultMultNone SID
Arguments:
provided_wildcard_actions: list actions provided by the user.
"""
if isinstance(provided_wildcard_actions, list):
verified_wildcard_actions = remove_actions_that_are_not_wildcard_arn_only(
provided_wildcard_actions
)
if len(verified_wildcard_actions) > 0:
logger.debug(
"Attempting to add the following actions to the policy: %s",
verified_wildcard_actions,
)
self.add_by_list_of_actions(verified_wildcard_actions)
logger.debug(
"Added the following wildcard-only actions to the policy: %s",
verified_wildcard_actions,
)
add_wildcard_only_actions_matching_services_and_access_level(self, services, access_level)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
services |
|
A list of AWS services |
required |
access_level |
|
An access level as it is written in the database, such as 'Read', 'Write', 'List', 'Permissions management', or 'Tagging' |
required |
Source code in policy_sentry/writing/sid_group.py
def add_wildcard_only_actions_matching_services_and_access_level(
self, services, access_level
):
"""
Arguments:
services: A list of AWS services
access_level: An access level as it is written in the database, such as 'Read', 'Write', 'List', 'Permissions management', or 'Tagging'
"""
wildcard_only_actions_to_add = []
for service in services:
actions = get_actions_at_access_level_that_support_wildcard_arns_only(
service, access_level
)
wildcard_only_actions_to_add.extend(actions)
self.add_wildcard_only_actions(wildcard_only_actions_to_add)
get_rendered_policy(self, minimize=None)
Get the JSON rendered policy
Parameters:
Name | Type | Description | Default |
---|---|---|---|
minimize |
|
Reduce the character count of policies without creating overlap with other action names |
None |
Returns:
Type | Description |
---|---|
Dictionary |
The IAM Policy JSON |
Source code in policy_sentry/writing/sid_group.py
def get_rendered_policy(self, minimize=None):
"""
Get the JSON rendered policy
Arguments:
minimize: Reduce the character count of policies without creating overlap with other action names
Returns:
Dictionary: The IAM Policy JSON
"""
statements = []
# Only set the actions to lowercase if minimize is provided
all_actions = get_all_actions(lowercase=True)
# render the policy
sids_to_be_changed = []
for sid in self.sids:
temp_actions = self.sids[sid]["actions"]
if len(temp_actions) == 0:
logger.debug(f"No actions for sid {sid}")
continue
actions = []
if self.exclude_actions:
for temp_action in temp_actions:
if temp_action.lower() in self.exclude_actions:
logger.debug(f"\tExcluded action: {temp_action}")
else:
if temp_action not in actions:
actions.append(temp_action)
else:
actions = temp_actions
# temp_actions.clear()
match_found = False
if minimize is not None and isinstance(minimize, int):
logger.debug("Minimizing statements...")
actions = minimize_statement_actions(
actions, all_actions, minchars=minimize
)
# searching in the existing statements
# further minimizing the the output
for stmt in statements:
if stmt["Resource"] == self.sids[sid]["arn"]:
stmt["Action"].extend(actions)
match_found = True
sids_to_be_changed.append(stmt["Sid"])
break
logger.debug(f"Adding statement with SID {sid}")
logger.debug(f"{sid} SID has the actions: {actions}")
logger.debug(f"{sid} SID has the resources: {self.sids[sid]['arn']}")
if not match_found:
statements.append(
{
"Sid": sid,
"Effect": "Allow",
"Action": actions,
"Resource": self.sids[sid]["arn"],
}
)
if sids_to_be_changed:
for stmt in statements:
if stmt['Sid'] in sids_to_be_changed:
arn_details = parse_arn(stmt['Resource'][0])
resource_path = arn_details.get("resource_path")
resource_sid_segment = strip_special_characters(
f"{arn_details['resource']}{resource_path}"
)
stmt['Sid'] = create_policy_sid_namespace(arn_details['service'], "Mult", resource_sid_segment)
policy = {"Version": POLICY_LANGUAGE_VERSION, "Statement": statements}
return policy
get_sid(self, sid)
Get a single group by the SID identifier
Source code in policy_sentry/writing/sid_group.py
def get_sid(self, sid):
"""Get a single group by the SID identifier"""
if self.sids[sid]:
return self.sids[sid]
else:
raise Exception(f"No SID with the value of {sid}")
get_sid_group(self)
Get the whole SID group as JSON
Source code in policy_sentry/writing/sid_group.py
def get_sid_group(self):
"""
Get the whole SID group as JSON
"""
return self.sids
list_sids(self)
Get a list of all of the SIDs by their identifiers
Returns:
Type | Description |
---|---|
List |
A list of SIDs in the SID group |
Source code in policy_sentry/writing/sid_group.py
def list_sids(self):
"""
Get a list of all of the SIDs by their identifiers
Returns:
List: A list of SIDs in the SID group
"""
return self.sids.keys()
process_template(self, cfg, minimize=None)
Process the Policy Sentry template as a dict. This auto-detects whether or not the file is in CRUD mode or Actions mode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cfg |
|
The loaded YAML as a dict. Must follow Policy Sentry dictated format. |
required |
minimize |
|
Minimize the resulting statement with safe usage of wildcards to reduce policy length. Set this to the character length you want - for example, 0, or 4. Defaults to none. |
None |
Returns:
Type | Description |
---|---|
Dictionary |
The rendered IAM JSON Policy |
Source code in policy_sentry/writing/sid_group.py
def process_template(self, cfg, minimize=None):
"""
Process the Policy Sentry template as a dict. This auto-detects whether or not the file is in CRUD mode or
Actions mode.
Arguments:
cfg: The loaded YAML as a dict. Must follow Policy Sentry dictated format.
minimize: Minimize the resulting statement with *safe* usage of wildcards to reduce policy length. Set this to the character length you want - for example, 0, or 4. Defaults to none.
Returns:
Dictionary: The rendered IAM JSON Policy
"""
if cfg.get("mode") == "crud":
logger.debug("CRUD mode selected")
check_crud_schema(cfg)
# EXCLUDE ACTIONS
if cfg.get("exclude-actions"):
if cfg.get("exclude-actions")[0] != "":
self.add_exclude_actions(cfg["exclude-actions"])
# WILDCARD ONLY SECTION
if cfg.get("wildcard-only"):
if cfg.get("wildcard-only").get("single-actions"):
if cfg["wildcard-only"]["single-actions"][0] != "":
provided_wildcard_actions = cfg["wildcard-only"]["single-actions"]
logger.debug(f"Requested wildcard-only actions: {str(provided_wildcard_actions)}")
self.wildcard_only_single_actions = provided_wildcard_actions
if cfg.get("wildcard-only").get("service-read"):
if cfg["wildcard-only"]["service-read"][0] != "":
service_read = cfg["wildcard-only"]["service-read"]
logger.debug(f"Requested wildcard-only actions: {str(service_read)}")
self.wildcard_only_service_read = service_read
if cfg.get("wildcard-only").get("service-write"):
if cfg["wildcard-only"]["service-write"][0] != "":
service_write = cfg["wildcard-only"]["service-write"]
logger.debug(f"Requested wildcard-only actions: {str(service_write)}")
self.wildcard_only_service_write = service_write
if cfg.get("wildcard-only").get("service-list"):
if cfg["wildcard-only"]["service-list"][0] != "":
service_list = cfg["wildcard-only"]["service-list"]
logger.debug(f"Requested wildcard-only actions: {str(service_list)}")
self.wildcard_only_service_list = service_list
if cfg.get("wildcard-only").get("service-tagging"):
if cfg["wildcard-only"]["service-tagging"][0] != "":
service_tagging = cfg["wildcard-only"]["service-tagging"]
logger.debug(f"Requested wildcard-only actions: {str(service_tagging)}")
self.wildcard_only_service_tagging = service_tagging
if cfg.get("wildcard-only").get("service-permissions-management"):
if cfg["wildcard-only"]["service-permissions-management"][0] != "":
service_permissions_management = cfg["wildcard-only"]["service-permissions-management"]
logger.debug(f"Requested wildcard-only actions: {str(service_permissions_management)}")
self.wildcard_only_service_permissions_management = service_permissions_management
# Process the wildcard-only section
self.process_wildcard_only_actions()
# Standard access levels
if cfg.get("read"):
if cfg["read"][0] != "":
logger.debug(f"Requested access to arns: {str(cfg['read'])}")
self.add_by_arn_and_access_level(cfg["read"], "Read")
if cfg.get("write"):
if cfg["write"][0] != "":
logger.debug(f"Requested access to arns: {str(cfg['write'])}")
self.add_by_arn_and_access_level(cfg["write"], "Write")
if cfg.get("list"):
if cfg["list"][0] != "":
logger.debug(f"Requested access to arns: {str(cfg['list'])}")
self.add_by_arn_and_access_level(cfg["list"], "List")
if cfg.get("tagging"):
if cfg["tagging"][0] != "":
logger.debug(f"Requested access to arns: {str(cfg['tagging'])}")
self.add_by_arn_and_access_level(cfg["tagging"], "Tagging")
if cfg.get("permissions-management"):
if cfg["permissions-management"][0] != "":
logger.debug(f"Requested access to arns: {str(cfg['permissions-management'])}")
self.add_by_arn_and_access_level(cfg["permissions-management"], "Permissions management")
# SKIP RESOURCE CONSTRAINTS
if cfg.get("skip-resource-constraints"):
if cfg["skip-resource-constraints"][0] != "":
logger.debug(
f"Requested override: the actions {str(cfg['skip-resource-constraints'])} will "
f"skip resource constraints."
)
self.add_skip_resource_constraints(cfg["skip-resource-constraints"])
for skip_resource_constraints_action in self.skip_resource_constraints:
self.add_action_without_resource_constraint(
skip_resource_constraints_action, "SkipResourceConstraints"
)
elif cfg.get("mode") == "actions":
check_actions_schema(cfg)
if "actions" in cfg.keys():
if cfg["actions"] is not None and cfg["actions"][0] != "":
self.add_by_list_of_actions(cfg["actions"])
rendered_policy = self.get_rendered_policy(minimize)
return rendered_policy
process_wildcard_only_actions(self)
After (1) the list of wildcard-only single actions have been added and (2) the list of wildcard-only service-wide actions have been added, process them and store them under the proper SID.
Source code in policy_sentry/writing/sid_group.py
def process_wildcard_only_actions(self):
"""
After (1) the list of wildcard-only single actions have been added and (2) the list of wildcard-only service-wide actions have been added, process them and store them under the proper SID.
"""
provided_wildcard_actions = (
self.wildcard_only_single_actions
+ get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_read, "Read")
+ get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_list, "List")
+ get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_write, "Write")
+ get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_tagging, "Tagging")
+ get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_permissions_management, "Permissions management")
)
self.add_wildcard_only_actions(
provided_wildcard_actions
)
remove_actions_duplicated_in_wildcard_arn(self)
Removes actions from the object that are in a resource-specific ARN, as well as the *
resource.
For example, if ssm:GetParameter
is restricted to a specific parameter path, as well as *
, then we want to
remove the *
option to force least privilege.
Source code in policy_sentry/writing/sid_group.py
def remove_actions_duplicated_in_wildcard_arn(self):
"""
Removes actions from the object that are in a resource-specific ARN, as well as the `*` resource.
For example, if `ssm:GetParameter` is restricted to a specific parameter path, as well as `*`, then we want to
remove the `*` option to force least privilege.
"""
actions_under_wildcard_resources = []
actions_under_wildcard_resources_to_nuke = []
# Build a temporary list. Contains actions in MultMultNone SID (where resources = "*")
for sid in self.sids:
if self.sids[sid]["arn_format"] == "*":
actions_under_wildcard_resources.extend(self.sids[sid]["actions"])
# If the actions under the MultMultNone SID exist under other SIDs
if len(actions_under_wildcard_resources) > 0:
for sid in self.sids:
if "*" not in self.sids[sid]["arn_format"]:
for action in actions_under_wildcard_resources:
if action in self.sids[sid]["actions"]:
if action not in self.skip_resource_constraints:
# add it to a list of actions to nuke when they are under other SIDs
actions_under_wildcard_resources_to_nuke.append(action)
# If there are actions that we need to remove from SIDs outside of MultMultNone SID
if len(actions_under_wildcard_resources_to_nuke) > 0:
for sid in self.sids:
if "*" in self.sids[sid]["arn_format"]:
for action in actions_under_wildcard_resources_to_nuke:
try:
self.sids[sid]["actions"].remove(str(action))
except BaseException: # pylint: disable=broad-except
logger.debug("Removal not successful")
remove_actions_not_matching_these(self, actions_to_keep)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
actions_to_keep |
|
A list of actions to leave in the policy. All actions not in this list are removed. |
required |
Source code in policy_sentry/writing/sid_group.py
def remove_actions_not_matching_these(self, actions_to_keep):
"""
Arguments:
actions_to_keep: A list of actions to leave in the policy. All actions not in this list are removed.
"""
actions_to_keep = get_lowercase_action_list(actions_to_keep)
actions_deleted = []
for sid in self.sids:
placeholder_actions_list = []
for action in self.sids[sid]["actions"]:
# if the action is not in the list of selected actions, don't copy it to the placeholder list
if action.lower() in actions_to_keep:
placeholder_actions_list.append(action)
elif action.lower() not in actions_to_keep:
logger.debug("%s not found in list of actions to keep: %s", action.lower(), actions_to_keep)
actions_deleted.append(action)
# Clear the list and then extend it to include the updated actions only
self.sids[sid]["actions"].clear()
self.sids[sid]["actions"].extend(placeholder_actions_list.copy())
# Highlight the actions that you remove
logger.debug("Actions deleted: %s", str(actions_deleted))
# Now that we've removed a bunch of actions, if there are SID groups without any actions,
# remove them so we don't get SIDs with empty action lists
self.remove_sids_with_empty_action_lists()
remove_sids_with_empty_action_lists(self)
Now that we've removed a bunch of actions, if there are SID groups without any actions, remove them so we don't get SIDs with empty action lists
Source code in policy_sentry/writing/sid_group.py
def remove_sids_with_empty_action_lists(self):
"""
Now that we've removed a bunch of actions, if there are SID groups without any actions, remove them so we don't get SIDs with empty action lists
"""
sid_namespaces_to_delete = []
for sid in self.sids:
if len(self.sids[sid]["actions"]) > 0:
pass
# If the size is zero, add it to the indexes_to_delete list.
else:
sid_namespaces_to_delete.append(sid)
# Loop through sid_namespaces_to_delete in reverse order (so we delete index
# 10 before index 8, for example)
if len(sid_namespaces_to_delete) > 0:
for i in reversed(range(len(sid_namespaces_to_delete))):
del self.sids[sid_namespaces_to_delete[i]]
create_policy_sid_namespace(service, access_level, resource_type_name, condition_block=None)
Simply generates the SID name. The SID groups ARN types that share an access level.
For example, S3 objects vs. SSM Parameter have different ARN types - as do S3 objects vs S3 buckets. That's how we choose to group them.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
|
|
required |
access_level |
|
|
required |
resource_type_name |
|
|
required |
condition_block |
|
|
None |
Returns:
Type | Description |
---|---|
String |
A string like |
Source code in policy_sentry/writing/sid_group.py
def create_policy_sid_namespace(
service, access_level, resource_type_name, condition_block=None
):
"""
Simply generates the SID name. The SID groups ARN types that share an access level.
For example, S3 objects vs. SSM Parameter have different ARN types - as do S3 objects vs S3 buckets. That's how we
choose to group them.
Arguments:
service: `ssm`
access_level: `Read`
resource_type_name: `parameter`
condition_block: `{"condition_key_string": "ec2:ResourceTag/purpose", "condition_type_string": "StringEquals", "condition_value": "test"}`
Returns:
String: A string like `SsmReadParameter`
"""
# Sanitize the resource_type_name; otherwise we hit some list conversion
# errors
resource_type_name = re.sub("[^A-Za-z0-9]+", "", resource_type_name)
# Also remove the space from the Access level, if applicable. This only
# applies for "Permissions management"
access_level = re.sub("[^A-Za-z0-9]+", "", access_level)
sid_namespace_prefix = (
capitalize_first_character(strip_special_characters(service))
+ capitalize_first_character(access_level)
+ capitalize_first_character(resource_type_name)
)
if condition_block:
condition_key_namespace = re.sub(
"[^A-Za-z0-9]+", "", condition_block["condition_key_string"]
)
condition_type_namespace = condition_block["condition_type_string"]
condition_value_namespace = re.sub(
"[^A-Za-z0-9]+", "", condition_block["condition_value"]
)
sid_namespace_condition_suffix = (
f"{capitalize_first_character(condition_key_namespace)}"
f"{capitalize_first_character(condition_type_namespace)}"
f"{capitalize_first_character(condition_value_namespace)}"
)
sid_namespace = sid_namespace_prefix + sid_namespace_condition_suffix
else:
sid_namespace = sid_namespace_prefix
return sid_namespace
get_wildcard_only_actions_matching_services_and_access_level(services, access_level)
Get a list of wildcard-only actions matching the services and access level
Parameters:
Name | Type | Description | Default |
---|---|---|---|
services |
|
A list of AWS services |
required |
access_level |
|
An access level as it is written in the database, such as 'Read', 'Write', 'List', 'Permissions management', or 'Tagging' |
required |
Returns:
Type | Description |
---|---|
List |
A list of wildcard-only actions matching the services and access level |
Source code in policy_sentry/writing/sid_group.py
def get_wildcard_only_actions_matching_services_and_access_level(services, access_level):
"""
Get a list of wildcard-only actions matching the services and access level
Arguments:
services: A list of AWS services
access_level: An access level as it is written in the database, such as 'Read', 'Write', 'List', 'Permissions management', or 'Tagging'
Returns:
List: A list of wildcard-only actions matching the services and access level
"""
wildcard_only_actions_to_add = []
for service in services:
actions = get_actions_at_access_level_that_support_wildcard_arns_only(
service, access_level
)
wildcard_only_actions_to_add.extend(actions)
return wildcard_only_actions_to_add
remove_actions_that_are_not_wildcard_arn_only(actions_list)
Given a list of actions, remove the ones that CAN be restricted to ARNs, leaving only the ones that cannot.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
actions_list |
|
A list of actions |
required |
Returns:
Type | Description |
---|---|
List |
An updated list of actions |
Source code in policy_sentry/writing/sid_group.py
def remove_actions_that_are_not_wildcard_arn_only(actions_list):
"""
Given a list of actions, remove the ones that CAN be restricted to ARNs, leaving only the ones that cannot.
Arguments:
actions_list: A list of actions
Returns:
List: An updated list of actions
"""
# remove duplicates, if there are any
actions_list = list(dict.fromkeys(actions_list))
actions_list_placeholder = []
for action in actions_list:
try:
service_name, action_name = action.split(":")
except ValueError as v_e:
# We will skip the action because this likely means that the wildcard action provided is not valid.
logger.debug(v_e)
logger.debug(
"The value provided in wildcard-only section is not formatted properly."
)
continue
rows = get_actions_that_support_wildcard_arns_only(service_name)
for row in rows:
if row.lower() == action.lower():
actions_list_placeholder.append(f"{service_name}:{action_name}")
return actions_list_placeholder