Coverage for pyriandx/client.py: 23%
132 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-08-30 13:27 +1000
« prev ^ index » next coverage.py v6.4.2, created at 2022-08-30 13:27 +1000
1# -*- coding: utf-8 -*-
2import json
3import logging
4import ntpath
5import os
6import shutil
7import time
9import pkg_resources
10import requests
12from pyriandx.utils import retry_session
14logger = logging.getLogger(__name__)
16__all__ = ['Client']
19class Client:
21 def __init__(self, email, key, institution, base_url, key_is_auth_token=False):
22 self.baseURL = base_url
23 self.headers = {
24 'X-Auth-Email': email,
25 'X-Auth-Institution': institution
26 }
27 if key_is_auth_token:
28 self.headers['X-Auth-Token'] = key
29 else:
30 self.headers['X-Auth-Key'] = key
31 # this is required to import static data from the module
32 self.data_path = pkg_resources.resource_filename('pyriandx', 'json/')
34 def create_case(self, case_data_file):
35 """Creates case with given case data file"""
36 with open(case_data_file, 'r') as f:
37 request_data = json.load(f)
38 logger.debug("Creating case with data:")
39 logger.debug(request_data)
40 response = self._post_api("/case", data=request_data).json()
41 return response['id']
43 def create_sequencer_run(self, accession_number, run_id, request_data=None):
44 """Creates sequencer run with given accession number"""
45 if request_data is None:
46 with open(self.data_path + 'create_sequencer_run.json', 'r') as f:
47 request_data = json.load(f)
48 request_data['runId'] = run_id
49 request_data['specimens'][0]['accessionNumber'] = accession_number
50 logger.debug(f"Creating sequencer run with data: {request_data}")
51 response = self._post_api("/sequencerRun", data=request_data).json()
52 return response['id']
54 def create_job(self, case, run_id, request_data=None):
55 """Creates job with given case and sequence run id"""
56 if request_data is None:
57 with open(self.data_path + 'create_job.json', 'r') as f:
58 request_data: dict = json.load(f)
59 accession_number = str(case['specimens'][0]['accessionNumber'])
60 request_data['input'][0]['accessionNumber'] = str(accession_number)
61 request_data['input'][0]['sequencerRunInfos'][0]['runId'] = str(run_id)
62 request_data['input'][0]['sequencerRunInfos'][0]['files'] = case['caseFiles']
63 logger.debug(f"Creating job for Case ID: {str(case['id'])} with input payload:")
64 logger.debug(request_data)
65 endpoint = f"/case/{str(case['id'])}/informaticsJobs"
66 response = self._post_api(endpoint, data=request_data).json()
67 return response['jobId']
69 def create_job_vcf(self, case_id):
70 """Creates job with given case id"""
71 logger.debug(f"Creating job for vcf with case ID: {str(case_id)}")
72 endpoint = f"/case/{str(case_id)}/informaticsJobsVCF"
73 self._post_api(endpoint)
75 def get_job_status(self, case_id, job_id):
76 """Gets the status of an informatics job."""
77 logger.debug(f"Getting status of job {job_id} for case ID: {str(case_id)}")
78 case_info = self.get_case_info(case_id)
80 job = None
81 for j in case_info['informaticsJobs']:
82 if j['id'] == job_id:
83 job = j
85 try:
86 status = job['status']
87 except KeyError:
88 status = "NO_JOB_AVAILABLE"
90 logger.debug(f"Got status: {status}")
92 return status
94 def get_report(self, case, output_dir=None):
95 """
96 Downloads report to output_dir.
97 If report_index is None, downloads all reports.
98 If output_dir is None, downloads to working directory.
99 """
100 for r in case['reports']:
101 report_id = r['id']
102 local_filename = f"case-{str(case['id'])}_report-{report_id}.pdf.gz"
103 endpoint = f"/case/{str(case['id'])}/reports/{report_id}?format=pdf"
104 url = self.baseURL + endpoint
106 if output_dir is not None:
107 os.makedirs(output_dir, exist_ok=True)
108 local_filename = output_dir + "/" + local_filename
109 logger.info(f"Found report with ID {report_id}. Downloading as {local_filename} from {url}")
111 r = requests.get(url, stream=True, headers=self.headers)
112 if r.ok:
113 with open(local_filename, 'wb') as f:
114 shutil.copyfileobj(r.raw, f)
115 else:
116 logger.critical(f"Issue downloading file from {endpoint}. {r.status_code} ::: {r.text}")
118 def upload_file(self, filename, case_id):
119 """Upload file to given accession number"""
120 files = {'file': open(filename, 'rb')}
121 remote_filename = ntpath.basename(filename)
122 endpoint = f"/case/{str(case_id)}/caseFiles/{remote_filename}/"
123 logger.debug(f"Upload file to endpoint: {endpoint}")
124 self._post_api(endpoint, files=files)
126 def get_case_info(self, case_id):
127 """Get case info for given case ID"""
128 return self._get_api(f"/case/{str(case_id)}")
130 def list_cases(self, filters: dict = None):
131 """List cases, optionally apply query filters if pass-in"""
132 return self._get_api(f"/case", params=filters)
134 def get(self, endpoint, params):
135 """Expose GET API to caller/client
136 Retain _get_api() for backward compatible
137 """
138 return self._get_api(endpoint=endpoint, params=params)
140 def _get_api(self, endpoint, params=None):
141 """Internal function to create get requests to the API"""
142 url = self.baseURL + endpoint
143 logger.debug(f"Sending request to {url}")
144 t0 = time.time()
145 try:
146 response = retry_session(self.headers).get(url, params=params)
147 except Exception as x:
148 logger.critical(f"{endpoint} call failed after retry: {x}")
149 else:
150 if response.status_code == 200:
151 logger.debug(f"Call to {endpoint} succeeded")
152 return json.loads(response.text)
153 else:
154 logger.critical(f"Call to {endpoint} failed. Error code was {response.status_code}")
155 logger.critical(f"Server responded with {response.text}")
157 finally:
158 t1 = time.time()
159 logger.debug(f"Call took {t1 - t0} seconds")
161 def post(self, endpoint, data=None, files=None):
162 """Expose POST API to caller/client
163 Retain _post_api() for backward compatible
164 """
165 return self._post_api(endpoint, data=data, files=files)
167 def _post_api(self, endpoint, data=None, files=None):
168 """Internal function to create post requests to the API"""
169 url = self.baseURL + endpoint
170 logger.debug(f"Sending request to {url}")
171 t0 = time.time()
172 post_headers = self.headers.copy()
173 if data is not None:
174 post_headers['Content-Type'] = "application/json"
175 post_headers['Accept-Encoding'] = "*"
176 if files is None:
177 upload = False
178 else:
179 upload = True # prevents seeing every. single. byte. in file upload during verbose mode
180 # uncomment if you like to inspect the json data we're sending
181 # with open("request-debug.json", "w") as data_file:
182 # json.dump(data, data_file, indent=2)
183 try:
184 response = retry_session(post_headers, upload=upload).post(url, json=data, files=files)
185 except Exception as x:
186 logger.critical(f"{endpoint} call failed after retry: {x}")
187 else:
188 if response.status_code == 200:
189 logger.debug(f"Call to {endpoint} succeeded")
190 logger.debug(f"Response was {response.text}")
191 else:
192 logger.critical(f"Call to {endpoint} failed. Error code was {response.status_code}")
193 logger.critical(f"Server responded with {response.text}")
194 return response
195 finally:
196 t1 = time.time()
197 logger.debug(f"Call took {t1 - t0} seconds")