gcpdiag.queries.artifact_registry

Queries related to GCP Artifact Registry
class ArtifactRegistryIAMPolicy(gcpdiag.queries.iam.BaseIAMPolicy):
class ArtifactRegistryIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy of an Artifact Registry repository.

  Instances are produced by get_registry_iam_policy(), which fetches the
  policy through the repositories getIamPolicy API call.
  """

  def _is_resource_permission(self, permission) -> bool:
    """Report every permission as a resource permission.

    The `permission` argument is not inspected; the answer is always True.
    """
    # NOTE(review): how the base class consumes this hook is not visible
    # here; confirm against iam.BaseIAMPolicy before changing it.
    return True

Common class for IAM policies

@dataclasses.dataclass
class ProjectSettings:
@dataclasses.dataclass
class ProjectSettings:
  """Artifact Registry settings of a single project."""

  # Set by get_project_settings(): True only when the API reports the
  # legacy redirection state 'REDIRECTION_FROM_GCR_IO_ENABLED'.
  legacy_redirect: bool
ProjectSettings(legacy_redirect: bool)
legacy_redirect: bool
@caching.cached_api_call(in_memory=True)
def get_registry_iam_policy(context: gcpdiag.models.Context, location: str, registry_name: str) -> ArtifactRegistryIAMPolicy:
@caching.cached_api_call(in_memory=True)
def get_registry_iam_policy(
    context: models.Context, location: str, registry_name: str
) -> ArtifactRegistryIAMPolicy:
  """Fetch the IAM policy of one Artifact Registry repository.

  Args:
    context: gcpdiag context; its `project_id` selects the project.
    location: location segment of the repository resource name.
    registry_name: repository segment of the resource name.

  Returns:
    The policy, as built by iam.fetch_iam_policy() for the
    ArtifactRegistryIAMPolicy class.
  """
  project_id = context.project_id
  client = apis.get_api('artifactregistry', 'v1', project_id)
  # Full resource name of the repository; used both as the API `resource`
  # argument and as the resource identifier passed to fetch_iam_policy().
  resource_name = 'projects/{}/locations/{}/repositories/{}'.format(
      project_id, location, registry_name)
  repositories = client.projects().locations().repositories()
  policy_request = repositories.getIamPolicy(resource=resource_name)
  return iam.fetch_iam_policy(
      policy_request,
      ArtifactRegistryIAMPolicy,
      project_id,
      resource_name,
      context,
  )
@caching.cached_api_call(in_memory=True)
def get_project_settings(project_id: str) -> ProjectSettings:
@caching.cached_api_call(in_memory=True)
def get_project_settings(project_id: str) -> ProjectSettings:
  """Read the Artifact Registry project settings of `project_id`.

  Args:
    project_id: project whose `projectSettings` resource is queried.

  Returns:
    ProjectSettings whose `legacy_redirect` is True only when the response
    field 'legacyRedirectionState' equals 'REDIRECTION_FROM_GCR_IO_ENABLED'.
  """
  client = apis.get_api('artifactregistry', 'v1', project_id)
  settings_request = client.projects().getProjectSettings(
      name=f'projects/{project_id}/projectSettings')
  response = settings_request.execute(num_retries=config.API_RETRIES)
  # A missing field yields None, which compares unequal and so maps to False.
  # NOTE(review): any other state value the API may return is also treated
  # as "no redirect" -- confirm that is intended.
  redirection_state = response.get('legacyRedirectionState')
  is_redirected = redirection_state == 'REDIRECTION_FROM_GCR_IO_ENABLED'
  return ProjectSettings(legacy_redirect=is_redirected)