Source code for SciServer.Dask

import requests
from requests.exceptions import HTTPError
from base64 import b64decode
from pathlib import Path
from dask.distributed import Client
from distributed.security import Security
from os.path import expanduser
import SciServer.Authentication
import SciServer.Config
import json

def getClient(ref_id=None):
    """
    Creates a new client for the specified Dask cluster.

    If `ref_id` is not set, cluster connection properties will be read from the
    ``~/dask-cluster.json`` file injected into new SciServer Compute containers
    automatically when they are created with an attached Dask cluster.

    The TLS material found in the connection properties (``ca``, ``clientCert``,
    ``clientKey``; base64 encoded) is decoded and written to
    ``~/.sciserver/dask`` so it can be handed to Dask as file paths. The client
    key file is restricted to owner read/write.

    :param ref_id: Dask cluster reference ID in SciServer Compute, defaults to `None`
    :return: Dask client object
    :rtype: :class:`dask.distributed.Client`
    :raises requests.exceptions.HTTPError: if the SciServer Compute API answers
        the connection-info request with an error status. When the error body
        carries a stack trace it is appended to the exception message.
    """
    # The token is fetched up front in both branches, as the original code did.
    token = SciServer.Authentication.getToken()

    if ref_id is None:
        with open(expanduser('~/dask-cluster.json')) as f:
            data = json.load(f)
    else:
        url = ''.join([SciServer.Config.ComputeUrl.rstrip('/'),
                       '/api/dask/clusters/', ref_id])
        try:
            response = requests.get(url,
                                    params={'connectionInfo': 'true'},
                                    headers={'X-Auth-Token': token})
            response.raise_for_status()
            data = response.json()['connection']
        except HTTPError as err:
            # Best effort: enrich the message with the server-side stack trace.
            # Any failure while doing so (no response, non-JSON body, missing
            # keys, non-string trace) falls back to the original error.
            # `Exception` rather than a bare `except` so that KeyboardInterrupt
            # and SystemExit are never swallowed here.
            try:
                msg = str(err) + '\n' + err.response.json()['error']['stackTrace']
            except Exception:
                raise err
            # Keep the response attached and the cause chained for callers
            # that inspect the failure.
            raise HTTPError(msg, response=err.response) from err

    base_dir = Path(expanduser('~/.sciserver/dask'))
    base_dir.mkdir(parents=True, exist_ok=True)

    ca_file = base_dir / 'ca.pem'
    cert_file = base_dir / 'client-cert.pem'
    key_file = base_dir / 'client-key.pem'

    ca_file.write_bytes(b64decode(data['ca']))
    cert_file.write_bytes(b64decode(data['clientCert']))

    # The private key must not be readable by other users: make sure the file
    # exists with owner-only permissions *before* the key bytes are written.
    key_file.touch(mode=0o600, exist_ok=True)
    try:
        # touch() does not change the mode of a file left over from an
        # earlier call, so tighten it explicitly.
        key_file.chmod(0o600)
    except OSError:
        # Some filesystems reject permission changes; still write the key so
        # the connection keeps working as it did before.
        pass
    key_file.write_bytes(b64decode(data['clientKey']))

    scheduler_url = data['schedulerUrl']
    security = Security(tls_ca_file=str(ca_file),
                        tls_client_cert=str(cert_file),
                        tls_client_key=str(key_file),
                        require_encryption=True)
    return Client(scheduler_url, security=security)