Coverage for pyriandx / client.py: 23%

134 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-05 14:38 +1100

1# -*- coding: utf-8 -*- 

2 

3# Standard imports 

4import json 

5import logging 

6import ntpath 

7import os 

8import time 

9import requests 

10from importlib.resources import files as resource_files 

11 

12# Local imports 

13from pyriandx.utils import retry_session 

14 

15logger = logging.getLogger(__name__) 

16 

17__all__ = ['Client'] 

18 

19 

class Client:
    """Thin HTTP client for the PierianDx CGW REST API."""

    def __init__(self, email, key, institution, base_url, key_is_auth_token=False):
        # Base URL every endpoint path is appended to.
        self.baseURL = base_url
        # Auth headers sent with every request; the key goes in either the
        # token header or the key header, depending on the flag.
        key_header = 'X-Auth-Token' if key_is_auth_token else 'X-Auth-Key'
        self.headers = {
            'X-Auth-Email': email,
            'X-Auth-Institution': institution,
            key_header: key,
        }
        # Directory of bundled JSON request templates shipped with the package.
        self.template_root = resource_files('pyriandx').joinpath('json')

33 

34 def _load_json_template(self, template_name): 

35 template = self.template_root.joinpath(template_name) 

36 return json.loads(template.read_text(encoding='utf-8')) 

37 

38 def create_case(self, case_data_file): 

39 """Creates case with given case data file""" 

40 with open(case_data_file, 'r') as f: 

41 request_data = json.load(f) 

42 logger.debug("Creating case with data:") 

43 logger.debug(request_data) 

44 response = self._post_api("/case", data=request_data).json() 

45 return response['id'] 

46 

47 def create_sequencer_run(self, accession_number, run_id, request_data=None): 

48 """Creates sequencer run with given accession number""" 

49 if request_data is None: 

50 request_data = self._load_json_template('create_sequencer_run.json') 

51 request_data['runId'] = run_id 

52 request_data['specimens'][0]['accessionNumber'] = accession_number 

53 logger.debug(f"Creating sequencer run with data: {request_data}") 

54 response = self._post_api("/sequencerRun", data=request_data).json() 

55 return response['id'] 

56 

57 def create_job(self, case, run_id, request_data=None): 

58 """Creates job with given case and sequence run id""" 

59 if request_data is None: 

60 request_data = self._load_json_template('create_job.json') 

61 accession_number = str(case['specimens'][0]['accessionNumber']) 

62 request_data['input'][0]['accessionNumber'] = str(accession_number) 

63 request_data['input'][0]['sequencerRunInfos'][0]['runId'] = str(run_id) 

64 request_data['input'][0]['sequencerRunInfos'][0]['files'] = case['caseFiles'] 

65 logger.debug(f"Creating job for Case ID: {str(case['id'])} with input payload:") 

66 logger.debug(request_data) 

67 endpoint = f"/case/{str(case['id'])}/informaticsJobs" 

68 response = self._post_api(endpoint, data=request_data).json() 

69 return response['jobId'] 

70 

71 def create_job_vcf(self, case_id): 

72 """Creates job with given case id""" 

73 logger.debug(f"Creating job for vcf with case ID: {str(case_id)}") 

74 endpoint = f"/case/{str(case_id)}/informaticsJobsVCF" 

75 self._post_api(endpoint) 

76 

77 def get_job_status(self, case_id, job_id): 

78 """Gets the status of an informatics job.""" 

79 logger.debug(f"Getting status of job {job_id} for case ID: {str(case_id)}") 

80 case_info = self.get_case_info(case_id) 

81 

82 job = None 

83 for j in case_info['informaticsJobs']: 

84 if j['id'] == job_id: 

85 job = j 

86 

87 try: 

88 status = job['status'] 

89 except KeyError: 

90 status = "NO_JOB_AVAILABLE" 

91 

92 logger.debug(f"Got status: {status}") 

93 

94 return status 

95 

96 def get_report(self, case, output_dir=None): 

97 """ 

98 Downloads report to output_dir. 

99 If report_index is None, downloads all reports. 

100 If output_dir is None, downloads to working directory. 

101 """ 

102 for report in case['reports']: 

103 report_id = report['id'] 

104 local_filename = f"case-{str(case['id'])}_report-{report_id}.pdf.gz" 

105 endpoint = f"/case/{str(case['id'])}/reports/{report_id}?format=pdf" 

106 url = self.baseURL + endpoint 

107 

108 if output_dir is not None: 

109 os.makedirs(output_dir, exist_ok=True) 

110 local_filename = output_dir + "/" + local_filename 

111 logger.info(f"Found report with ID {report_id}. Downloading as {local_filename} from {url}") 

112 

113 response = requests.get(url, stream=True, headers=self.headers) 

114 if response.ok: 

115 with open(local_filename, 'wb') as f: 

116 for chunk in response.iter_content(chunk_size=8192): 

117 if chunk: 

118 f.write(chunk) 

119 else: 

120 logger.critical(f"Issue downloading file from {endpoint}. {response.status_code} ::: {response.text}") 

121 

122 def upload_file(self, filename, case_id): 

123 """Upload file to given accession number""" 

124 files = {'file': open(filename, 'rb')} 

125 remote_filename = ntpath.basename(filename) 

126 endpoint = f"/case/{str(case_id)}/caseFiles/{remote_filename}/" 

127 logger.debug(f"Upload file to endpoint: {endpoint}") 

128 self._post_api(endpoint, files=files) 

129 

130 def get_case_info(self, case_id): 

131 """Get case info for given case ID""" 

132 return self._get_api(f"/case/{str(case_id)}") 

133 

134 def list_cases(self, filters: dict = None): 

135 """List cases, optionally apply query filters if pass-in""" 

136 return self._get_api(f"/case", params=filters) 

137 

138 def get(self, endpoint, params): 

139 """Expose GET API to caller/client 

140 Retain _get_api() for backward compatible 

141 """ 

142 return self._get_api(endpoint=endpoint, params=params) 

143 

144 def _get_api(self, endpoint, params=None): 

145 """Internal function to create get requests to the API""" 

146 url = self.baseURL + endpoint 

147 logger.debug(f"Sending request to {url}") 

148 t0 = time.time() 

149 try: 

150 response = retry_session(self.headers).get(url, params=params) 

151 except Exception as x: 

152 logger.critical(f"{endpoint} call failed after retry: {x}") 

153 else: 

154 if response.status_code == 200: 

155 logger.debug(f"Call to {endpoint} succeeded") 

156 return json.loads(response.text) 

157 else: 

158 logger.critical(f"Call to {endpoint} failed. Error code was {response.status_code}") 

159 logger.critical(f"Server responded with {response.text}") 

160 

161 finally: 

162 t1 = time.time() 

163 logger.debug(f"Call took {t1 - t0} seconds") 

164 

165 def post(self, endpoint, data=None, files=None): 

166 """Expose POST API to caller/client 

167 Retain _post_api() for backward compatible 

168 """ 

169 return self._post_api(endpoint, data=data, files=files) 

170 

171 def _post_api(self, endpoint, data=None, files=None): 

172 """Internal function to create post requests to the API""" 

173 url = self.baseURL + endpoint 

174 logger.debug(f"Sending request to {url}") 

175 t0 = time.time() 

176 post_headers = self.headers.copy() 

177 if data is not None: 

178 post_headers['Content-Type'] = "application/json" 

179 post_headers['Accept-Encoding'] = "*" 

180 if files is None: 

181 upload = False 

182 else: 

183 upload = True # prevents seeing every. single. byte. in file upload during verbose mode 

184 # uncomment if you like to inspect the json data we're sending 

185 # with open("request-debug.json", "w") as data_file: 

186 # json.dump(data, data_file, indent=2) 

187 try: 

188 response = retry_session(post_headers, upload=upload).post(url, json=data, files=files) 

189 except Exception as x: 

190 logger.critical(f"{endpoint} call failed after retry: {x}") 

191 else: 

192 if response.status_code == 200: 

193 logger.debug(f"Call to {endpoint} succeeded") 

194 logger.debug(f"Response was {response.text}") 

195 else: 

196 logger.critical(f"Call to {endpoint} failed. Error code was {response.status_code}") 

197 logger.critical(f"Server responded with {response.text}") 

198 return response 

199 finally: 

200 t1 = time.time() 

201 logger.debug(f"Call took {t1 - t0} seconds")