Skip to content

writing.sid_group

writing.sid_group

See the examples under https://github.com/salesforce/policy_sentry/tree/master/examples/library-usage/writing.

readthedocs is not building this for whatever reason. Disabling temporarily

sid_group indicates that this is a collection of policy-related data organized by their SIDs

SidGroup

This class is critical to the creation of least privilege policies. It uses the SIDs as namespaces. The namespaces follow this format: {Servicename}{Accesslevel}{Resourcetypename}

So, a resulting statement's SID might look like 'S3ListBucket'

If a condition key is supplied (like s3:RequestJob), the SID string will be significantly longer.

It will resemble this format: {Servicename}{Accesslevel}{Resourcetypename}{Conditionkeystring}{Conditiontypestring}{Conditionkeyvalue}

For example: EC2 write actions on the security-group resource, using the following condition map:

    "Condition": {
        "StringEquals": {"ec2:ResourceTag/Owner": "${aws:username}"}
    }

The resulting SID would be: Ec2WriteSecuritygroupResourcetagownerStringequalsAwsusername

Or, for actions that support wildcard ARNs only, an example could be: Ec2WriteMultResourcetagownerStringequalsAwsusername

add_action_without_resource_constraint(self, action, sid_namespace='MultMultNone')

This handles the cases where certain actions do not handle resource constraints - either by AWS, or for flexibility when adding dependent actions.

Parameters:

Name Type Description Default
action

The single action to add to the SID namespace. For instance, s3:ListAllMyBuckets

required
sid_namespace

MultMultNone by default. Other valid option is "SkipResourceConstraints"

'MultMultNone'
Source code in policy_sentry/writing/sid_group.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def add_action_without_resource_constraint(
    self, action, sid_namespace="MultMultNone"
):
    """
    This handles the cases where certain actions do not handle resource constraints - either by AWS, or for
    flexibility when adding dependent actions.

    Arguments:
        action: The single action to add to the SID namespace. For instance, s3:ListAllMyBuckets
        sid_namespace: MultMultNone by default. Other valid option is "SkipResourceConstraints"
    """
    if sid_namespace == "SkipResourceConstraints":
        temp_sid_dict = {
            "arn": ["*"],
            "service": "Skip",
            "access_level": "ResourceConstraints",
            "arn_format": "*",
            "actions": [action],
        }
    elif sid_namespace == "MultMultNone":
        temp_sid_dict = {
            "arn": ["*"],
            "service": "Mult",
            "access_level": "Mult",
            "arn_format": "*",
            "actions": [action],
        }
    else:
        raise Exception(
            "Please specify the sid_namespace as either 'SkipResourceConstraints' or "
            "'MultMultNone'."
        )
    if isinstance(action, str):
        if sid_namespace in self.sids.keys():
            if action not in self.sids[sid_namespace]["actions"]:
                self.sids[sid_namespace]["actions"].append(action)
        else:
            self.sids[sid_namespace] = temp_sid_dict
    else:
        raise Exception("Please provide the action as a string, not a list.")
    return self.sids

add_by_arn_and_access_level(self, arn_list, access_level, conditions_block=None)

This adds the user-supplied ARN(s), service prefixes, access levels, and condition keys (if applicable) given by the user. It derives the list of IAM actions based on the user's requested ARNs and access levels.

Parameters:

Name Type Description Default
arn_list

Just a list of resource ARNs.

required
access_level

"Read", "List", "Tagging", "Write", or "Permissions management"

required
conditions_block

Optionally, a condition block with one or more conditions

None
Source code in policy_sentry/writing/sid_group.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def add_by_arn_and_access_level(
    self, arn_list, access_level, conditions_block=None
):
    """
    This adds the user-supplied ARN(s), service prefixes, access levels, and condition keys (if applicable) given
    by the user. It derives the list of IAM actions based on the user's requested ARNs and access levels.

    Arguments:
        arn_list: Just a list of resource ARNs.
        access_level: "Read", "List", "Tagging", "Write", or "Permissions management"
        conditions_block: Optionally, a condition block with one or more conditions
    """
    for arn in arn_list:
        service_prefix = get_service_from_arn(arn)
        service_action_data = get_action_data(service_prefix, "*")
        for service_prefix in service_action_data:
            for row in service_action_data[service_prefix]:
                if (
                    does_arn_match(arn, row["resource_arn_format"])
                    and row["access_level"] == access_level
                ):
                    raw_arn_format = row["resource_arn_format"]
                    resource_type_name = get_resource_type_name_with_raw_arn(
                        raw_arn_format
                    )
                    sid_namespace = create_policy_sid_namespace(
                        service_prefix, access_level, resource_type_name
                    )
                    actions = get_actions_with_arn_type_and_access_level(
                        service_prefix, resource_type_name, access_level
                    )
                    # Make supplied actions lowercase
                    # supplied_actions = [x.lower() for x in actions]
                    supplied_actions = actions.copy()
                    dependent_actions = get_dependent_actions(supplied_actions)
                    # List comprehension to get all dependent actions that are not in the supplied actions.
                    dependent_actions = [
                        x for x in dependent_actions if x not in supplied_actions
                    ]
                    if len(dependent_actions) > 0:
                        for dep_action in dependent_actions:
                            self.add_action_without_resource_constraint(dep_action)
                            # self.add_action_without_resource_constraint(
                            #     str.lower(dep_action)
                            # )

                    temp_sid_dict = {
                        "arn": [arn],
                        "service": service_prefix,
                        "access_level": access_level,
                        "arn_format": raw_arn_format,
                        "actions": actions,
                        "conditions": [],  # TODO: Add conditions
                    }
                    if sid_namespace in self.sids.keys():
                        # If the ARN already exists there, skip it.
                        if arn not in self.sids[sid_namespace]["arn"]:
                            self.sids[sid_namespace]["arn"].append(arn)
                    # If it did not exist before at all, create it.
                    else:
                        self.sids[sid_namespace] = temp_sid_dict

add_by_list_of_actions(self, supplied_actions)

Takes a list of actions, queries the database for corresponding arns, adds them to the object.

Parameters:

Name Type Description Default
supplied_actions

A list of supplied actions

required
Source code in policy_sentry/writing/sid_group.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
def add_by_list_of_actions(self, supplied_actions):
    """
    Takes a list of actions, queries the database for corresponding arns, adds them to the object.

    Arguments:
        supplied_actions: A list of supplied actions
    """
    # actions_list = get_dependent_actions(supplied_actions)
    dependent_actions = get_dependent_actions(supplied_actions)
    dependent_actions = [x for x in dependent_actions if x not in supplied_actions]
    logger.debug("Adding by list of actions")
    logger.debug(f"Supplied actions: {str(supplied_actions)}")
    logger.debug(f"Dependent actions: {str(dependent_actions)}")
    arns_matching_supplied_actions = []

    # arns_matching_supplied_actions is a list of dicts.
    # It must do this rather than dictionaries because there will be duplicate
    #     values by nature of how the entries in the IAM database are structured.
    # I'll provide the example values here to improve readability.

    for action in supplied_actions:
        service_name, action_name = action.split(":")
        action_data = get_action_data(service_name, action_name)
        for row in action_data[service_name]:
            if row["resource_arn_format"] not in arns_matching_supplied_actions:
                arns_matching_supplied_actions.append(
                    {
                        "resource_arn_format": row["resource_arn_format"],
                        "access_level": row["access_level"],
                        "action": row["action"],
                    }
                )

    # Identify the actions that do not support resource constraints
    # If that's the case, add it to the wildcard namespace. Otherwise, don't add it.

    actions_without_resource_constraints = []
    for item in arns_matching_supplied_actions:
        if item["resource_arn_format"] != "*":
            self.add_by_arn_and_access_level(
                [item["resource_arn_format"]], item["access_level"]
            )
        else:
            actions_without_resource_constraints.append(item["action"])

    # If there are any dependent actions, we need to add them without resource constraints.
    # Otherwise, we get into issues where the amount of extra SIDs will balloon.
    # Also, the user has no way of knowing what those dependent actions are beforehand.
    # TODO: This is, in fact, a great opportunity to introduce conditions. But we aren't there yet.
    if len(dependent_actions) > 0:
        for dep_action in dependent_actions:
            self.add_action_without_resource_constraint(dep_action)
            # self.add_action_without_resource_constraint(str.lower(dep_action))
    # Now, because add_by_arn_and_access_level() adds all actions under an access level, we have to
    # remove all actions that do not match the supplied_actions. This is done in-place.
    logger.debug(
        "Purging actions that do not match the requested actions and dependent actions"
    )
    logger.debug(f"Supplied actions: {str(supplied_actions)}")
    logger.debug(f"Dependent actions: {str(dependent_actions)}")
    self.remove_actions_not_matching_these(supplied_actions + dependent_actions)
    for action in actions_without_resource_constraints:
        logger.debug(
            f"Deliberately adding the action {action} without resource constraints"
        )
        self.add_action_without_resource_constraint(action)
    logger.debug(
        "Removing actions that are in the wildcard arn (Resources = '*') as well as other statements that have "
        "resource constraints "
    )
    self.remove_actions_duplicated_in_wildcard_arn()
    logger.debug("Getting the rendered policy")
    rendered_policy = self.get_rendered_policy()
    return rendered_policy

add_exclude_actions(self, exclude_actions)

To exclude actions from the output

Source code in policy_sentry/writing/sid_group.py
91
92
93
94
95
96
97
def add_exclude_actions(self, exclude_actions):
    """To exclude actions from the output"""
    if exclude_actions:
        expanded_actions = determine_actions_to_expand(exclude_actions)
        self.exclude_actions = [x.lower() for x in expanded_actions]
    else:
        self.exclude_actions = []

add_requested_service_wide(self, service_prefixes, access_level)

When a user requests all wildcard-only actions available under a service at a specific access level

Parameters:

Name Type Description Default
service_prefixes

A list of service prefixes

required
access_level

The requested access level

required
Source code in policy_sentry/writing/sid_group.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def add_requested_service_wide(self, service_prefixes, access_level):
    """
    When a user requests all wildcard-only actions available under a service at a specific access level

    Arguments:
        service_prefixes: A list of service prefixes
        access_level: The requested access level
    """
    if access_level == "Read":
        self.wildcard_only_service_read = service_prefixes
    elif access_level == "Write":
        self.wildcard_only_service_write = service_prefixes
    elif access_level == "List":
        self.wildcard_only_service_list = service_prefixes
    elif access_level == "Tagging":
        self.wildcard_only_service_tagging = service_prefixes
    elif access_level == "Permissions management":
        self.wildcard_only_service_permissions_management = service_prefixes

add_skip_resource_constraints(self, skip_resource_constraints_actions)

To override resource constraint requirements - i.e., instead of restricting s3:PutObject to a path and allowing s3:PutObject to * resources, put s3:GetObject here.

Source code in policy_sentry/writing/sid_group.py
 99
100
101
102
103
104
105
106
107
108
109
def add_skip_resource_constraints(self, skip_resource_constraints_actions):
    """
    To override resource constraint requirements - i.e., instead of restricting `s3:PutObject` to a path and
    allowing `s3:PutObject` to `*` resources, put `s3:GetObject` here.
    """
    if isinstance(skip_resource_constraints_actions, list):
        self.skip_resource_constraints.extend(skip_resource_constraints_actions)
    elif isinstance(skip_resource_constraints_actions, str):
        self.skip_resource_constraints.append([skip_resource_constraints_actions])
    else:
        raise Exception("Please provide 'skip_resource_constraints' as a list of IAM actions.")

add_wildcard_only_actions(self, provided_wildcard_actions)

Given a list of IAM actions, add individual IAM Actions that do not support resource constraints to the MultMultNone SID

Parameters:

Name Type Description Default
provided_wildcard_actions

list actions provided by the user.

required
Source code in policy_sentry/writing/sid_group.py
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
def add_wildcard_only_actions(self, provided_wildcard_actions):
    """
    Given a list of IAM actions, add individual IAM Actions that do not support resource constraints to the MultMultNone SID

    Arguments:
        provided_wildcard_actions: list actions provided by the user.
    """
    if isinstance(provided_wildcard_actions, list):
        verified_wildcard_actions = remove_actions_that_are_not_wildcard_arn_only(
            provided_wildcard_actions
        )
        if len(verified_wildcard_actions) > 0:
            logger.debug(
                "Attempting to add the following actions to the policy: %s",
                verified_wildcard_actions,
            )
            self.add_by_list_of_actions(verified_wildcard_actions)
            logger.debug(
                "Added the following wildcard-only actions to the policy: %s",
                verified_wildcard_actions,
            )

add_wildcard_only_actions_matching_services_and_access_level(self, services, access_level)

Parameters:

Name Type Description Default
services

A list of AWS services

required
access_level

An access level as it is written in the database, such as 'Read', 'Write', 'List', 'Permissions management', or 'Tagging'

required
Source code in policy_sentry/writing/sid_group.py
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
def add_wildcard_only_actions_matching_services_and_access_level(
    self, services, access_level
):
    """
    Arguments:
        services: A list of AWS services
        access_level: An access level as it is written in the database, such as 'Read', 'Write', 'List', 'Permissions management', or 'Tagging'
    """
    wildcard_only_actions_to_add = []
    for service in services:
        actions = get_actions_at_access_level_that_support_wildcard_arns_only(
            service, access_level
        )
        wildcard_only_actions_to_add.extend(actions)
    self.add_wildcard_only_actions(wildcard_only_actions_to_add)

get_rendered_policy(self, minimize=None)

Get the JSON rendered policy

Parameters:

Name Type Description Default
minimize

Reduce the character count of policies without creating overlap with other action names

None

Returns:

Type Description
Dictionary

The IAM Policy JSON

Source code in policy_sentry/writing/sid_group.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def get_rendered_policy(self, minimize=None):
    """
    Get the JSON rendered policy

    Arguments:
        minimize: Reduce the character count of policies without creating overlap with other action names
    Returns:
        Dictionary: The IAM Policy JSON
    """
    statements = []
    # Only set the actions to lowercase if minimize is provided
    all_actions = get_all_actions(lowercase=True)

    # render the policy
    sids_to_be_changed = []
    for sid in self.sids:
        temp_actions = self.sids[sid]["actions"]
        if len(temp_actions) == 0:
            logger.debug(f"No actions for sid {sid}")
            continue
        actions = []
        if self.exclude_actions:
            for temp_action in temp_actions:
                if temp_action.lower() in self.exclude_actions:
                    logger.debug(f"\tExcluded action: {temp_action}")
                else:
                    if temp_action not in actions:
                        actions.append(temp_action)
        else:
            actions = temp_actions
        # temp_actions.clear()
        match_found = False
        if minimize is not None and isinstance(minimize, int):
            logger.debug("Minimizing statements...")
            actions = minimize_statement_actions(
                actions, all_actions, minchars=minimize
            )
            # searching in the existing statements
            # further minimizing the the output
            for stmt in statements:
                if stmt["Resource"] == self.sids[sid]["arn"]:
                    stmt["Action"].extend(actions)
                    match_found = True
                    sids_to_be_changed.append(stmt["Sid"])
                    break
        logger.debug(f"Adding statement with SID {sid}")
        logger.debug(f"{sid} SID has the actions: {actions}")
        logger.debug(f"{sid} SID has the resources: {self.sids[sid]['arn']}")

        if not match_found:
            statements.append(
                {
                    "Sid": sid,
                    "Effect": "Allow",
                    "Action": actions,
                    "Resource": self.sids[sid]["arn"],
                }
            )

    if sids_to_be_changed:
        for stmt in statements:
            if stmt['Sid'] in sids_to_be_changed:
                arn_details = parse_arn(stmt['Resource'][0])
                resource_path = arn_details.get("resource_path")
                resource_sid_segment = strip_special_characters(
                    f"{arn_details['resource']}{resource_path}"
                )
                stmt['Sid'] = create_policy_sid_namespace(arn_details['service'], "Mult", resource_sid_segment)

    policy = {"Version": POLICY_LANGUAGE_VERSION, "Statement": statements}
    return policy

get_sid(self, sid)

Get a single group by the SID identifier

Source code in policy_sentry/writing/sid_group.py
75
76
77
78
79
80
def get_sid(self, sid):
    """Get a single group by the SID identifier"""
    if self.sids[sid]:
        return self.sids[sid]
    else:
        raise Exception(f"No SID with the value of {sid}")

get_sid_group(self)

Get the whole SID group as JSON

Source code in policy_sentry/writing/sid_group.py
69
70
71
72
73
def get_sid_group(self):
    """
    Get the whole SID group as JSON
    """
    return self.sids

list_sids(self)

Get a list of all of the SIDs by their identifiers

Returns:

Type Description
List

A list of SIDs in the SID group

Source code in policy_sentry/writing/sid_group.py
82
83
84
85
86
87
88
89
def list_sids(self):
    """
    Get a list of all of the SIDs by their identifiers

    Returns:
        List: A list of SIDs in the SID group
    """
    return self.sids.keys()

process_template(self, cfg, minimize=None)

Process the Policy Sentry template as a dict. This auto-detects whether or not the file is in CRUD mode or Actions mode.

Parameters:

Name Type Description Default
cfg

The loaded YAML as a dict. Must follow Policy Sentry dictated format.

required
minimize

Minimize the resulting statement with safe usage of wildcards to reduce policy length. Set this to the character length you want - for example, 0, or 4. Defaults to none.

None

Returns:

Type Description
Dictionary

The rendered IAM JSON Policy

Source code in policy_sentry/writing/sid_group.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
def process_template(self, cfg, minimize=None):
    """
    Process the Policy Sentry template as a dict. This auto-detects whether or not the file is in CRUD mode or
    Actions mode.

    Arguments:
        cfg: The loaded YAML as a dict. Must follow Policy Sentry dictated format.
        minimize: Minimize the resulting statement with *safe* usage of wildcards to reduce policy length. Set this to the character length you want - for example, 0, or 4. Defaults to none.
    Returns:
        Dictionary: The rendered IAM JSON Policy
    """
    if cfg.get("mode") == "crud":
        logger.debug("CRUD mode selected")
        check_crud_schema(cfg)
        # EXCLUDE ACTIONS
        if cfg.get("exclude-actions"):
            if cfg.get("exclude-actions")[0] != "":
                self.add_exclude_actions(cfg["exclude-actions"])
        # WILDCARD ONLY SECTION
        if cfg.get("wildcard-only"):
            if cfg.get("wildcard-only").get("single-actions"):
                if cfg["wildcard-only"]["single-actions"][0] != "":
                    provided_wildcard_actions = cfg["wildcard-only"]["single-actions"]
                    logger.debug(f"Requested wildcard-only actions: {str(provided_wildcard_actions)}")
                    self.wildcard_only_single_actions = provided_wildcard_actions
            if cfg.get("wildcard-only").get("service-read"):
                if cfg["wildcard-only"]["service-read"][0] != "":
                    service_read = cfg["wildcard-only"]["service-read"]
                    logger.debug(f"Requested wildcard-only actions: {str(service_read)}")
                    self.wildcard_only_service_read = service_read
            if cfg.get("wildcard-only").get("service-write"):
                if cfg["wildcard-only"]["service-write"][0] != "":
                    service_write = cfg["wildcard-only"]["service-write"]
                    logger.debug(f"Requested wildcard-only actions: {str(service_write)}")
                    self.wildcard_only_service_write = service_write
            if cfg.get("wildcard-only").get("service-list"):
                if cfg["wildcard-only"]["service-list"][0] != "":
                    service_list = cfg["wildcard-only"]["service-list"]
                    logger.debug(f"Requested wildcard-only actions: {str(service_list)}")
                    self.wildcard_only_service_list = service_list
            if cfg.get("wildcard-only").get("service-tagging"):
                if cfg["wildcard-only"]["service-tagging"][0] != "":
                    service_tagging = cfg["wildcard-only"]["service-tagging"]
                    logger.debug(f"Requested wildcard-only actions: {str(service_tagging)}")
                    self.wildcard_only_service_tagging = service_tagging
            if cfg.get("wildcard-only").get("service-permissions-management"):
                if cfg["wildcard-only"]["service-permissions-management"][0] != "":
                    service_permissions_management = cfg["wildcard-only"]["service-permissions-management"]
                    logger.debug(f"Requested wildcard-only actions: {str(service_permissions_management)}")
                    self.wildcard_only_service_permissions_management = service_permissions_management

        # Process the wildcard-only section
        self.process_wildcard_only_actions()

        # Standard access levels
        if cfg.get("read"):
            if cfg["read"][0] != "":
                logger.debug(f"Requested access to arns: {str(cfg['read'])}")
                self.add_by_arn_and_access_level(cfg["read"], "Read")
        if cfg.get("write"):
            if cfg["write"][0] != "":
                logger.debug(f"Requested access to arns: {str(cfg['write'])}")
                self.add_by_arn_and_access_level(cfg["write"], "Write")
        if cfg.get("list"):
            if cfg["list"][0] != "":
                logger.debug(f"Requested access to arns: {str(cfg['list'])}")
                self.add_by_arn_and_access_level(cfg["list"], "List")
        if cfg.get("tagging"):
            if cfg["tagging"][0] != "":
                logger.debug(f"Requested access to arns: {str(cfg['tagging'])}")
                self.add_by_arn_and_access_level(cfg["tagging"], "Tagging")
        if cfg.get("permissions-management"):
            if cfg["permissions-management"][0] != "":
                logger.debug(f"Requested access to arns: {str(cfg['permissions-management'])}")
                self.add_by_arn_and_access_level(cfg["permissions-management"], "Permissions management")

        # SKIP RESOURCE CONSTRAINTS
        if cfg.get("skip-resource-constraints"):
            if cfg["skip-resource-constraints"][0] != "":
                logger.debug(
                    f"Requested override: the actions {str(cfg['skip-resource-constraints'])} will "
                    f"skip resource constraints."
                )
                self.add_skip_resource_constraints(cfg["skip-resource-constraints"])
                for skip_resource_constraints_action in self.skip_resource_constraints:
                    self.add_action_without_resource_constraint(
                        skip_resource_constraints_action, "SkipResourceConstraints"
                    )
    elif cfg.get("mode") == "actions":
        check_actions_schema(cfg)
        if "actions" in cfg.keys():
            if cfg["actions"] is not None and cfg["actions"][0] != "":
                self.add_by_list_of_actions(cfg["actions"])

    rendered_policy = self.get_rendered_policy(minimize)
    return rendered_policy

process_wildcard_only_actions(self)

After (1) the list of wildcard-only single actions have been added and (2) the list of wildcard-only service-wide actions have been added, process them and store them under the proper SID.

Source code in policy_sentry/writing/sid_group.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def process_wildcard_only_actions(self):
    """
    After (1) the list of wildcard-only single actions have been added and (2) the list of wildcard-only service-wide actions have been added, process them and store them under the proper SID.
    """
    provided_wildcard_actions = (
        self.wildcard_only_single_actions
        + get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_read, "Read")
        + get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_list, "List")
        + get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_write, "Write")
        + get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_tagging, "Tagging")
        + get_wildcard_only_actions_matching_services_and_access_level(self.wildcard_only_service_permissions_management, "Permissions management")
    )
    self.add_wildcard_only_actions(
        provided_wildcard_actions
    )

remove_actions_duplicated_in_wildcard_arn(self)

Removes actions from the object that are in a resource-specific ARN, as well as the * resource. For example, if ssm:GetParameter is restricted to a specific parameter path, as well as *, then we want to remove the * option to force least privilege.

Source code in policy_sentry/writing/sid_group.py
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
def remove_actions_duplicated_in_wildcard_arn(self):
    """
    Removes actions from the object that are in a resource-specific ARN, as well as the `*` resource.
    For example, if `ssm:GetParameter` is restricted to a specific parameter path, as well as `*`, then we want to
    remove the `*` option to force least privilege.
    """
    actions_under_wildcard_resources = []
    actions_under_wildcard_resources_to_nuke = []

    # Build a temporary list. Contains actions in MultMultNone SID (where resources = "*")
    for sid in self.sids:
        if self.sids[sid]["arn_format"] == "*":
            actions_under_wildcard_resources.extend(self.sids[sid]["actions"])

    # If the actions under the MultMultNone SID exist under other SIDs
    if len(actions_under_wildcard_resources) > 0:
        for sid in self.sids:
            if "*" not in self.sids[sid]["arn_format"]:
                for action in actions_under_wildcard_resources:
                    if action in self.sids[sid]["actions"]:
                        if action not in self.skip_resource_constraints:
                            # add it to a list of actions to nuke when they are under other SIDs
                            actions_under_wildcard_resources_to_nuke.append(action)

    # If there are actions that we need to remove from SIDs outside of MultMultNone SID
    if len(actions_under_wildcard_resources_to_nuke) > 0:
        for sid in self.sids:
            if "*" in self.sids[sid]["arn_format"]:
                for action in actions_under_wildcard_resources_to_nuke:
                    try:
                        self.sids[sid]["actions"].remove(str(action))
                    except BaseException:  # pylint: disable=broad-except
                        logger.debug("Removal not successful")

remove_actions_not_matching_these(self, actions_to_keep)

Parameters:

Name Type Description Default
actions_to_keep

A list of actions to leave in the policy. All actions not in this list are removed.

required
Source code in policy_sentry/writing/sid_group.py
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
def remove_actions_not_matching_these(self, actions_to_keep):
    """
    Arguments:
        actions_to_keep: A list of actions to leave in the policy. All actions not in this list are removed.
    """
    actions_to_keep = get_lowercase_action_list(actions_to_keep)
    actions_deleted = []
    for sid in self.sids:
        placeholder_actions_list = []
        for action in self.sids[sid]["actions"]:
            # if the action is not in the list of selected actions, don't copy it to the placeholder list
            if action.lower() in actions_to_keep:
                placeholder_actions_list.append(action)
            elif action.lower() not in actions_to_keep:
                logger.debug("%s not found in list of actions to keep: %s", action.lower(), actions_to_keep)
                actions_deleted.append(action)
        # Clear the list and then extend it to include the updated actions only
        self.sids[sid]["actions"].clear()
        self.sids[sid]["actions"].extend(placeholder_actions_list.copy())
    # Highlight the actions that you remove
    logger.debug("Actions deleted: %s", str(actions_deleted))
    # Now that we've removed a bunch of actions, if there are SID groups without any actions,
    # remove them so we don't get SIDs with empty action lists
    self.remove_sids_with_empty_action_lists()

remove_sids_with_empty_action_lists(self)

Now that we've removed a bunch of actions, if there are SID groups without any actions, remove them so we don't get SIDs with empty action lists

Source code in policy_sentry/writing/sid_group.py
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
def remove_sids_with_empty_action_lists(self):
    """
    Now that we've removed a bunch of actions, if there are SID groups without any actions, remove them so we don't get SIDs with empty action lists
    """
    sid_namespaces_to_delete = []
    for sid in self.sids:
        if len(self.sids[sid]["actions"]) > 0:
            pass
        # If the size is zero, add it to the indexes_to_delete list.
        else:
            sid_namespaces_to_delete.append(sid)
    # Loop through sid_namespaces_to_delete in reverse order (so we delete index
    # 10 before index 8, for example)
    if len(sid_namespaces_to_delete) > 0:
        for i in reversed(range(len(sid_namespaces_to_delete))):
            del self.sids[sid_namespaces_to_delete[i]]

create_policy_sid_namespace(service, access_level, resource_type_name, condition_block=None)

Simply generates the SID name. The SID groups ARN types that share an access level.

For example, S3 objects vs. SSM Parameter have different ARN types - as do S3 objects vs S3 buckets. That's how we choose to group them.

Parameters:

Name Type Description Default
service

ssm

required
access_level

Read

required
resource_type_name

parameter

required
condition_block

{"condition_key_string": "ec2:ResourceTag/purpose", "condition_type_string": "StringEquals", "condition_value": "test"}

None

Returns:

Type Description
String

A string like SsmReadParameter

Source code in policy_sentry/writing/sid_group.py
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
def create_policy_sid_namespace(
    service, access_level, resource_type_name, condition_block=None
):
    """
    Simply generates the SID name. The SID groups ARN types that share an access level.

    For example, S3 objects vs. SSM Parameter have different ARN types - as do S3 objects vs S3 buckets. That's how we
    choose to group them.

    Arguments:
        service: `ssm`
        access_level: `Read`
        resource_type_name: `parameter`
        condition_block: `{"condition_key_string": "ec2:ResourceTag/purpose", "condition_type_string": "StringEquals", "condition_value": "test"}`

    Returns:
        String: A string like `SsmReadParameter`
    """
    # Sanitize the resource_type_name; otherwise we hit some list conversion
    # errors
    resource_type_name = re.sub("[^A-Za-z0-9]+", "", resource_type_name)
    # Also remove the space from the Access level, if applicable. This only
    # applies for "Permissions management"
    access_level = re.sub("[^A-Za-z0-9]+", "", access_level)
    sid_namespace_prefix = (
        capitalize_first_character(strip_special_characters(service))
        + capitalize_first_character(access_level)
        + capitalize_first_character(resource_type_name)
    )

    if condition_block:
        condition_key_namespace = re.sub(
            "[^A-Za-z0-9]+", "", condition_block["condition_key_string"]
        )
        condition_type_namespace = condition_block["condition_type_string"]
        condition_value_namespace = re.sub(
            "[^A-Za-z0-9]+", "", condition_block["condition_value"]
        )
        sid_namespace_condition_suffix = (
            f"{capitalize_first_character(condition_key_namespace)}"
            f"{capitalize_first_character(condition_type_namespace)}"
            f"{capitalize_first_character(condition_value_namespace)}"
        )
        sid_namespace = sid_namespace_prefix + sid_namespace_condition_suffix
    else:
        sid_namespace = sid_namespace_prefix
    return sid_namespace

get_wildcard_only_actions_matching_services_and_access_level(services, access_level)

Get a list of wildcard-only actions matching the services and access level

Parameters:

Name Type Description Default
services

A list of AWS services

required
access_level

An access level as it is written in the database, such as 'Read', 'Write', 'List', 'Permissions management', or 'Tagging'

required

Returns:

Type Description
List

A list of wildcard-only actions matching the services and access level

Source code in policy_sentry/writing/sid_group.py
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
def get_wildcard_only_actions_matching_services_and_access_level(services, access_level):
    """
    Get a list of wildcard-only actions matching the services and access level

    Arguments:
        services: A list of AWS services
        access_level: An access level as it is written in the database, such as 'Read', 'Write', 'List', 'Permissions management', or 'Tagging'
    Returns:
        List: A list of wildcard-only actions matching the services and access level
    """
    wildcard_only_actions_to_add = []
    for service in services:
        actions = get_actions_at_access_level_that_support_wildcard_arns_only(
            service, access_level
        )
        wildcard_only_actions_to_add.extend(actions)
    return wildcard_only_actions_to_add

remove_actions_that_are_not_wildcard_arn_only(actions_list)

Given a list of actions, remove the ones that CAN be restricted to ARNs, leaving only the ones that cannot.

Parameters:

Name Type Description Default
actions_list

A list of actions

required

Returns:

Type Description
List

An updated list of actions

Source code in policy_sentry/writing/sid_group.py
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
def remove_actions_that_are_not_wildcard_arn_only(actions_list):
    """
    Given a list of actions, remove the ones that CAN be restricted to ARNs, leaving only the ones that cannot.

    Arguments:
        actions_list: A list of actions
    Returns:
        List: An updated list of actions
    """
    # remove duplicates, if there are any
    actions_list = list(dict.fromkeys(actions_list))
    actions_list_placeholder = []

    for action in actions_list:
        try:
            service_name, action_name = action.split(":")
        except ValueError as v_e:
            # We will skip the action because this likely means that the wildcard action provided is not valid.
            logger.debug(v_e)
            logger.debug(
                "The value provided in wildcard-only section is not formatted properly."
            )
            continue
        rows = get_actions_that_support_wildcard_arns_only(service_name)
        for row in rows:
            if row.lower() == action.lower():
                actions_list_placeholder.append(f"{service_name}:{action_name}")
    return actions_list_placeholder