# Source code for ads.feature_store.feature_group_job

#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import json
import logging
from copy import deepcopy
from enum import Enum
from typing import Dict, List, Any, Union

import pandas

from ads.common import utils
from ads.feature_store.common.enums import BatchIngestionMode, StreamingIngestionMode
from ads.feature_store.feature_option_details import FeatureOptionDetails
from ads.feature_store.service.oci_feature_group_job import OCIFeatureGroupJob
from ads.jobs.builders.base import Builder

logger = logging.getLogger(__name__)


class JobConfigurationType(Enum):
    """Job configuration types accepted by ``with_job_configuration_details``.

    Each member's value is the string stored under the
    ``jobConfigurationType`` key of the job configuration details.
    """

    SPARK_BATCH_AUTOMATIC = "SPARK_BATCH_AUTOMATIC"
    SPARK_BATCH_MANUAL = "SPARK_BATCH_MANUAL"
class FeatureGroupJob(Builder):
    """Represents a FeatureGroupJob resource.

    Methods
    -------
    create(self, **kwargs) -> "FeatureGroupJob"
        Creates feature_group_run resource.
    from_id(cls, id: str) -> "FeatureGroupJob"
        Gets an existing feature_group_run resource by id.
    list(cls, compartment_id: str = None, **kwargs) -> List["FeatureGroupJob"]
        Lists feature_group_run resources in a given compartment.
    list_df(cls, compartment_id: str = None, **kwargs) -> "pandas.DataFrame"
        Lists feature_group_run resources as a pandas dataframe.
    with_compartment_id(self, compartment_id: str) -> "FeatureGroupJob"
        Sets the compartment ID.
    with_feature_group_id(self, feature_group_id: str) -> "FeatureGroupJob"
        Sets the feature group ID.
    with_ingestion_mode(self, ingestion_mode) -> "FeatureGroupJob"
        Sets the ingestion mode (a ``BatchIngestionMode`` or
        ``StreamingIngestionMode`` member).

    Examples
    --------
    >>> from ads.feature_store import feature_group_job
    >>> from ads.feature_store.common.enums import BatchIngestionMode
    >>> import os
    >>> feature_group_run = (
    ...     feature_group_job.FeatureGroupJob()
    ...     .with_compartment_id(os.environ["PROJECT_COMPARTMENT_OCID"])
    ...     .with_feature_group_id("<feature_group_id>")
    ...     .with_ingestion_mode(BatchIngestionMode.OVERWRITE)
    ... )
    >>> feature_group_run.create()
    """

    _PREFIX = "feature_group_run_resource"

    # Keys under which values are stored in the spec.
    CONST_ID = "id"
    CONST_COMPARTMENT_ID = "compartmentId"
    CONST_FEATURE_GROUP_ID = "featureGroupId"
    CONST_INGESTION_MODE = "ingestionMode"
    CONST_JOB_CONFIGURATION_DETAILS = "jobConfigurationDetails"
    CONST_JOB_CONFIGURATION_TYPE = "jobConfigurationType"
    CONST_FREEFORM_TAG = "freeformTags"
    CONST_FEATURE_OPTION_DETAILS = "featureOptionsDetails"
    CONST_DEFINED_TAG = "definedTags"
    CONST_TIME_FROM = "timeFrom"
    CONST_TIME_TO = "timeTo"
    CONST_LIFECYCLE_STATE = "lifecycleState"
    CONST_JOB_OUTPUT_DETAILS = "jobOutputDetails"
    CONST_DATA_FLOW_EXECUTION_OUTPUT = "dataFlowBatchExecutionOutput"
    CONST_VALIDATION_OUTPUT = "validation_output"
    CONST_DATA_FLOW_READ_WRITE_DETAIL = "data_flow_read_write_detail"
    CONST_DATA_READ_IN_BYTES = "data_read_in_bytes"
    CONST_DATA_WRITTEN_BYTES = "data_written_in_bytes"
    CONST_FEATURE_STATISTICS = "featureStatistics"

    # Maps each spec key to the keyword argument name passed to
    # ``OCIFeatureGroupJob`` in ``_to_oci_fs_feature_group_run``.
    attribute_map = {
        CONST_ID: "id",
        CONST_COMPARTMENT_ID: "compartment_id",
        CONST_FEATURE_GROUP_ID: "feature_group_id",
        CONST_INGESTION_MODE: "ingestion_mode",
        CONST_JOB_CONFIGURATION_DETAILS: "job_configuration_details",
        CONST_JOB_CONFIGURATION_TYPE: "job_configuration_type",
        CONST_FEATURE_OPTION_DETAILS: "feature_option_details",
        CONST_TIME_FROM: "time_from",
        CONST_TIME_TO: "time_to",
        CONST_FREEFORM_TAG: "freeform_tags",
        CONST_DEFINED_TAG: "defined_tags",
        CONST_LIFECYCLE_STATE: "lifecycle_state",
        CONST_JOB_OUTPUT_DETAILS: "job_output_details",
        CONST_DATA_FLOW_EXECUTION_OUTPUT: "data_flow_batch_execution_output",
        CONST_FEATURE_STATISTICS: "feature_statistics",
    }

    def __init__(self, spec: Dict = None, **kwargs) -> None:
        """Initializes FeatureGroupJob Resource.

        Parameters
        ----------
        spec: (Dict, optional). Defaults to None.
            Object specification.
        kwargs: Dict
            Specification as keyword arguments.
            If 'spec' contains the same key as the one in kwargs,
            the value from kwargs will be used.
        """
        # The parent receives a deep copy, so it does not share the caller's
        # (possibly mutable) keyword values.
        super().__init__(spec=spec, **deepcopy(kwargs))
        # Specify oci FeatureGroupJob instance
        self.oci_fs_feature_group_run = self._to_oci_fs_feature_group_run(**kwargs)

    def _to_oci_fs_feature_group_run(self, **kwargs):
        """Creates an `OCIFeatureGroupJob` instance from the `FeatureGroupJob`.

        Parameters
        ----------
        kwargs
            Additional kwargs arguments.
            Can be any attribute that `feature_store.models.FeatureGroupJob` accepts.
            They override values taken from the spec.

        Returns
        -------
        OCIFeatureGroupJob
            The instance of the OCIFeatureGroupJob.
        """
        fs_spec = {
            dsc_attr: self.get_spec(infra_attr)
            for infra_attr, dsc_attr in self.attribute_map.items()
        }
        fs_spec.update(**kwargs)
        return OCIFeatureGroupJob(**fs_spec)

    @property
    def kind(self) -> str:
        """The kind of the object as showing in a YAML."""
        return "feature_group_job"

    @property
    def compartment_id(self) -> str:
        """The compartment id stored in the spec."""
        return self.get_spec(self.CONST_COMPARTMENT_ID)

    @compartment_id.setter
    def compartment_id(self, value: str):
        self.with_compartment_id(value)

    def with_compartment_id(self, compartment_id: str) -> "FeatureGroupJob":
        """Sets the compartment_id.

        Parameters
        ----------
        compartment_id: str
            The compartment_id.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self)
        """
        return self.set_spec(self.CONST_COMPARTMENT_ID, compartment_id)

    def get_validation_output_df(self) -> "pandas.DataFrame":
        """Retrieves the validation output as a pandas DataFrame.

        The JSON document stored under the ``validationOutput`` key of the
        job output details is parsed, flattened with
        ``pandas.json_normalize`` and transposed.

        Returns
        -------
        pandas.DataFrame
            The validation output data in DataFrame format. An empty
            DataFrame is returned when the job has no output details or no
            validation output yet.
        """
        output_details = self.job_output_details
        raw_output = output_details.get("validationOutput") if output_details else None
        if not raw_output:
            # Nothing to parse: avoid AttributeError on a missing details
            # dict and TypeError from json.loads(None).
            return pandas.DataFrame()

        validation_output_json = json.loads(raw_output)
        return pandas.json_normalize(validation_output_json).transpose()

    @property
    def time_from(self) -> str:
        """The time_from value stored in the spec."""
        return self.get_spec(self.CONST_TIME_FROM)

    def with_time_from(self, time_from: str) -> "FeatureGroupJob":
        """Sets the time_from.

        Parameters
        ----------
        time_from: str
            The time_from.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self)
        """
        return self.set_spec(self.CONST_TIME_FROM, time_from)

    @property
    def time_to(self) -> str:
        """The time_to value stored in the spec."""
        return self.get_spec(self.CONST_TIME_TO)

    def with_time_to(self, time_to: str) -> "FeatureGroupJob":
        """Sets the time_to.

        Parameters
        ----------
        time_to: str
            The time_to.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self)
        """
        return self.set_spec(self.CONST_TIME_TO, time_to)

    @property
    def feature_group_id(self) -> str:
        """The feature group id stored in the spec."""
        return self.get_spec(self.CONST_FEATURE_GROUP_ID)

    @feature_group_id.setter
    def feature_group_id(self, value: str):
        self.with_feature_group_id(value)

    def with_feature_group_id(self, feature_group_id: str) -> "FeatureGroupJob":
        """Sets the feature_group_id.

        Parameters
        ----------
        feature_group_id: str
            The feature group id.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self)
        """
        return self.set_spec(self.CONST_FEATURE_GROUP_ID, feature_group_id)

    def with_id(self, id: str) -> "FeatureGroupJob":
        """Sets the id.

        Parameters
        ----------
        id: str
            The id of the feature group job.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self)
        """
        return self.set_spec(self.CONST_ID, id)

    @property
    def id(self) -> str:
        """The id stored in the spec."""
        return self.get_spec(self.CONST_ID)

    @property
    def job_configuration_details(self) -> Dict:
        """The job configuration details stored in the spec."""
        return self.get_spec(self.CONST_JOB_CONFIGURATION_DETAILS)

    @job_configuration_details.setter
    def job_configuration_details(
        self, job_configuration_type: JobConfigurationType, **kwargs: Any
    ):
        self.with_job_configuration_details(job_configuration_type, **kwargs)

    def with_job_configuration_details(
        self, job_configuration_type: JobConfigurationType, **kwargs: Any
    ) -> "FeatureGroupJob":
        """Sets the job configuration details.

        Parameters
        ----------
        job_configuration_type: JobConfigurationType
            The job_configuration_type of job
        kwargs: Dict[str, Any]
            Additional key value arguments, stored alongside the
            configuration type.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self)
        """
        return self.set_spec(
            self.CONST_JOB_CONFIGURATION_DETAILS,
            {
                self.CONST_JOB_CONFIGURATION_TYPE: job_configuration_type.value,
                **kwargs,
            },
        )

    @property
    def job_output_details(self) -> Dict:
        """The job output details stored in the spec."""
        return self.get_spec(self.CONST_JOB_OUTPUT_DETAILS)

    def with_job_output_details(self, job_output_details: Dict) -> "FeatureGroupJob":
        """Sets the job output details.

        Parameters
        ----------
        job_output_details: Dict
            The job output details which contains error_details,
            validation_output and commit id.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self)
        """
        return self.set_spec(self.CONST_JOB_OUTPUT_DETAILS, job_output_details)

    @property
    def feature_option_details(self) -> Dict:
        """The feature option details stored in the spec."""
        return self.get_spec(self.CONST_FEATURE_OPTION_DETAILS)

    def with_feature_option_details(
        self, feature_option_details: FeatureOptionDetails
    ) -> "FeatureGroupJob":
        """Sets the feature_option_details.

        Parameters
        ----------
        feature_option_details: FeatureOptionDetails
            The feature option details; stored as the result of its
            ``to_dict()``.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self)
        """
        return self.set_spec(
            self.CONST_FEATURE_OPTION_DETAILS, feature_option_details.to_dict()
        )

    @property
    def ingestion_mode(self) -> str:
        """The ingestion mode stored in the spec."""
        return self.get_spec(self.CONST_INGESTION_MODE)

    @ingestion_mode.setter
    def ingestion_mode(
        self, ingestion_mode: Union[BatchIngestionMode, StreamingIngestionMode]
    ) -> None:
        self.with_ingestion_mode(ingestion_mode)

    def with_ingestion_mode(
        self, ingestion_mode: Union[BatchIngestionMode, StreamingIngestionMode]
    ) -> "FeatureGroupJob":
        """Sets the mode of the dataset ingestion mode.

        Parameters
        ----------
        ingestion_mode: Union[BatchIngestionMode, StreamingIngestionMode]
            The ingestion mode; its ``value`` is what gets stored.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self)
        """
        return self.set_spec(self.CONST_INGESTION_MODE, ingestion_mode.value)

    def with_lifecycle_state(self, lifecycle_state: str) -> "FeatureGroupJob":
        """Sets the lifecycle_state.

        Parameters
        ----------
        lifecycle_state: str
            The lifecycle_state.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self)
        """
        return self.set_spec(self.CONST_LIFECYCLE_STATE, lifecycle_state)

    @property
    def feature_statistics(self) -> str:
        """The computed feature statistics stored in the spec."""
        return self.get_spec(self.CONST_FEATURE_STATISTICS)

    def with_feature_statistics(self, feature_statistics: str) -> "FeatureGroupJob":
        """Sets the computed statistics.

        Parameters
        ----------
        feature_statistics: str
            Computed Feature Statistics

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self)
        """
        return self.set_spec(self.CONST_FEATURE_STATISTICS, feature_statistics)

    @classmethod
    def from_id(cls, id: str) -> "FeatureGroupJob":
        """Gets an existing feature_group_run resource by Id.

        Parameters
        ----------
        id: str
            The feature_group_run id.

        Returns
        -------
        FeatureGroupJob
            An instance of FeatureGroupJob resource.
        """
        return cls()._update_from_oci_fs_model(OCIFeatureGroupJob.from_id(id))

    def create(self, **kwargs) -> "FeatureGroupJob":
        """Creates feature_group_run resource.

        Parameters
        ----------
        kwargs
            Additional kwargs arguments.
            Can be any attribute that `feature_store.models.FeatureGroupJob` accepts.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self)

        Raises
        ------
        ValueError
            If compartment id not provided.
        """
        if not self.compartment_id:
            raise ValueError("Compartment id must be provided.")

        # The payload is only used for the debug log below; the id is left
        # out of it.
        payload = deepcopy(self._spec)
        payload.pop("id", None)
        logger.debug(
            "Creating a feature_group_run resource with payload %s", payload
        )

        # Create entity
        logger.info("Saving entity.")
        self.oci_fs_feature_group_run = self._to_oci_fs_feature_group_run(
            **kwargs
        ).create()
        self.with_id(self.oci_fs_feature_group_run.id)
        return self

    def update(self, **kwargs) -> "FeatureGroupJob":
        """Updates FeatureGroupJob in the feature store.

        Parameters
        ----------
        kwargs
            Additional kwargs arguments.
            Can be any attribute that `feature_store.models.FeatureGroupJob` accepts.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self).

        Raises
        ------
        ValueError
            If the FeatureGroupJob has no id yet.
        """
        if not self.id:
            raise ValueError(
                "FeatureGroupJob needs to be saved to the feature store before it can be updated."
            )

        self.oci_fs_feature_group_run = self._to_oci_fs_feature_group_run(
            **kwargs
        ).update()
        return self

    def _mark_job_complete(self, job_output_details) -> "FeatureGroupJob":
        """Records the job output details and completes the job.

        Parameters
        ----------
        job_output_details
            The output details of the completed job.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self).

        Raises
        ------
        ValueError
            If the current FeatureGroupJob has no id yet.
        """
        # Check if the current FeatureGroupJob has been saved to the feature store
        if not self.id:
            raise ValueError(
                "FeatureGroupJob needs to be saved to the feature store before it can be marked as completed."
            )

        # Update the job's output details with the given ones
        self.with_job_output_details(job_output_details)

        # Complete the job through the OCI model built from the current spec
        self.oci_fs_feature_group_run = (
            self._to_oci_fs_feature_group_run().complete_feature_group_job()
        )
        return self

    def _update_from_oci_fs_model(
        self, oci_fs_feature_group_run: OCIFeatureGroupJob
    ) -> "FeatureGroupJob":
        """Update the properties from an OCIFeatureGroupJob object.

        Parameters
        ----------
        oci_fs_feature_group_run: OCIFeatureGroupJob
            An instance of OCIFeatureGroupJob.

        Returns
        -------
        FeatureGroupJob
            The FeatureGroupJob instance (self).
        """
        # Update the main properties
        self.oci_fs_feature_group_run = oci_fs_feature_group_run
        feature_group_run_details = oci_fs_feature_group_run.to_dict()

        # Only keys present in the model's dictionary are copied into the spec.
        for infra_attr in self.attribute_map:
            if infra_attr in feature_group_run_details:
                self.set_spec(infra_attr, feature_group_run_details[infra_attr])

        return self

    @classmethod
    def list_df(cls, compartment_id: str = None, **kwargs) -> "pandas.DataFrame":
        """Lists feature_group_run resources in a given compartment.

        Parameters
        ----------
        compartment_id: (str, optional). Defaults to `None`.
            The compartment OCID.
        kwargs
            Additional keyword arguments for filtering models.

        Returns
        -------
        pandas.DataFrame
            The list of the feature_group_run resources in a pandas dataframe format.
        """
        records = [
            {
                "id": oci_fs_feature_group_run.id,
                "display_name": oci_fs_feature_group_run.display_name,
                "time_created": oci_fs_feature_group_run.time_created.strftime(
                    utils.date_format
                ),
                "time_updated": oci_fs_feature_group_run.time_updated.strftime(
                    utils.date_format
                ),
                "lifecycle_state": oci_fs_feature_group_run.lifecycle_state,
                # Only the last six characters of these values are shown.
                "created_by": f"...{oci_fs_feature_group_run.created_by[-6:]}",
                "compartment_id": f"...{oci_fs_feature_group_run.compartment_id[-6:]}",
                "feature_group_id": oci_fs_feature_group_run.feature_group_id,
            }
            for oci_fs_feature_group_run in OCIFeatureGroupJob.list_resource(
                compartment_id, **kwargs
            )
        ]
        return pandas.DataFrame.from_records(records)

    @classmethod
    def list(cls, compartment_id: str = None, **kwargs) -> List["FeatureGroupJob"]:
        """Lists FeatureGroupJob Resources in a given compartment.

        Parameters
        ----------
        compartment_id: (str, optional). Defaults to `None`.
            The compartment OCID.
        kwargs
            Additional keyword arguments for filtering FeatureGroupJob.

        Returns
        -------
        List[FeatureGroupJob]
            The list of the FeatureGroupJob Resources.
        """
        return [
            cls()._update_from_oci_fs_model(oci_fs_feature_group_run)
            for oci_fs_feature_group_run in OCIFeatureGroupJob.list_resource(
                compartment_id, **kwargs
            )
        ]

    def _random_display_name(self):
        """Generates a random display name."""
        return f"{self._PREFIX}-{utils.get_random_name_for_resource()}"

    def to_dict(self) -> Dict:
        """Serializes feature_group_run to a dictionary.

        Returns
        -------
        dict
            The feature_group_run resource serialized as a dictionary.
        """
        spec = deepcopy(self._spec)
        # Replacing values of existing keys does not resize the dict, so it
        # is safe while iterating over it.
        for key, value in spec.items():
            if hasattr(value, "to_dict"):
                spec[key] = value.to_dict()

        return {
            "kind": self.kind,
            "type": self.type,
            "spec": utils.batch_convert_case(spec, "camel"),
        }

    def __repr__(self) -> str:
        """Displays the object as YAML."""
        return self.to_yaml()