gcpdiag.queries.cloudrun
Queries related to GCP Cloud Run service.
class
Service(gcpdiag.models.Resource):
29class Service(models.Resource): 30 """Represents Cloud Run service.""" 31 32 _resource_data: dict 33 34 def __init__(self, project_id, resource_data): 35 super().__init__(project_id=project_id) 36 self._resource_data = resource_data 37 38 @property 39 def name(self) -> str: 40 m = re.search(r'/services/([^/]+)$', self._resource_data['name']) 41 if not m: 42 raise RuntimeError("can't determine name of service %s" % (self._resource_data['name'])) 43 return m.group(1) 44 45 @property 46 def id(self) -> str: 47 return self._resource_data['uid'] 48 49 @property 50 def full_path(self) -> str: 51 return self._resource_data['name'] 52 53 @property 54 def conditions(self) -> Dict[str, 'ServiceCondition']: 55 return { 56 condition['type']: ServiceCondition(condition) 57 for condition in self._resource_data['conditions'] 58 } 59 60 @property 61 def short_path(self) -> str: 62 path = self.project_id + '/' + self.id 63 return path
Represents Cloud Run service.
full_path: str
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
conditions: Dict[str, ServiceCondition]
class
ServiceCondition:
66class ServiceCondition: 67 """Represents Cloud Run service status condition.""" 68 69 _resource_data: dict 70 71 def __init__(self, resource_data): 72 self._resource_data = resource_data 73 74 @property 75 def message(self) -> str: 76 return self._resource_data.get('message', '')
Represents Cloud Run service status condition.
def
get_all_locations(project_id: str) -> Iterable[str]:
79def get_all_locations(project_id: str) -> Iterable[str]: 80 """Return list of all regions 81 82 Args: 83 project_id (str): project id for this request 84 85 Raises: 86 utils.GcpApiError: Raises GcpApiError in case of query issues 87 88 Returns: 89 Iterable[Region]: Return list of all regions 90 """ 91 try: 92 cloudrun_api = apis.get_api('run', 'v1', project_id) 93 request = cloudrun_api.projects().locations().list(name=f'projects/{project_id}') 94 response = request.execute(num_retries=config.API_RETRIES) 95 if not response or 'locations' not in response: 96 return set() 97 98 return {location['name'] for location in response['locations'] if 'name' in location} 99 except googleapiclient.errors.HttpError as err: 100 raise utils.GcpApiError(err) from err
Return list of all regions
Arguments:
- project_id (str): project id for this request
Raises:
- utils.GcpApiError: Raises GcpApiError in case of query issues
Returns:
Iterable[Region]: Return list of all regions
@caching.cached_api_call
def
get_services( context: gcpdiag.models.Context) -> Mapping[str, Service]:
103@caching.cached_api_call 104def get_services(context: models.Context) -> Mapping[str, Service]: 105 """Get a list of Cloud Run services matching the given context, 106 indexed by service id.""" 107 services: Dict[str, Service] = {} 108 109 if not apis.is_enabled(context.project_id, 'run'): 110 return services 111 112 locations = get_all_locations(context.project_id) 113 114 for location in locations: 115 m = re.search(r'/locations/([^/]+)$', location) 116 if not m: 117 continue 118 region = m.group(1) 119 cloudrun_api = apis.get_api('run', 'v2', context.project_id) 120 logging.debug( 121 'fetching list of cloud run services in the project %s for the region %s', 122 context.project_id, 123 region, 124 ) 125 query = ( 126 cloudrun_api.projects() 127 .locations() 128 .services() 129 .list(parent=f'projects/{context.project_id}/locations/{region}') 130 ) 131 try: 132 resp = query.execute(num_retries=config.API_RETRIES) 133 if 'services' not in resp: 134 continue 135 for s in resp['services']: 136 # projects/{project}/locations/{location}/services/{serviceId}. 137 result = re.match(r'projects/[^/]+/locations/([^/]+)/services/([^/]+)', s['name']) 138 if not result: 139 logging.error('invalid cloudrun name: %s', s['name']) 140 raise RuntimeError('missing data in projects.locations.services.list response') 141 location = result.group(1) 142 labels = s.get('labels', {}) 143 name = result.group(2) 144 if not context.match_project_resource(location=location, labels=labels, resource=name): 145 continue 146 147 services[s['uid']] = Service(project_id=context.project_id, resource_data=s) 148 except googleapiclient.errors.HttpError as err: 149 raise utils.GcpApiError(err) from err 150 return services
Get a list of Cloud Run services matching the given context, indexed by service id.
153def get_service(project_id: str, region: str, service_name: str) -> Service: 154 cloudrun_api = apis.get_api('run', 'v2', project_id) 155 request = ( 156 cloudrun_api.projects() 157 .locations() 158 .services() 159 .get(name=f'projects/{project_id}/locations/{region}/services/{service_name}') 160 ) 161 response = request.execute(num_retries=config.API_RETRIES) 162 return Service(project_id, response)