Coverage for pyriandx/client.py: 23%

132 statements  

coverage.py v6.4.2, created at 2022-08-30 13:27 +1000

# -*- coding: utf-8 -*-
import json
import logging
import ntpath
import os
import shutil
import time

import pkg_resources
import requests

from pyriandx.utils import retry_session

logger = logging.getLogger(__name__)

__all__ = ['Client']


class Client:

    def __init__(self, email, key, institution, base_url, key_is_auth_token=False):
        self.baseURL = base_url
        self.headers = {
            'X-Auth-Email': email,
            'X-Auth-Institution': institution
        }
        if key_is_auth_token:
            self.headers['X-Auth-Token'] = key
        else:
            self.headers['X-Auth-Key'] = key
        # this is required to import static data from the module
        self.data_path = pkg_resources.resource_filename('pyriandx', 'json/')

    def create_case(self, case_data_file):
        """Creates a case from the given case data file."""
        with open(case_data_file, 'r') as f:
            request_data = json.load(f)
        logger.debug("Creating case with data:")
        logger.debug(request_data)
        response = self._post_api("/case", data=request_data).json()
        return response['id']

    def create_sequencer_run(self, accession_number, run_id, request_data=None):
        """Creates a sequencer run for the given accession number and run ID."""
        if request_data is None:
            with open(self.data_path + 'create_sequencer_run.json', 'r') as f:
                request_data = json.load(f)
        request_data['runId'] = run_id
        request_data['specimens'][0]['accessionNumber'] = accession_number
        logger.debug(f"Creating sequencer run with data: {request_data}")
        response = self._post_api("/sequencerRun", data=request_data).json()
        return response['id']

    def create_job(self, case, run_id, request_data=None):
        """Creates a job for the given case and sequencer run ID."""
        if request_data is None:
            with open(self.data_path + 'create_job.json', 'r') as f:
                request_data: dict = json.load(f)
        accession_number = str(case['specimens'][0]['accessionNumber'])
        request_data['input'][0]['accessionNumber'] = accession_number
        request_data['input'][0]['sequencerRunInfos'][0]['runId'] = str(run_id)
        request_data['input'][0]['sequencerRunInfos'][0]['files'] = case['caseFiles']
        logger.debug(f"Creating job for Case ID: {str(case['id'])} with input payload:")
        logger.debug(request_data)
        endpoint = f"/case/{str(case['id'])}/informaticsJobs"
        response = self._post_api(endpoint, data=request_data).json()
        return response['jobId']

    def create_job_vcf(self, case_id):
        """Creates a VCF informatics job for the given case ID."""
        logger.debug(f"Creating job for vcf with case ID: {str(case_id)}")
        endpoint = f"/case/{str(case_id)}/informaticsJobsVCF"
        self._post_api(endpoint)

    def get_job_status(self, case_id, job_id):
        """Gets the status of an informatics job."""
        logger.debug(f"Getting status of job {job_id} for case ID: {str(case_id)}")
        case_info = self.get_case_info(case_id)

        job = None
        for j in case_info['informaticsJobs']:
            if j['id'] == job_id:
                job = j

        try:
            status = job['status']
        except (KeyError, TypeError):
            # TypeError covers the case where no matching job was found (job is None)
            status = "NO_JOB_AVAILABLE"

        logger.debug(f"Got status: {status}")

        return status

    def get_report(self, case, output_dir=None):
        """
        Downloads all reports attached to the given case.
        If output_dir is None, downloads to the working directory.
        """
        for report in case['reports']:
            report_id = report['id']
            local_filename = f"case-{str(case['id'])}_report-{report_id}.pdf.gz"
            endpoint = f"/case/{str(case['id'])}/reports/{report_id}?format=pdf"
            url = self.baseURL + endpoint

            if output_dir is not None:
                os.makedirs(output_dir, exist_ok=True)
                local_filename = output_dir + "/" + local_filename
            logger.info(f"Found report with ID {report_id}. Downloading as {local_filename} from {url}")

            response = requests.get(url, stream=True, headers=self.headers)
            if response.ok:
                with open(local_filename, 'wb') as f:
                    shutil.copyfileobj(response.raw, f)
            else:
                logger.critical(f"Issue downloading file from {endpoint}. {response.status_code} ::: {response.text}")

    def upload_file(self, filename, case_id):
        """Uploads a file to the given case ID."""
        remote_filename = ntpath.basename(filename)
        endpoint = f"/case/{str(case_id)}/caseFiles/{remote_filename}/"
        logger.debug(f"Upload file to endpoint: {endpoint}")
        with open(filename, 'rb') as f:
            self._post_api(endpoint, files={'file': f})

    def get_case_info(self, case_id):
        """Get case info for given case ID"""
        return self._get_api(f"/case/{str(case_id)}")

    def list_cases(self, filters: dict = None):
        """Lists cases, optionally applying query filters if passed in."""
        return self._get_api("/case", params=filters)

    def get(self, endpoint, params=None):
        """Exposes the GET API to the caller/client.
        Retains _get_api() for backward compatibility.
        """
        return self._get_api(endpoint=endpoint, params=params)

    def _get_api(self, endpoint, params=None):
        """Internal helper that issues GET requests to the API."""
        url = self.baseURL + endpoint
        logger.debug(f"Sending request to {url}")
        t0 = time.time()
        try:
            response = retry_session(self.headers).get(url, params=params)
        except Exception as x:
            logger.critical(f"{endpoint} call failed after retry: {x}")
        else:
            if response.status_code == 200:
                logger.debug(f"Call to {endpoint} succeeded")
                return json.loads(response.text)
            else:
                logger.critical(f"Call to {endpoint} failed. Error code was {response.status_code}")
                logger.critical(f"Server responded with {response.text}")
        finally:
            t1 = time.time()
            logger.debug(f"Call took {t1 - t0} seconds")

    def post(self, endpoint, data=None, files=None):
        """Exposes the POST API to the caller/client.
        Retains _post_api() for backward compatibility.
        """
        return self._post_api(endpoint, data=data, files=files)

    def _post_api(self, endpoint, data=None, files=None):
        """Internal helper that issues POST requests to the API."""
        url = self.baseURL + endpoint
        logger.debug(f"Sending request to {url}")
        t0 = time.time()
        post_headers = self.headers.copy()
        if data is not None:
            post_headers['Content-Type'] = "application/json"
            post_headers['Accept-Encoding'] = "*"
        if files is None:
            upload = False
        else:
            upload = True  # prevents logging every single byte of a file upload in verbose mode
        # uncomment to inspect the JSON payload we are sending
        # with open("request-debug.json", "w") as data_file:
        #     json.dump(data, data_file, indent=2)
        try:
            response = retry_session(post_headers, upload=upload).post(url, json=data, files=files)
        except Exception as x:
            logger.critical(f"{endpoint} call failed after retry: {x}")
        else:
            if response.status_code == 200:
                logger.debug(f"Call to {endpoint} succeeded")
                logger.debug(f"Response was {response.text}")
            else:
                logger.critical(f"Call to {endpoint} failed. Error code was {response.status_code}")
                logger.critical(f"Server responded with {response.text}")
            return response
        finally:
            t1 = time.time()
            logger.debug(f"Call took {t1 - t0} seconds")