import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Union
from nipype.interfaces.base import (
BaseInterface,
BaseInterfaceInputSpec,
Directory,
TraitedSpec,
isdefined,
traits,
)
[docs]
class ProcedureOutputSpec(TraitedSpec):
output_directory = Directory(desc="Output directory")
log_file = traits.File(desc="Log file")
[docs]
class Procedure(BaseInterface):
input_spec = ProcedureInputSpec
output_spec = ProcedureOutputSpec
_version = "0.0.1"
def __init__(self, **inputs: Any):
super().__init__(**inputs)
def _run_interface(self, runtime) -> Any:
"""
Executes the interface, setting up logging and calling the procedure.
"""
# Validate directories and set up logging
if not isdefined(self.inputs.logging_directory):
self.inputs.logging_directory = (
Path(self.inputs.output_directory).parent / "logs"
)
self.setup_logging()
# Check if the procedure has already been run
finished_file, proceed = self._check_old_runs_finished()
if not proceed:
return runtime
self.logger.info(
f"Running procedure with input directory: {self.inputs.input_directory}" # noqa: E501
)
# Run the custom procedure
self.run_procedure(**self.inputs.get())
self.logger.info(
f"Procedure completed. Output directory: {self.inputs.output_directory}" # noqa: E501
)
self._write_finished_file(finished_file)
return runtime
def _check_old_runs_finished(self) -> Any:
"""
Logs the completion of the procedure.
"""
# set up a "finished" file to keep track of when the procedure was last run # noqa: E501
finished_file = (
Path(self.inputs.logging_directory)
/ f"{type(self).__name__}-{self._version}.done.json"
)
proceed = True
if finished_file.exists():
if self.inputs.force:
self.logger.info(
f"Removing {finished_file} because force=True. Will run procedure again." # noqa: E501
)
finished_file.unlink()
return finished_file, proceed
# read the timestamp of the last run from the file
with open(str(finished_file), "r") as f:
data = json.load(f)
timestamp = data["timestamp"]
config = data["config"]
self.logger.info(
f"Procedure was last run on {timestamp}. Checking if the configuration is the same." # noqa: E501
)
# check if the configuration is the same as the current configuration # noqa: E501
if self.inputs.output_directory == config["output_directory"]:
msg = "User requested to regenerate outputs in the same directory. Please change the output directory or set force=True." # noqa: E501
self.logger.error(
msg,
)
proceed = False
else:
proceed = True
return finished_file, proceed
def _write_finished_file(self, finished_file: Union[str, Path]):
"""
Writes a "finished" file to keep track of when the procedure was last run. # noqa: E501
Parameters
----------
finished_file : Union[str, Path]
The path to the finished file.
"""
config_to_save = {}
# Fix JSON serialization issues
for key, value in self.inputs.get().items():
if isinstance(value, Path):
config_to_save[key] = str(value)
elif not isdefined(value):
config_to_save[key] = None # type: ignore[assignment]
else:
config_to_save[key] = value
with open(str(finished_file), "w") as f:
json.dump(
{"timestamp": str(datetime.now()), "config": config_to_save},
f, # noqa: E501
indent=6,
)
def _check_same_configuration(self, config: Dict[str, Any]) -> bool:
"""
Checks if the configuration of the procedure is the same as the provided configuration. # noqa: E501
"""
# checks whether the provided configuration is the same as the current configuration # noqa: E501
current_config = self.inputs.get()
return current_config == config
def _list_outputs(self) -> Dict[str, str]:
"""
Lists the outputs of the procedure.
"""
outputs = self._outputs().get()
outputs["output_directory"] = str(self.inputs.output_directory)
outputs["log_file"] = str(self.log_file_path)
return outputs
def _gen_log_filename(self) -> str:
"""
Generates a log filename based on the procedure name and the current timestamp.
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
return f"{self.__class__.__name__}_{timestamp}.log"
[docs]
def setup_logging(self):
"""
Sets up logging configuration.
"""
# Reset logging configuration
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
# Set up logging configuration
logging_dir = Path(self.inputs.logging_directory)
if not logging_dir.exists():
logging_dir.mkdir(parents=True, exist_ok=True)
log_file_path = logging_dir / self._gen_log_filename()
self.log_file_path = log_file_path
logging.basicConfig(
filename=log_file_path,
level=getattr(logging, self.inputs.logging_level),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.debug(f"Logging setup complete. Log file: {log_file_path}")
[docs]
def run_procedure(self, **kwargs):
"""
This method should be implemented by subclasses to define the specific steps of the procedure.
"""
raise NotImplementedError("Subclasses should implement this method")