import io
import time
from typing import TYPE_CHECKING, Any, Dict, Union, List, Optional
from warnings import warn
from pandas import DataFrame, concat
from .base_deployment import BaseDeployment
from ..helpers import DataConnection
from ..utils import StatusLogger, print_text_header_h1
from ..utils.autoai.utils import init_cos_client, try_load_dataset
from ..utils.autoai.watson_studio import change_client_get_headers
from ..utils.deployment.errors import BatchJobFailed, MissingScoringResults
from ..wml_client_error import WMLClientError
from ..workspace import WorkSpace
from ..utils.autoai.connection import validate_source_data_connections, validate_deployment_output_connection
if TYPE_CHECKING:
from sklearn.pipeline import Pipeline
from pandas import DataFrame
from numpy import ndarray
__all__ = [
"Batch"
]
class Batch(BaseDeployment):
"""
The Batch Deployment class.
With an instance of this class, you can manage any batch deployment.
Parameters
----------
source_wml_credentials: dictionary, required
Credentials to Watson Machine Learning instance where training was performed.
source_project_id: str, optional
ID of the Watson Studio project where training was performed.
source_space_id: str, optional
ID of the Watson Studio Space where training was performed.
target_wml_credentials: dictionary, required
Credentials to Watson Machine Learning instance where you want to deploy.
target_project_id: str, optional
ID of the Watson Studio project where you want to deploy.
target_space_id: str, optional
ID of the Watson Studio Space where you want to deploy.
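Example
-------
A minimal construction sketch; all credential values and IDs below are placeholders.
>>> from watson_machine_learning_client.deployment import Batch
>>>
>>> deployment = Batch(
>>> source_wml_credentials={...},
>>> source_space_id="...",
>>> target_wml_credentials={...},
>>> target_space_id="...")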
"""
def __init__(self,
source_wml_credentials: Union[dict, 'WorkSpace'] = None,
source_project_id: str = None,
source_space_id: str = None,
target_wml_credentials: Union[dict, 'WorkSpace'] = None,
target_project_id: str = None,
target_space_id: str = None,
wml_credentials: Union[dict, 'WorkSpace'] = None,
project_id: str = None,
space_id: str = None):
# note: backward compatibility
if wml_credentials is not None:
source_wml_credentials = wml_credentials
warn("\"wml_credentials\" parameter is deprecated, please use \"source_wml_credentials\"")
print("\"wml_credentials\" parameter is deprecated, please use \"source_wml_credentials\"")
if project_id is not None:
source_project_id = project_id
warn("\"project_id\" parameter is deprecated, please use \"source_project_id\"")
print("\"project_id\" parameter is deprecated, please use \"source_project_id\"")
if space_id is not None:
source_space_id = space_id
warn("\"space_id\" parameter is deprecated, please use \"source_space_id\"")
print("\"space_id\" parameter is deprecated, please use \"source_space_id\"")
# --- end note
# note: since "workspace" alone is ambiguous, plain WML credentials with project and space IDs
# are accepted here; for backward compatibility, a previous WorkSpace implementation
# can still be passed as the first argument
super().__init__(deployment_type='batch')
if isinstance(source_wml_credentials, WorkSpace):
self._source_workspace = source_wml_credentials
elif isinstance(source_wml_credentials, dict):
self._source_workspace = WorkSpace(wml_credentials=source_wml_credentials.copy(),
project_id=source_project_id,
space_id=source_space_id)
else:
self._source_workspace = None
if target_wml_credentials is None:
if isinstance(source_wml_credentials, WorkSpace):
self._target_workspace = source_wml_credentials
elif isinstance(source_wml_credentials, dict):
self._target_workspace = WorkSpace(wml_credentials=source_wml_credentials.copy(),
project_id=source_project_id,
space_id=source_space_id)
else:
self._target_workspace = None
else:
if isinstance(target_wml_credentials, WorkSpace):
self._target_workspace = target_wml_credentials
elif isinstance(target_wml_credentials, dict):
self._target_workspace = WorkSpace(wml_credentials=target_wml_credentials.copy(),
project_id=target_project_id,
space_id=target_space_id)
else:
self._target_workspace = None
# --- end note
self.name = None
self.id = None
self.asset_id = None
def __repr__(self):
return f"name: {self.name}, id: {self.id}, asset_id: {self.asset_id}"
def __str__(self):
return f"name: {self.name}, id: {self.id}, asset_id: {self.asset_id}"
def score(self, **kwargs):
raise NotImplementedError("Batch deployment supports only job runs.")
def create(self,
model: Any,
deployment_name: str,
metadata: Optional[Dict] = None,
training_data: Optional[Union['DataFrame', 'ndarray']] = None,
training_target: Optional[Union['DataFrame', 'ndarray']] = None,
experiment_run_id: Optional[str] = None) -> None:
"""
Create deployment from a model.
Parameters
----------
model: Union[Any, str], required
Model object to deploy or local path to the model.
deployment_name: str, required
Name of the deployment
training_data: Union['pandas.DataFrame', 'numpy.ndarray'], optional
Training data for the model
training_target: Union['pandas.DataFrame', 'numpy.ndarray'], optional
Target/label data for the model
metadata: dictionary, optional
Model meta properties.
experiment_run_id: str, optional
ID of a training/experiment (only applicable for AutoAI deployments)
Example
-------
>>> from watson_machine_learning_client.deployment import Batch
>>>
>>> deployment = Batch(
>>> source_wml_credentials={
>>> "apikey": "...",
>>> "iam_apikey_description": "...",
>>> "iam_apikey_name": "...",
>>> "iam_role_crn": "...",
>>> "iam_serviceid_crn": "...",
>>> "instance_id": "...",
>>> "url": "https://us-south.ml.cloud.ibm.com"
>>> },
>>> source_project_id="...",
>>> source_space_id="...")
>>>
>>> deployment.create(
>>> experiment_run_id="...",
>>> model=model,
>>> deployment_name='My new deployment'
>>> )
"""
return super().create(model=model,
deployment_name=deployment_name,
metadata=metadata,
training_data=training_data,
training_target=training_target,
experiment_run_id=experiment_run_id,
deployment_type='batch')
@BaseDeployment._project_to_space_to_project
def get_params(self) -> Dict:
"""Get deployment parameters."""
return super().get_params()
@BaseDeployment._project_to_space_to_project
def run_job(self,
payload: Union[DataFrame, List[DataConnection]],
output_data_reference: 'DataConnection' = None,
background_mode: 'bool' = True) -> Union[Dict, Dict[str, List], DataConnection]:
"""
Batch scoring job on WML. A payload (DataFrame) or payload data references are required; they are passed to the WML instance where the model has been deployed.
Parameters
----------
payload: pandas.DataFrame or List[DataConnection], required
DataFrame with data to test the model or data storage connection details to inform where payload data is stored.
output_data_reference: DataConnection, optional
DataConnection to the output COS for storing predictions.
Required only when DataConnection objects are passed as the payload.
background_mode: bool, optional
If True (async), run_job() returns immediately after submitting the job; if False (sync), it waits for the job to finish.
Returns
-------
Dictionary with scoring job details.
Example
-------
>>> score_details = deployment.run_job(payload=test_data)
>>> print(score_details['entity']['scoring'])
{'input_data': [{'fields': ['sepal_length',
'sepal_width',
'petal_length',
'petal_width'],
'values': [[4.9, 3.0, 1.4, 0.2]]}],
'predictions': [{'fields': ['prediction', 'probability'],
'values': [['setosa',
[0.9999320742502246,
5.1519823540224506e-05,
1.6405926235405522e-05]]]}]}
>>>
>>> payload_reference = DataConnection(location=DSLocation(asset_id=asset_id))
>>> results_reference = DataConnection(location=...)
>>> score_details = deployment.run_job(payload=[payload_reference], output_data_reference=results_reference)
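>>>
>>> # A minimal synchronous sketch: block until the job finishes and return its parameters
>>> job_details = deployment.run_job(payload=test_data, background_mode=False)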
"""
# note: support for DataFrame payload
if isinstance(payload, DataFrame):
scoring_payload = {
self._target_workspace.wml_client.deployments.ScoringMetaNames.INPUT_DATA: [{'values': payload}]
}
# note: support for DataConnections and dictionaries payload
elif isinstance(payload, list):
if isinstance(payload[0], DataConnection):
payload = validate_source_data_connections(source_data_connections=payload,
workspace=self._source_workspace,
deployment=True)
payload = [data_connection._to_dict() for data_connection in payload]
elif isinstance(payload[0], dict):
pass
else:
raise ValueError(f"Current payload type: list of {type(payload[0])} is not supported.")
if output_data_reference is None:
raise ValueError("\"output_data_reference\" should be provided.")
if isinstance(output_data_reference, DataConnection):
output_data_reference = validate_deployment_output_connection(results_data_connection=output_data_reference,
workspace=self._target_workspace,
source_data_connections=payload)
output_data_reference = output_data_reference._to_dict()
scoring_payload = {
self._target_workspace.wml_client.deployments.ScoringMetaNames.INPUT_DATA_REFERENCES: payload,
self._target_workspace.wml_client.deployments.ScoringMetaNames.OUTPUT_DATA_REFERENCE:
output_data_reference}
else:
raise ValueError(
f"Incorrect payload type. Required: DataFrame or List[DataConnection], Passed: {type(payload)}")
scoring_payload['hybrid_pipeline_hardware_specs'] = [
{
'node_runtime_id': 'auto_ai.kb',
'hardware_spec': {
'name': 'M'
}
}
]
if self._obm:
scoring_payload['hybrid_pipeline_hardware_specs'].append(
{
"node_runtime_id": "auto_ai.obm",
"hardware_spec": {
"name": "M",
"num_nodes": 2
}
}
)
# note: support for spark hummingbird
if self._obm:
change_client_get_headers(client=self._target_workspace.wml_client.training._client,
_type='new',
workspace=self._target_workspace)
job_details = self._target_workspace.wml_client.deployments.create_job(self.id, scoring_payload)
# note: support for spark hummingbird
if self._obm:
change_client_get_headers(client=self._target_workspace.wml_client.training._client,
_type='old',
workspace=self._target_workspace)
if background_mode:
return job_details
else:
# note: monitor scoring job
job_id = self._target_workspace.wml_client.deployments.get_job_uid(job_details)
print_text_header_h1(u'Synchronous scoring for id: \'{}\' started'.format(job_id))
status = self.get_job_status(job_id)['state']
with StatusLogger(status) as status_logger:
while status not in ['failed', 'error', 'completed', 'canceled']:
time.sleep(10)
status = self.get_job_status(job_id)['state']
status_logger.log_state(status)
# --- end note
if u'completed' in status:
print(u'\nScoring job \'{}\' finished successfully.'.format(job_id))
else:
raise BatchJobFailed(job_id, f"Scoring job failed with status: {self.get_job_status(job_id)}")
return self.get_job_params(job_id)
@BaseDeployment._project_to_space_to_project
def rerun_job(self,
scoring_job_id: str,
background_mode: bool = True) -> Union[dict, 'DataFrame', 'DataConnection']:
"""
Rerun a scoring job with the same parameters as the job identified by 'scoring_job_id'.
Parameters
----------
scoring_job_id: str
ID of the scoring job to rerun.
background_mode: bool, optional
If True (async), rerun_job() returns immediately after submitting the job; if False (sync), it waits for the job to finish.
Returns
-------
Dictionary with scoring job details.
Example
-------
>>> scoring_details = deployment.rerun_job(scoring_job_id)
"""
scoring_params = self.get_job_params(scoring_job_id)['entity']['scoring']
input_data_references = self._target_workspace.wml_client.deployments.ScoringMetaNames.INPUT_DATA_REFERENCES
if input_data_references in scoring_params:
payload_ref = list(scoring_params[input_data_references])
return self.run_job(payload=payload_ref, output_data_reference=scoring_params['output_data_reference'],
background_mode=background_mode)
else:
payload_df = DataFrame.from_dict(scoring_params['input_data'])
return self.run_job(payload=payload_df, background_mode=background_mode)
@BaseDeployment._project_to_space_to_project
def delete(self, deployment_id: str = None) -> None:
"""
Delete deployment on WML.
Parameters
----------
deployment_id: str, optional
ID of the deployment to delete. If not provided, the current deployment is deleted.
Example
-------
>>> deployment = Batch(source_wml_credentials=...)
>>> # Delete current deployment
>>> deployment.delete()
>>> # Or delete a specific deployment
>>> deployment.delete(deployment_id='...')
"""
super().delete(deployment_id=deployment_id, deployment_type='batch')
@BaseDeployment._project_to_space_to_project
def list(self, limit=None) -> 'DataFrame':
"""
List WML deployments.
Parameters
----------
limit: int, optional
Set the limit of how many deployments to list. Default is None (all deployments are fetched).
Returns
-------
Pandas DataFrame with information about deployments.
Example
-------
>>> deployment = Batch(source_wml_credentials=...)
>>> deployments_list = deployment.list()
>>> print(deployments_list)
created_at ... status
0 2020-03-06T10:50:49.401Z ... ready
1 2020-03-06T13:16:09.789Z ... ready
4 2020-03-11T14:46:36.035Z ... failed
3 2020-03-11T14:49:55.052Z ... failed
2 2020-03-11T15:13:53.708Z ... ready
"""
return super().list(limit=limit, deployment_type='batch')
@BaseDeployment._project_to_space_to_project
def get(self, deployment_id: str) -> None:
"""
Get WML deployment.
Parameters
----------
deployment_id: str, required
ID of the deployment to work with.
Returns
-------
None. The retrieved deployment is stored as the current deployment of this Batch object.
Example
-------
>>> deployment = Batch(source_wml_credentials=...)
>>> deployment.get(deployment_id="...")
"""
super().get(deployment_id=deployment_id, deployment_type='batch')
@BaseDeployment._project_to_space_to_project
def get_job_params(self, scoring_job_id: str = None) -> Dict:
"""Get batch deployment job parameters.
Parameters
----------
scoring_job_id: str, optional
ID of the scoring job.
Returns
-------
Dictionary with parameters of the scoring job.
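Example
-------
A minimal sketch; the job ID placeholder comes from a previous run_job() call.
>>> job_params = deployment.get_job_params(scoring_job_id='...')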
"""
return self._target_workspace.wml_client.deployments.get_job_details(scoring_job_id)
@BaseDeployment._project_to_space_to_project
def get_job_status(self, scoring_job_id: str) -> Dict:
"""Get status of scoring job.
Parameters
----------
scoring_job_id: str
ID of the scoring job.
Returns
-------
Dictionary with state of scoring job (one of: [completed, failed, starting, queued])
and additional details if they exist.
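Example
-------
A minimal polling sketch; the job ID is a placeholder.
>>> state = deployment.get_job_status(scoring_job_id='...')['state']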
"""
return self._target_workspace.wml_client.deployments.get_job_status(scoring_job_id)
@BaseDeployment._project_to_space_to_project
def get_job_result(self, scoring_job_id: str) -> Union[Dict, 'DataFrame']:
"""Get batch deployment results of job with id `scoring_job_id`.
Parameters
----------
scoring_job_id: str
ID of the scoring job whose results will be returned.
Returns
-------
Dictionary with predictions for a scoring job with inline input data, or
a data reference to the output data if the scoring job used input data references.
If the scoring job is not completed, MissingScoringResults is raised.
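Example
-------
A minimal sketch; assumes the scoring job has already completed.
>>> predictions = deployment.get_job_result(scoring_job_id='...')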
"""
scoring_params = self.get_job_params(scoring_job_id)['entity']['scoring']
if scoring_params['status']['state'] == 'completed':
if 'predictions' in scoring_params:
return scoring_params['predictions']
else:
return self._download_results(scoring_params['output_data_reference'], scoring_job_id)
else:
raise MissingScoringResults(scoring_job_id, reason="Scoring is not completed.")
@staticmethod
def _download_results(data_connection: dict, scoring_job_id: str) -> Union[Dict, 'DataFrame']:
"""Download batch results."""
if 's3' in str(data_connection):
cos_client = init_cos_client(data_connection['connection'])
# note: fetch all batch results file names per scoring_job_id
cos_summary = cos_client.Bucket(data_connection['location']['bucket']).objects.filter(
Prefix=data_connection['location']['path'])
file_names = [file_name.key for file_name in cos_summary if scoring_job_id in file_name.key]
# --- end note
# TODO: this can be done simultaneously (multithreading / multiprocessing)
# note: download all data parts and concatenate them into one output
parts = []
for file_name in file_names:
file = cos_client.Object(data_connection['location']['bucket'], file_name).get()
buffer = io.BytesIO(file['Body'].read())
parts.append(try_load_dataset(buffer=buffer))
data = concat(parts)
# --- end note
return data
else:
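# note: non-COS references are not downloaded here; the reference itself is returned to the caller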
return data_connection
def get_job_id(self, batch_scoring_details):
"""Get id from batch scoring details."""
return self._target_workspace.wml_client.deployments.get_job_uid(batch_scoring_details)
def list_jobs(self):
"""Returns pandas DataFrame with list of deployment jobs"""
resources = self._target_workspace.wml_client.deployments.get_job_details()['resources']
columns = [u'job id', u'state', u'created', u'deployment id']
values = []
for scoring_details in resources:
if 'scoring' in scoring_details['entity']:
state = scoring_details['entity']['scoring']['status']['state']
score_values = (scoring_details[u'metadata'][u'guid'], state,
scoring_details[u'metadata'][u'created_at'],
scoring_details['entity']['deployment']['id'])
if self.id:
if self.id == scoring_details['entity']['deployment']['id']:
values.append(score_values)
else:
values.append(score_values)
return DataFrame(values, columns=columns)
@BaseDeployment._project_to_space_to_project
def _deploy(self,
pipeline_model: 'Pipeline',
deployment_name: str,
meta_props: Dict,
training_data: Union['DataFrame', 'ndarray'],
training_target: Union['DataFrame', 'ndarray'],
result_client=None) -> Dict:
"""
Deploy model into WML.
Parameters
----------
pipeline_model: Union['Pipeline', str], required
Model of the pipeline to deploy
deployment_name: str, required
Name of the deployment
training_data: Union['pandas.DataFrame', 'numpy.ndarray'], required
Training data for the model
training_target: Union['pandas.DataFrame', 'numpy.ndarray'], required
Target/label data for the model
meta_props: dictionary, required
Model meta properties.
result_client: Tuple['DataConnection', 'resource'], required
Tuple with Result DataConnection object and initialized COS client.
"""
deployment_details = {}
if result_client is not None:
asset_uid = self._publish_model_from_notebook(pipeline_model=pipeline_model,
meta_props=meta_props,
result_client=result_client)
else:
asset_uid = self._publish_model(pipeline_model=pipeline_model,
meta_props=meta_props,
training_data=training_data,
training_target=training_target)
self.asset_id = asset_uid
deployment_props = {
self._target_workspace.wml_client.deployments.ConfigurationMetaNames.NAME: deployment_name,
self._target_workspace.wml_client.deployments.ConfigurationMetaNames.BATCH: {}
}
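# note: on CP4D 3.0 (ICP_30) the asset href, space UID and hybrid pipeline hardware specs
# are set explicitly; otherwise a plain compute specification is used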
if self._target_workspace.wml_client.ICP_30:
asset_href = self._target_workspace.wml_client.repository.get_model_href(
self._target_workspace.wml_client.repository.get_model_details(asset_uid))
deployment_props[self._target_workspace.wml_client.deployments.ConfigurationMetaNames.ASSET] = {
"href": asset_href
}
deployment_props[self._target_workspace.wml_client.deployments.ConfigurationMetaNames.SPACE_UID] = (
self._target_workspace.wml_client.default_space_id)
deployment_props[
self._target_workspace.wml_client.deployments.ConfigurationMetaNames.HYBRID_PIPELINE_HARDWARE_SPECS] = [
{
'node_runtime_id': 'autoai.kb',
'hardware_spec': {
'name': 'M'
}
}
]
else:
deployment_props[self._target_workspace.wml_client.deployments.ConfigurationMetaNames.COMPUTE] = {
"name": "M",
"nodes": 1
}
print("Deploying model {} using V4 client.".format(asset_uid))
deployment_details = self._target_workspace.wml_client.deployments.create(
artifact_uid=asset_uid,
meta_props=deployment_props)
self.deployment_id = self._target_workspace.wml_client.deployments.get_uid(deployment_details)
return deployment_details