Coverage for pyriandx / client.py: 23%
134 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-05 14:38 +1100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-05 14:38 +1100
1# -*- coding: utf-8 -*-
3# Standard imports
4import json
5import logging
6import ntpath
7import os
8import time
9import requests
10from importlib.resources import files as resource_files
12# Local imports
13from pyriandx.utils import retry_session
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Names exported by ``from pyriandx.client import *``.
__all__ = ['Client']
class Client:
    """HTTP client for the REST API rooted at ``base_url``.

    Every request carries the ``X-Auth-*`` headers assembled in ``__init__``.
    Default request payloads are read from JSON templates found under the
    ``json`` resource directory of the ``pyriandx`` package.
    """

    # Status reported by get_job_status() when no usable job entry exists.
    _NO_JOB_STATUS = "NO_JOB_AVAILABLE"

    def __init__(self, email, key, institution, base_url, key_is_auth_token=False):
        """Store the base URL and build the authentication headers.

        :param email: value sent as ``X-Auth-Email``
        :param key: credential sent as ``X-Auth-Key``, or as ``X-Auth-Token``
            when ``key_is_auth_token`` is true
        :param institution: value sent as ``X-Auth-Institution``
        :param base_url: prefix prepended to every endpoint path
        :param key_is_auth_token: send ``key`` in the token header instead of
            the key header
        """
        self.baseURL = base_url
        self.headers = {
            'X-Auth-Email': email,
            'X-Auth-Institution': institution,
        }
        auth_header = 'X-Auth-Token' if key_is_auth_token else 'X-Auth-Key'
        self.headers[auth_header] = key
        self.template_root = resource_files('pyriandx').joinpath('json')

    def _load_json_template(self, template_name):
        """Return the parsed content of the bundled JSON template ``template_name``."""
        template = self.template_root.joinpath(template_name)
        return json.loads(template.read_text(encoding='utf-8'))

    def create_case(self, case_data_file):
        """Create a case from a JSON payload file and return the new case ``id``.

        :param case_data_file: path of a JSON file holding the request payload
        """
        # Read as UTF-8 explicitly, consistent with _load_json_template().
        with open(case_data_file, 'r', encoding='utf-8') as f:
            request_data = json.load(f)
        logger.debug("Creating case with data:")
        logger.debug(request_data)
        response = self._post_api("/case", data=request_data).json()
        return response['id']

    def create_sequencer_run(self, accession_number, run_id, request_data=None):
        """Create a sequencer run and return its ``id``.

        When ``request_data`` is None the bundled ``create_sequencer_run.json``
        template is used, with ``run_id`` and ``accession_number`` filled in.
        """
        if request_data is None:
            request_data = self._load_json_template('create_sequencer_run.json')
            # NOTE(review): the extracted source lost its indentation; these
            # overrides are assumed to apply to the bundled template only — confirm.
            request_data['runId'] = run_id
            request_data['specimens'][0]['accessionNumber'] = accession_number
        logger.debug(f"Creating sequencer run with data: {request_data}")
        response = self._post_api("/sequencerRun", data=request_data).json()
        return response['id']

    def create_job(self, case, run_id, request_data=None):
        """Create an informatics job for ``case`` and return its ``jobId``.

        When ``request_data`` is None the bundled ``create_job.json`` template
        is used, filled with the accession number of the case's first specimen,
        ``run_id`` and the case's ``caseFiles``.
        """
        if request_data is None:
            request_data = self._load_json_template('create_job.json')
            # NOTE(review): the extracted source lost its indentation; these
            # overrides are assumed to apply to the bundled template only — confirm.
            job_input = request_data['input'][0]
            run_info = job_input['sequencerRunInfos'][0]
            job_input['accessionNumber'] = str(case['specimens'][0]['accessionNumber'])
            run_info['runId'] = str(run_id)
            run_info['files'] = case['caseFiles']
        case_id = str(case['id'])
        logger.debug(f"Creating job for Case ID: {case_id} with input payload:")
        logger.debug(request_data)
        endpoint = f"/case/{case_id}/informaticsJobs"
        response = self._post_api(endpoint, data=request_data).json()
        return response['jobId']

    def create_job_vcf(self, case_id):
        """POST to the ``informaticsJobsVCF`` endpoint of ``case_id``; returns nothing."""
        logger.debug(f"Creating job for vcf with case ID: {str(case_id)}")
        endpoint = f"/case/{str(case_id)}/informaticsJobsVCF"
        self._post_api(endpoint)

    @staticmethod
    def _job_status_from_case(case_info, job_id):
        """Return the status of job ``job_id`` within ``case_info``.

        Returns ``"NO_JOB_AVAILABLE"`` when ``case_info`` is empty or None,
        when no job carries ``job_id``, or when the matching job has no
        ``status`` entry.
        """
        jobs = (case_info or {}).get('informaticsJobs') or []
        matched = None
        for candidate in jobs:
            if candidate['id'] == job_id:
                matched = candidate  # no break: the last match wins, as before
        if matched is None:
            # Previously this path crashed with TypeError (None is not
            # subscriptable) instead of reporting the fallback status.
            return Client._NO_JOB_STATUS
        return matched.get('status', Client._NO_JOB_STATUS)

    def get_job_status(self, case_id, job_id):
        """Return the status of informatics job ``job_id`` of case ``case_id``.

        Returns ``"NO_JOB_AVAILABLE"`` when the case could not be fetched, the
        job is not listed, or the job has no status.
        """
        logger.debug(f"Getting status of job {job_id} for case ID: {str(case_id)}")
        case_info = self.get_case_info(case_id)
        status = self._job_status_from_case(case_info, job_id)
        logger.debug(f"Got status: {status}")
        return status

    def get_report(self, case, output_dir=None):
        """Download every report listed in ``case['reports']``.

        Each report is requested with ``format=pdf`` and written to
        ``case-<case id>_report-<report id>.pdf.gz``.
        If ``output_dir`` is None, files go to the working directory;
        otherwise ``output_dir`` is created when missing.
        A failed download is logged and skipped. Returns nothing.
        """
        for report in case['reports']:
            case_id = str(case['id'])
            report_id = report['id']
            local_filename = f"case-{case_id}_report-{report_id}.pdf.gz"
            endpoint = f"/case/{case_id}/reports/{report_id}?format=pdf"
            url = self.baseURL + endpoint

            if output_dir is not None:
                os.makedirs(output_dir, exist_ok=True)
                local_filename = os.path.join(output_dir, local_filename)
            logger.info(f"Found report with ID {report_id}. Downloading as {local_filename} from {url}")

            # The context manager releases the streamed connection on every
            # path, including the error branch.
            with requests.get(url, stream=True, headers=self.headers) as response:
                if not response.ok:
                    logger.critical(f"Issue downloading file from {endpoint}. {response.status_code} ::: {response.text}")
                    continue
                with open(local_filename, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)

    def upload_file(self, filename, case_id):
        """Upload the local file ``filename`` to the ``caseFiles`` endpoint of ``case_id``.

        The remote name is the base name of ``filename``.
        """
        remote_filename = ntpath.basename(filename)
        endpoint = f"/case/{str(case_id)}/caseFiles/{remote_filename}/"
        logger.debug(f"Upload file to endpoint: {endpoint}")
        # Close the handle once the request has been sent (it used to leak).
        with open(filename, 'rb') as handle:
            self._post_api(endpoint, files={'file': handle})

    def get_case_info(self, case_id):
        """Return the decoded JSON for case ``case_id``, or None if the call failed."""
        return self._get_api(f"/case/{str(case_id)}")

    def list_cases(self, filters: dict = None):
        """List cases; ``filters``, when given, is sent as query parameters."""
        return self._get_api("/case", params=filters)

    def get(self, endpoint, params=None):
        """Public GET entry point; delegates to ``_get_api()``.

        ``_get_api()`` is retained for backward compatibility.
        """
        return self._get_api(endpoint=endpoint, params=params)

    def _get_api(self, endpoint, params=None):
        """Send a GET request to ``endpoint`` and return the decoded JSON body.

        Returns None when the request raised or the status code is not 200;
        both cases are logged at critical level. The elapsed time is always
        logged at debug level.
        """
        url = self.baseURL + endpoint
        logger.debug(f"Sending request to {url}")
        t0 = time.time()
        try:
            response = retry_session(self.headers).get(url, params=params)
        except Exception as x:
            logger.critical(f"{endpoint} call failed after retry: {x}")
            return None
        else:
            if response.status_code == 200:
                logger.debug(f"Call to {endpoint} succeeded")
                return json.loads(response.text)
            logger.critical(f"Call to {endpoint} failed. Error code was {response.status_code}")
            logger.critical(f"Server responded with {response.text}")
            return None
        finally:
            t1 = time.time()
            logger.debug(f"Call took {t1 - t0} seconds")

    def post(self, endpoint, data=None, files=None):
        """Public POST entry point; delegates to ``_post_api()``.

        ``_post_api()`` is retained for backward compatibility.
        """
        return self._post_api(endpoint, data=data, files=files)

    def _post_api(self, endpoint, data=None, files=None):
        """Send a POST request to ``endpoint`` and return the response object.

        ``data`` is sent as the JSON body and ``files`` as the upload payload.
        The response is returned whatever its status code (a non-200 status is
        only logged). Returns None when the request itself raised. The elapsed
        time is always logged at debug level.
        """
        url = self.baseURL + endpoint
        logger.debug(f"Sending request to {url}")
        t0 = time.time()
        post_headers = self.headers.copy()
        if data is not None:
            post_headers['Content-Type'] = "application/json"
            # NOTE(review): the extracted source lost its indentation; this
            # header is assumed to belong to the JSON branch as well — confirm.
            post_headers['Accept-Encoding'] = "*"
        # Per the original comment, this flag keeps verbose mode from logging
        # every byte of an uploaded file.
        upload = files is not None
        try:
            response = retry_session(post_headers, upload=upload).post(url, json=data, files=files)
        except Exception as x:
            logger.critical(f"{endpoint} call failed after retry: {x}")
            return None
        else:
            if response.status_code == 200:
                logger.debug(f"Call to {endpoint} succeeded")
                logger.debug(f"Response was {response.text}")
            else:
                logger.critical(f"Call to {endpoint} failed. Error code was {response.status_code}")
                logger.critical(f"Server responded with {response.text}")
            return response
        finally:
            t1 = time.time()
            logger.debug(f"Call took {t1 - t0} seconds")