gcpdiag.queries.gae
Queries related to GCP App Engine Standard app.
class
Service(gcpdiag.models.Resource):
29class Service(models.Resource): 30 """Represents an App Engine Standard app service.""" 31 32 _resource_data: dict 33 34 def __init__(self, project_id, resource_data): 35 super().__init__(project_id=project_id) 36 self._resource_data = resource_data 37 38 @property 39 def name(self) -> str: 40 m = re.search(r'/services/([^/]+)$', self._resource_data['name']) 41 if not m: 42 raise RuntimeError("can't determine name of service %s" % (self._resource_data['name'])) 43 return m.group(1) 44 45 @property 46 def id(self) -> str: 47 return self._resource_data['id'] 48 49 @property 50 def full_path(self) -> str: 51 return self._resource_data['name'] 52 53 @property 54 def short_path(self) -> str: 55 path = self.project_id + '/' + self.id 56 return path
Represents an App Engine Standard app service.
class
Version(gcpdiag.models.Resource):
59class Version(models.Resource): 60 """Represents an App Engine Standard app version.""" 61 62 _resource_data: dict 63 service: Service 64 65 def __init__(self, project_id, resource_data): 66 super().__init__(project_id=project_id) 67 self._resource_data = resource_data 68 69 @property 70 def id(self) -> str: 71 return self._resource_data['id'] 72 73 @property 74 def full_path(self) -> str: 75 return self._resource_data['name'] 76 77 @property 78 def short_path(self) -> str: 79 path = self.project_id + '/' + self.id 80 return path 81 82 @property 83 def runtime(self) -> str: 84 return self._resource_data['runtime'] 85 86 @property 87 def env(self) -> str: 88 return self._resource_data['env']
Represents an App Engine Standard app version.
service: Service
full_path: str
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@caching.cached_api_call
def
get_services( context: gcpdiag.models.Context) -> Mapping[str, Service]:
91@caching.cached_api_call 92def get_services(context: models.Context) -> Mapping[str, Service]: 93 """Get a list of App Engine Standard services matching the given context, 94 indexed by service id.""" 95 services: Dict[str, Service] = {} 96 if not apis.is_enabled(context.project_id, 'appengine'): 97 return services 98 appengine_api = apis.get_api('appengine', 'v1', context.project_id) 99 logging.debug('fetching list of app engine services in the project %s', context.project_id) 100 query = appengine_api.apps().services().list(appsId=context.project_id) 101 try: 102 resp = query.execute(num_retries=config.API_RETRIES) 103 if 'services' not in resp: 104 return services 105 for s in resp['services']: 106 # apps/myapp/services/default 107 result = re.match(r'apps/[^/]+/services/([^/]+)', s['name']) 108 if not result: 109 logging.error('invalid appengine data: %s', s['name']) 110 continue 111 112 labels = s.get('labels', {}) 113 114 if not context.match_project_resource(resource=result.group(1), labels=labels): 115 continue 116 117 services[s['id']] = Service(project_id=context.project_id, resource_data=s) 118 except googleapiclient.errors.HttpError as err: 119 raise utils.GcpApiError(err) from err 120 return services
Get a list of App Engine Standard services matching the given context, indexed by service id.
@caching.cached_api_call
def
get_versions( context: gcpdiag.models.Context) -> Mapping[str, Version]:
123@caching.cached_api_call 124def get_versions(context: models.Context) -> Mapping[str, Version]: 125 """Get a list of App Engine Standard service versions the given context, 126 indexed by a version id.""" 127 versions: Dict[str, Version] = {} 128 if not apis.is_enabled(context.project_id, 'appengine'): 129 return versions 130 131 appengine_api = apis.get_api('appengine', 'v1', context.project_id) 132 133 services = get_services(context) 134 135 for service in services.values(): 136 query = ( 137 appengine_api.apps() 138 .services() 139 .versions() 140 .list(appsId=context.project_id, servicesId=service.id) 141 ) 142 try: 143 resp = query.execute(num_retries=config.API_RETRIES) 144 if 'versions' not in resp: 145 return versions 146 for resp_s in resp['versions']: 147 # verify that we have some minimal data that we expect 148 if 'id' not in resp_s: 149 raise RuntimeError('missing data in apps.services.list response') 150 v = Version(project_id=context.project_id, resource_data=resp_s) 151 v.service = service 152 versions[v.id] = v 153 except googleapiclient.errors.HttpError as err: 154 raise utils.GcpApiError(err) from err 155 156 logging.debug('fetching list of app engine services in the project %s', context.project_id) 157 158 return versions
Get a list of App Engine Standard service versions the given context, indexed by a version id.