Package archimedes
Functions
def compact_print(df: pandas.core.frame.DataFrame, show_mapping: bool = False, all_rows: bool = False) ‑> NoneType
-
Prints a compact version of the DataFrame
Example
>>> df = archimedes.load_data("fmri") >>> archimedes.compact_print(df, True, False) a b c d e 0 s13 18 stim parietal -0.017552 1 s5 14 stim parietal -0.080883 ... ... .. ... ... ... 1062 s11 7 cue frontal -0.025367 1063 s0 0 cue parietal -0.006899 ... [1064 rows x 5 columns] This is a compact version of the dataframe, with columns: {'a': 'subject', 'b': 'timepoint', 'c': 'event', 'd': 'region', 'e': 'signal'}
Args
df
:pd.DataFrame
- The dataframe that you want to print
show_mapping
:bool
, optional- Set to True to print the column name mapping. Defaults to False.
all_rows
:bool
, optional- Set to True if you want to print all rows. Defaults to False.
Expand source code
def compact_print(
    df: pd.DataFrame, show_mapping: bool = False, all_rows: bool = False
) -> None:
    """Prints a compact version of the DataFrame.

    The column names are replaced with short placeholder names (taken from
    REPLACEMENT_NAMES) so that wide frames fit on screen. The placeholder ->
    original-name mapping can optionally be printed as well.

    Example:
        >>> df = archimedes.load_data("fmri")
        >>> archimedes.compact_print(df, True, False)
              a   b     c         d         e
        0   s13  18  stim  parietal -0.017552
        1    s5  14  stim  parietal -0.080883
        ...
        [1064 rows x 5 columns]
        This is a compact version of the dataframe, with columns:
        {'a': 'subject', 'b': 'timepoint', 'c': 'event', 'd': 'region', 'e': 'signal'}

    Args:
        df (pd.DataFrame): The dataframe that you want to print
        show_mapping (bool, optional): Set to True to print the column name
            mapping. Defaults to False.
        all_rows (bool, optional): Set to True if you want to print all rows.
            Defaults to False.
    """
    df_ = df.copy()
    # Take exactly as many placeholder names as there are columns.
    compact_columns = REPLACEMENT_NAMES[: len(df_.columns)]
    mapping = dict(zip(compact_columns, df_.columns))
    df_.columns = compact_columns
    if all_rows:
        # option_context restores the caller's previous "display.max_rows"
        # setting afterwards, instead of clobbering it with a hard-coded 10.
        with pd.option_context("display.max_rows", None):
            print(df_)
    else:
        print(df_)
    if show_mapping:
        print("This is a compact version of the dataframe, with columns:")
        pprint(mapping)
def deploy(model, model_name, cron=None)
-
Deploy a model
As for run model, 'model' can here be one of: - "app:main" - "../app.py" - "<function>" # we implement this one first
Expand source code
def deploy(model, model_name, cron=None):
    """Deploy a model.

    As for run model, 'model' can here be one of:
    - "app:main"
    - "../app.py"
    - "<function>"  # we implement this one first
    """
    _configure_prefect_server_endpoint()

    # Prefect imports are kept local to this function.
    from prefect import task, Flow, client
    from prefect.environments.storage import Docker
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    # A schedule is only attached when a cron expression is given.
    schedule = Schedule(clocks=[CronClock(cron)]) if cron else None

    context = _setup()
    project_name = context["project_name"]

    prefect_client = client.Client(api_server=config.prefect.api_server)
    _create_prefect_project_if_not_exist(prefect_client, project_name)

    model_type, model_to_run = _typecheck_model(model)

    def _execute():
        # Deployed runs always use the remote mlflow server.
        return run(model_to_run, model_name, local_mlflow=False)

    flow = Flow(
        name=model_name,
        tasks=[task(_execute, name=model_to_run.__name__)],
        schedule=schedule,
    )
    flow.storage = Docker(
        registry_url=config.prefect.docker_registry_url,
        dockerfile='Dockerfile',
    )
    flow.register(project_name=project_name)
def full_print(df: pandas.core.frame.DataFrame) ‑> NoneType
-
Prints the full DataFrame
Example
>>> df = archimedes.load_data("fmri") >>> archimedes.full_print(df) subject timepoint event region signal 0 s13 18 stim parietal -0.017552 1 s5 14 stim parietal -0.080883 2 s12 18 stim parietal -0.081033 3 s11 18 stim parietal -0.046134 4 s10 18 stim parietal -0.037970 5 s9 18 stim parietal -0.103513 6 s8 18 stim parietal -0.064408 7 s7 18 stim parietal -0.060526 ...
Args
df
:pd.DataFrame
- The dataframe that you want to print
Expand source code
def full_print(df: pd.DataFrame) -> None:
    """Prints the full DataFrame, with no rows elided.

    Example:
        >>> df = archimedes.load_data("fmri")
        >>> archimedes.full_print(df)
            subject  timepoint event    region    signal
        0       s13         18  stim  parietal -0.017552
        1        s5         14  stim  parietal -0.080883
        ...

    Args:
        df (pd.DataFrame): The dataframe that you want to print
    """
    # option_context lifts the row limit for this print only and then
    # restores the caller's previous setting (the old code reset it to a
    # hard-coded 10, clobbering whatever the user had configured).
    with pd.option_context("display.max_rows", None):
        print(df)
def get(series_ids: List[str], price_areas: List[str] = None, start: str = None, end: str = None, flatten_columns: bool = False)
-
Get any number of time series.
This function can be used to fetch time series from the Archimedes Database. To see which series are available, use
list_ids()
.Example
>>> archimedes.get( >>> series_ids=["NP/AreaPrices"], >>> price_areas=["NO1", "NO2"], >>> start="2020-06-20T04:00:00+00:00", >>> end="2020-06-28T04:00:00+00:00", >>> ) series_id NP/AreaPrices price_area NO1 NO2 from_dt 2020-06-20T04:00:00+00:00 1.30 1.30 2020-06-20T05:00:00+00:00 1.35 1.35 ... ... ... 2020-06-28T03:00:00+00:00 0.53 0.53 2020-06-28T04:00:00+00:00 0.55 0.55
Args
series_ids
:List[str]
- The series ids to get.
price_areas
:List[str]
, optional- The price areas to pick, all price areas if None. Defaults to None.
start
:str
, optional- The first datetime to fetch (inclusive). Returns all if None. Defaults to None.
end
:str
, optional- The last datetime to fetch (exclusive). Returns all if None. Defaults to None.
flatten_columns
:bool
, optional- The column names are flattened if True. Defaults to False.
Returns
DataFrame with all the time series data
Expand source code
def get(
    series_ids: List[str],
    price_areas: List[str] = None,
    start: str = None,
    end: str = None,
    flatten_columns: bool = False,
):
    """Get any number of time series.

    This function can be used to fetch time series from the Archimedes
    Database. To see which series are available, use `list_ids()`.

    Example:
        >>> archimedes.get(
        >>>     series_ids=["NP/AreaPrices"],
        >>>     price_areas=["NO1", "NO2"],
        >>>     start="2020-06-20T04:00:00+00:00",
        >>>     end="2020-06-28T04:00:00+00:00",
        >>> )
        series_id                 NP/AreaPrices
        price_area                          NO1   NO2
        from_dt
        2020-06-20T04:00:00+00:00          1.30  1.30
        ...

    Args:
        series_ids (List[str]): The series ids to get.
        price_areas (List[str], optional): The price areas to pick, all price
            areas if None. Defaults to None.
        start (str, optional): The first datetime to fetch (inclusive).
            Returns all if None. Defaults to None.
        end (str, optional): The last datetime to fetch (exclusive).
            Returns all if None. Defaults to None.
        flatten_columns (bool, optional): The column names are flattened if
            True. Defaults to False.

    Returns:
        DataFrame with all the time series data

    Raises:
        ValueError: If the database connection is not configured.
    """
    if db is None:
        raise ValueError(db_error_msg)

    # Allow a single id / area to be passed as a bare string.
    if isinstance(series_ids, str):
        series_ids = [series_ids]
    if isinstance(price_areas, str):
        price_areas = [price_areas]

    if price_areas is None:
        price_areas = archimedes.constants.ALL_PRICE_AREAS

    # Unbounded start/end fall back to sentinel dates that cover all data.
    start = archimedes.constants.DATE_LOW if start is None else pd.to_datetime(start)
    end = archimedes.constants.DATE_HIGH if end is None else pd.to_datetime(end)

    query = """
        SELECT c.series_id, c.from_dt, c.price_area, c.value, c.version
        FROM (
            SELECT * FROM nordpool
            UNION
            SELECT * FROM statnett
        ) as c
        WHERE
            c.series_id IN :series_ids AND
            c.price_area IN :price_areas AND
            c.from_dt >= :start AND
            c.from_dt < :end
    """
    rows = db.query(
        query,
        series_ids=tuple(series_ids),
        price_areas=tuple(price_areas),
        start=start,
        end=end,
    )
    df = rows.export("df")
    # Sort so that aggfunc="last" below keeps the highest version for each
    # (series_id, price_area, from_dt) combination.
    df = df.sort_values(by=["from_dt", "version"])
    df = df.pivot_table(
        values="value",
        columns=["series_id", "price_area"],
        index="from_dt",
        aggfunc="last",
    )
    if flatten_columns:
        df.columns = ["/".join(list(column)) for column in df.columns]
    df = df.astype(float)
    return df
def get_latest(series_ids: List[str], price_areas: List[str] = None, flatten_columns: bool = False)
-
Get the most recent data for any number of time series.
This function is similar to
get()
, but only fetches data from the past 48 hours, potentially including future hours as well (as in the case of Spot price data).@TODO: Add an argument
hours
that allows the 'lookback' period to be extended to an arbitrary number of hours.Example
>>> # Calling this function at 2020-03-15T10:15:00 >>> archimedes.get_latest( >>> series_ids=["NP/AreaPrices", "NP/ConsumptionImbalancePrices"], >>> price_areas=["NO1"], >>> ) series_id NP/AreaPrices NP/ConsumptionImbalancePrices price_area NO1 NO1 from_dt 2020-03-14T04:11:00+00:00 1.30 1.30 2020-03-14T05:12:00+00:00 1.35 1.35 ... ... ... 2020-03-15T22:00:00+00:00 0.53 NaN 2020-03-15T23:00:00+00:00 0.55 NaN
Args
series_ids
:List[str]
- The series ids to get.
price_areas
:List[str]
, optional- The price areas to pick, all price areas if None. Defaults to None.
flatten_columns
:bool
, optional- The column names are flattened if True. Defaults to False.
Returns
DataFrame with all the time series data
Expand source code
def get_latest(
    series_ids: List[str],
    price_areas: List[str] = None,
    flatten_columns: bool = False,
):
    """Get the most recent data for any number of time series.

    This function is similar to `get()`, but only fetches data from the past
    48 hours, potentially including future hours as well (as in the case of
    Spot price data).

    @TODO: Add an argument `hours` that allows the 'lookback' period to be
    extended to an arbitrary number of hours.

    Example:
        >>> # Calling this function at 2020-03-15T10:15:00
        >>> archimedes.get_latest(
        >>>     series_ids=["NP/AreaPrices", "NP/ConsumptionImbalancePrices"],
        >>>     price_areas=["NO1"],
        >>> )
        series_id                 NP/AreaPrices NP/ConsumptionImbalancePrices
        price_area                          NO1                           NO1
        from_dt
        2020-03-14T04:11:00+00:00          1.30                          1.30
        ...

    Args:
        series_ids (List[str]): The series ids to get.
        price_areas (List[str], optional): The price areas to pick, all price
            areas if None. Defaults to None.
        flatten_columns (bool, optional): The column names are flattened if
            True. Defaults to False.

    Returns:
        DataFrame with all the time series data
    """
    now_dt = pd.Timestamp.now(tz="utc")
    start_dt = now_dt - datetime.timedelta(days=2)
    # +14 days should be enough in all cases now:
    end_dt = now_dt + datetime.timedelta(days=14)
    # (a leftover debug print of now_dt was removed here)
    return get(
        series_ids=series_ids,
        price_areas=price_areas,
        start=start_dt.isoformat(),
        end=end_dt.isoformat(),
        flatten_columns=flatten_columns,
    )
def list_ids()
-
List all the series ids available.
Example
>>> archimedes.list_ids() series_id 0 NP/NegativeProductionImbalancePrices 1 SN/FRRADownVolume .. ... 38 NP/OrdinaryDownVolume 39 NP/SpecialUpVolume
Expand source code
def list_ids():
    """List all the series ids available.

    Example:
        >>> archimedes.list_ids()
                                       series_id
        0   NP/NegativeProductionImbalancePrices
        1                      SN/FRRADownVolume
        ..                                   ...
        38                 NP/OrdinaryDownVolume
        39                    NP/SpecialUpVolume
    """
    if db is None:
        raise ValueError(db_error_msg)
    # Series ids live in both source tables; UNION de-duplicates them.
    query = """
        SELECT distinct series_id from nordpool
        UNION
        SELECT distinct series_id from statnett
    """
    result = db.query(query)
    return result.export("df")
def load_latest_model(project_name: str, model_name: str)
-
Load the latest model for a given project and model
Args
project_name
:str
- The name of the project
model_name
:str
- The name of the model
Expand source code
def load_latest_model(project_name: str, model_name: str):
    """Load the latest model run for a given project and model.

    Args:
        project_name (str): The name of the project
        model_name (str): The name of the model

    Returns:
        The most recent mlflow run whose run name matches model_name.

    Raises:
        ValueError: If no run named model_name exists in the project.
    """
    mlflow.set_experiment(project_name)
    runs = mlflow.search_runs()
    runs = runs[runs["tags.mlflow.runName"] == model_name]
    # Fail with a clear message instead of an opaque IndexError when the
    # model has never been run in this project.
    if len(runs) == 0:
        raise ValueError(
            "No runs named '%s' found in project '%s'" % (model_name, project_name)
        )
    # Assumes search_runs() returns runs newest-first, so row 0 is the
    # latest run — TODO confirm against the mlflow ordering in use.
    latest_run_id = runs.iloc[0]["run_id"]
    run = mlflow.get_run(latest_run_id)
    return run
def log(message: str)
-
Log a message
Args
message
:str
- The message to log
Expand source code
def log(message: str):
    """Write a message to the log at INFO level.

    Args:
        message (str): The message to log
    """
    logging.info(message)
def run(func: Union[Callable, str], model_name: str, local_mlflow: bool = False)
-
Run a function, without deploying it.
The first argument can be either a function, the path to a python file or a string on the format app:myfunction.
Example
>>> def myfunction(): >>> x = 2 >>> print(f"The number x is {x}") >>> archimedes.run(myfunction, "My first function") INFO: Starting run at 2020-08-20T23:03:53.788115 INFO: MLFlow URI: /Users/jo/mlruns hello INFO: Ending run at 2020-08-20T23:03:53.794075 INFO: The run took 0:00:00.005960
Args
func
:Union[Callable, str]
- The function to deploy.
model_name
:str
- The name of the model you're running.
local_mlflow
:bool
, optional- If True, uses the local MLFlow. Defaults to False.
Expand source code
def run(func: Union[Callable, str], model_name: str, local_mlflow: bool = False):
    """Run a function, without deploying it.

    The first argument can be either a function, the path to a python file or
    a string on the format app:myfunction.

    Example:
        >>> def myfunction():
        >>>     x = 2
        >>>     print(f"The number x is {x}")
        >>> archimedes.run(myfunction, "My first function")
        INFO: Starting run at 2020-08-20T23:03:53.788115
        INFO: MLFlow URI: /Users/jo/mlruns
        hello
        INFO: Ending run at 2020-08-20T23:03:53.794075
        INFO: The run took 0:00:00.005960

    Args:
        func (Union[Callable, str]): The function to deploy.
        model_name (str): The name of the model you're running.
        local_mlflow (bool, optional): If True, uses the local MLFlow.
            Defaults to False.
    """
    # NOTE(review): the docstring says func may be a path or "app:func"
    # string, but only callables are invoked below — TODO implement or
    # document the restriction.
    context = _setup(local_mlflow)
    mlflow.set_experiment(context["project_name"])
    mlflow.start_run(run_name=model_name)
    mlflow.set_tags(context)
    mlflow.set_tag("run_type", "MANUAL")
    run_start = datetime.datetime.utcnow()
    logging.info("Starting run at %s", run_start.isoformat())
    log("MLFlow URI: %s" % mlflow.get_tracking_uri())
    try:
        func()
    finally:
        # Always close the mlflow run and log timing, even if func() raises;
        # otherwise the run is left open and pollutes later experiments.
        mlflow.end_run()
        run_end = datetime.datetime.utcnow()
        run_delta = run_end - run_start
        logging.info("Ending run at %s", run_end.isoformat())
        logging.info("The run took %s", run_delta)
def store(x, name, show=False)
-
Store x in mlflow.
x can either be a dataframe, or a value.
Args
- x (Any): The thing to store
name
:str
- The name of the thing
Expand source code
def store(x, name, show=False):
    """Store x in mlflow.

    x can either be a dataframe, a dict, a matplotlib figure, or a numeric
    value.

    Args:
        x (Any): The thing to store
        name (str): The name of the thing
        show (bool, optional): Only used for figures; if True, also show the
            plot on screen. Defaults to False.

    Raises:
        TypeError: If the type of x is not supported.
    """
    if isinstance(x, pd.DataFrame):
        _store_dataframe(x, name)
    elif isinstance(x, dict):
        _store_dict(x, name)
    elif isinstance(x, matplotlib.figure.Figure):
        _store_plot(x, name, show)
    elif isinstance(x, (int, float)):
        # Merged the duplicate int/float branches. Note: bool is a subclass
        # of int, so booleans are stored as metrics too (unchanged behavior).
        _store_metric(x, name)
    else:
        raise TypeError("%s type not implemented yet." % type(x))
def store_test_results(y_true: pandas.core.series.Series, y_pred: pandas.core.series.Series, show: bool = False)
-
Store the results of a model
Args
y_true
:pd.Series
- The actual target values
y_pred
:pd.Series
- The predicted target values
show
:bool
, optional- If True, also show the charts on screen. Defaults to False.
Expand source code
def store_test_results(y_true: pd.Series, y_pred: pd.Series, show: bool = False):
    """Store the results of a model as evaluation charts.

    Args:
        y_true (pd.Series): The actual target values
        y_pred (pd.Series): The predicted target values
        show (bool, optional): If True, also show the charts on screen.
            Defaults to False.
    """
    # Produce both the scatter and the line chart for the predictions.
    for plotter in (_plot_test_results_scatter, _plot_test_results_lines):
        plotter(y_true, y_pred, show)