feat: initial application release

This commit is contained in:
Nathaniel Landau
2022-12-23 04:10:08 +00:00
parent 35717e0760
commit b7bcf74926
78 changed files with 15508 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""obsidian-metadata package."""

View File

@@ -0,0 +1,2 @@
"""obsidian-metadata version."""
__version__ = "0.0.0"

View File

@@ -0,0 +1,5 @@
"""Config module for obsidian frontmatter."""
from obsidian_metadata._config.config import Config
__all__ = ["Config"]

View File

@@ -0,0 +1,116 @@
"""Instantiate the configuration object."""
import re
import shutil
from pathlib import Path
from typing import Any
import questionary
import rich.repr
import typer
from obsidian_metadata._utils import alerts, vault_validation
from obsidian_metadata._utils.alerts import logger as log
try:
import tomllib
except ModuleNotFoundError:
import tomli as tomllib # type: ignore [no-redef]
DEFAULT_CONFIG_FILE: Path = Path(__file__).parent / "default.toml"
@rich.repr.auto
class Config:
"""Configuration class."""
def __init__(self, config_path: Path = None, vault_path: Path = None) -> None:
self.config_path: Path = self._validate_config_path(Path(config_path))
self.config: dict[str, Any] = self._load_config()
self.config_content: str = self.config_path.read_text()
self.vault_path: Path = self._validate_vault_path(vault_path)
try:
self.exclude_paths: list[Any] = self.config["exclude_paths"]
except KeyError:
self.exclude_paths = []
try:
self.metadata_location: str = self.config["metadata"]["metadata_location"]
except KeyError:
self.metadata_location = "frontmatter"
try:
self.tags_location: str = self.config["metadata"]["tags_location"]
except KeyError:
self.tags_location = "top"
log.debug(f"Loaded configuration from '{self.config_path}'")
log.trace(self.config)
def __rich_repr__(self) -> rich.repr.Result: # pragma: no cover
"""Define rich representation of Vault."""
yield "config_path", self.config_path
yield "config_content",
yield "vault_path", self.vault_path
yield "metadata_location", self.metadata_location
yield "tags_location", self.tags_location
yield "exclude_paths", self.exclude_paths
def _validate_config_path(self, config_path: Path | None) -> Path:
"""Load the configuration path."""
if config_path is None:
config_path = Path(Path.home() / f".{__package__.split('.')[0]}.toml")
if not config_path.exists():
shutil.copy(DEFAULT_CONFIG_FILE, config_path)
alerts.info(f"Created default configuration file at '{config_path}'")
return config_path.expanduser().resolve()
def _load_config(self) -> dict[str, Any]:
"""Load the configuration file."""
try:
with self.config_path.open("rb") as f:
return tomllib.load(f)
except tomllib.TOMLDecodeError as e:
alerts.error(f"Could not parse '{self.config_path}'")
raise typer.Exit(code=1) from e
def _validate_vault_path(self, vault_path: Path | None) -> Path:
"""Validate the vault path."""
if vault_path is None:
try:
vault_path = Path(self.config["vault"]).expanduser().resolve()
except KeyError:
vault_path = Path("/I/Do/Not/Exist")
if not vault_path.exists(): # pragma: no cover
alerts.error(f"Vault path not found: '{vault_path}'")
vault_path = questionary.path(
"Enter a path to Obsidian vault:",
only_directories=True,
validate=vault_validation,
).ask()
if vault_path is None:
raise typer.Exit(code=1)
vault_path = Path(vault_path).expanduser().resolve()
self.write_config_value("vault", str(vault_path))
return vault_path
def write_config_value(self, key: str, value: str | int) -> None:
"""Write a new value to the configuration file.
Args:
key (str): The key to write.
value (str|int): The value to write.
"""
self.config_content = re.sub(
rf"( *{key} = ['\"])[^'\"]*(['\"].*)", rf"\1{value}\2", self.config_content
)
alerts.notice(f"Writing new configuration for '{key}' to '{self.config_path}'")
self.config_path.write_text(self.config_content)

View File

@@ -0,0 +1,5 @@
# Path to your obsidian vault
vault = "/path/to/vault"
# Folders within the vault to ignore when indexing metadata
exclude_paths = [".git", ".obsidian"]

View File

@@ -0,0 +1,27 @@
"""Shared utilities."""
from obsidian_metadata._utils import alerts
from obsidian_metadata._utils.alerts import LoggerManager
from obsidian_metadata._utils.utilities import (
clean_dictionary,
clear_screen,
dict_contains,
dict_values_to_lists_strings,
docstring_parameter,
remove_markdown_sections,
vault_validation,
version_callback,
)
__all__ = [
"alerts",
"clean_dictionary",
"clear_screen",
"dict_values_to_lists_strings",
"dict_contains",
"docstring_parameter",
"LoggerManager",
"remove_markdown_sections",
"vault_validation",
"version_callback",
]

View File

@@ -0,0 +1,242 @@
"""Logging and alerts."""
import sys
from pathlib import Path
import rich.repr
import typer
from loguru import logger
from rich import print
def dryrun(msg: str) -> None:
"""Print a message if the dry run flag is set.
Args:
msg: Message to print
"""
print(f"[cyan]DRYRUN | {msg}[/cyan]")
def success(msg: str) -> None:
"""Print a success message without using logging.
Args:
msg: Message to print
"""
print(f"[green]SUCCESS | {msg}[/green]")
def warning(msg: str) -> None:
"""Print a warning message without using logging.
Args:
msg: Message to print
"""
print(f"[yellow]WARNING | {msg}[/yellow]")
def error(msg: str) -> None:
"""Print an error message without using logging.
Args:
msg: Message to print
"""
print(f"[red]ERROR | {msg}[/red]")
def notice(msg: str) -> None:
"""Print a notice message without using logging.
Args:
msg: Message to print
"""
print(f"[bold]NOTICE | {msg}[/bold]")
def info(msg: str) -> None:
"""Print a notice message without using logging.
Args:
msg: Message to print
"""
print(f"INFO | {msg}")
def dim(msg: str) -> None:
"""Print a message in dimmed color.
Args:
msg: Message to print
"""
print(f"[dim]{msg}[/dim]")
def _log_formatter(record: dict) -> str:
"""Create custom log formatter based on the log level. This effects the logs sent to stdout/stderr but not the log file."""
if (
record["level"].name == "INFO"
or record["level"].name == "SUCCESS"
or record["level"].name == "WARNING"
):
return "<level>{level: <8}</level> | <level>{message}</level>\n{exception}"
return "<level>{level: <8}</level> | <level>{message}</level> <fg #c5c5c5>({name}:{function}:{line})</fg #c5c5c5>\n{exception}"
@rich.repr.auto
class LoggerManager:
"""Instantiate the loguru logging system with the following levels.
- TRACE: Usage: log.trace("")
- DEBUG: Usage: log.debug("")
- INFO: Usage: log.info("")
- WARNING: Usage: log.warning("")
- ERROR: Usage: log.error("")
- CRITICAL: Usage: log.critical("")
- EXCEPTION: Usage: log.exception("")
Attributes:
log_file (Path): Path to the log file.
verbosity (int): Verbosity level.
log_to_file (bool): Whether to log to a file.
log_level (int): Default log level (verbosity overrides this)
Examples:
Instantiate the logger:
logging = _utils.alerts.LoggerManager(
verbosity,
log_to_file,
log_file,
log_level)
"""
def __init__(
self,
log_file: Path = Path("/logs"),
verbosity: int = 0,
log_to_file: bool = False,
log_level: int = 30,
) -> None:
self.verbosity = verbosity
self.log_to_file = log_to_file
self.log_file = log_file
self.log_level = log_level
if self.log_file == Path("/logs") and self.log_to_file: # pragma: no cover
print("No log file specified")
raise typer.Exit(1)
if self.verbosity >= 3:
logger.remove()
logger.add(
sys.stderr,
level="TRACE",
format=_log_formatter, # type: ignore[arg-type]
backtrace=False,
diagnose=True,
)
self.log_level = 5
elif self.verbosity == 2:
logger.remove()
logger.add(
sys.stderr,
level="DEBUG",
format=_log_formatter, # type: ignore[arg-type]
backtrace=False,
diagnose=True,
)
self.log_level = 10
elif self.verbosity == 1:
logger.remove()
logger.add(
sys.stderr,
level="INFO",
format=_log_formatter, # type: ignore[arg-type]
backtrace=False,
diagnose=True,
)
self.log_level = 20
else:
logger.remove()
logger.add(
sys.stderr,
format=_log_formatter, # type: ignore[arg-type]
level="WARNING",
backtrace=False,
diagnose=True,
)
self.log_level = 30
if self.log_to_file is True:
logger.add(
self.log_file,
rotation="5 MB",
level=self.log_level,
backtrace=False,
diagnose=True,
delay=True,
)
logger.debug(f"Logging to file: {self.log_file}")
logger.debug("Logging instantiated")
def is_trace(self, msg: str | None = None) -> bool:
"""Check if the current log level is TRACE.
Args:
msg (optional): Message to print. Defaults to None.
Returns:
bool: True if the current log level is TRACE or lower, False otherwise.
"""
if self.log_level <= 5:
if msg:
print(msg)
return True
return False
def is_debug(self, msg: str | None = None) -> bool:
"""Check if the current log level is DEBUG.
Args:
msg (optional): Message to print. Defaults to None.
Returns:
bool: True if the current log level is DEBUG or lower, False otherwise.
"""
if self.log_level <= 10:
if msg:
print(msg)
return True
return False
def is_info(self, msg: str | None = None) -> bool:
"""Check if the current log level is INFO.
Args:
msg (optional): Message to print. Defaults to None.
Returns:
bool: True if the current log level is INFO or lower, False otherwise.
"""
if self.log_level <= 20:
if msg:
print(msg)
return True
return False
def is_default(self, msg: str | None = None) -> bool:
"""Check if the current log level is default level (SUCCESS or WARNING).
Args:
msg (optional): Message to print. Defaults to None.
Returns:
bool: True if the current log level is default or lower, False otherwise.
"""
if self.log_level <= 30:
if msg:
print(msg)
return True
return False # pragma: no cover

View File

@@ -0,0 +1,169 @@
"""Utility functions."""
import re
from os import name, system
from pathlib import Path
from typing import Any
import typer
from obsidian_metadata.__version__ import __version__
def dict_values_to_lists_strings(dictionary: dict, strip_null_values: bool = False) -> dict:
"""Converts all values in a dictionary to lists of strings.
Args:
dictionary (dict): Dictionary to convert
strip_null (bool): Whether to strip null values
Returns:
dict: Dictionary with all values converted to lists of strings
{key: sorted(new_dict[key]) for key in sorted(new_dict)}
"""
new_dict = {}
if strip_null_values:
for key, value in dictionary.items():
if isinstance(value, list):
new_dict[key] = sorted([str(item) for item in value if item is not None])
elif isinstance(value, dict):
new_dict[key] = dict_values_to_lists_strings(value) # type: ignore[assignment]
elif value is None or value == "None" or value == "":
new_dict[key] = []
else:
new_dict[key] = [str(value)]
return new_dict
for key, value in dictionary.items():
if isinstance(value, list):
new_dict[key] = sorted([str(item) for item in value])
elif isinstance(value, dict):
new_dict[key] = dict_values_to_lists_strings(value) # type: ignore[assignment]
else:
new_dict[key] = [str(value)]
return new_dict
def remove_markdown_sections(
text: str,
strip_codeblocks: bool = False,
strip_inlinecode: bool = False,
strip_frontmatter: bool = False,
) -> str:
"""Strips markdown sections from text.
Args:
text (str): Text to remove code blocks from
strip_codeblocks (bool, optional): Strip code blocks. Defaults to False.
strip_inlinecode (bool, optional): Strip inline code. Defaults to False.
strip_frontmatter (bool, optional): Strip frontmatter. Defaults to False.
Returns:
str: Text without code blocks
"""
if strip_codeblocks:
text = re.sub(r"`{3}.*?`{3}", "", text, flags=re.DOTALL)
if strip_inlinecode:
text = re.sub(r"`.*?`", "", text)
if strip_frontmatter:
text = re.sub(r"^\s*---.*?---", "", text, flags=re.DOTALL)
return text # noqa: RET504
def version_callback(value: bool) -> None:
"""Print version and exit."""
if value:
print(f"{__package__.split('.')[0]}: v{__version__}")
raise typer.Exit()
def vault_validation(path: str) -> bool | str:
"""Validates the vault path."""
path_to_validate: Path = Path(path).expanduser().resolve()
if not path_to_validate.exists():
return f"Path does not exist: {path_to_validate}"
if not path_to_validate.is_dir():
return f"Path is not a directory: {path_to_validate}"
return True
def docstring_parameter(*sub: Any) -> Any:
"""Decorator to replace variables within docstrings.
Args:
sub (Any): Replacement variables
Usage:
@docstring_parameter("foo", "bar")
def foo():
'''This is a {0} docstring with {1} variables.'''
"""
def dec(obj: Any) -> Any:
"""Format object."""
obj.__doc__ = obj.__doc__.format(*sub)
return obj
return dec
def clean_dictionary(dictionary: dict[str, Any]) -> dict[str, Any]:
"""Clean up a dictionary by markdown formatting from keys and values.
Args:
dictionary (dict): Dictionary to clean
Returns:
dict: Cleaned dictionary
"""
new_dict = {key.strip(): value for key, value in dictionary.items()}
new_dict = {key.strip("*[]#"): value for key, value in new_dict.items()}
for key, value in new_dict.items():
new_dict[key] = [s.strip("*[]#") for s in value if isinstance(value, list)]
return new_dict
def clear_screen() -> None:
"""Clears the screen."""
# for windows
_ = system("cls") if name == "nt" else system("clear")
def dict_contains(
dictionary: dict[str, list[str]], key: str, value: str = None, is_regex: bool = False
) -> bool:
"""Checks if a dictionary contains a key.
Args:
dictionary (dict): Dictionary to check
key (str): Key to check for
value (str, optional): Value to check for. Defaults to None.
is_regex (bool, optional): Whether the key is a regex. Defaults to False.
Returns:
bool: Whether the dictionary contains the key
"""
if value is None:
if is_regex:
return any(re.search(key, str(_key)) for _key in dictionary)
return key in dictionary
if is_regex:
found_keys = []
for _key in dictionary:
if re.search(key, str(_key)):
found_keys.append(
any(re.search(value, _v) for _v in dictionary[_key]),
)
return any(found_keys)
return key in dictionary and value in dictionary[key]

View File

@@ -0,0 +1,115 @@
"""obsidian-metadata CLI."""
from pathlib import Path
from typing import Optional
import typer
from rich import print
from obsidian_metadata._config import Config
from obsidian_metadata._utils import alerts, docstring_parameter, version_callback
from obsidian_metadata.models import Application
app = typer.Typer(add_completion=False, no_args_is_help=True, rich_markup_mode="rich")
typer.rich_utils.STYLE_HELPTEXT = ""
HELP_TEXT = """
"""
@app.command()
@docstring_parameter(__package__)
def main(
vault_path: Path = typer.Option(
None,
help="Path to Obsidian vault",
show_default=False,
),
config_file: Path = typer.Option(
Path(Path.home() / f".{__package__}.toml"),
help="Specify a custom path to a configuration file",
show_default=False,
),
dry_run: bool = typer.Option(
False,
"--dry-run",
"-n",
help="Dry run - don't actually change anything",
),
log_file: Path = typer.Option(
Path(Path.home() / "logs" / "obsidian_metadata.log"),
help="Path to log file",
show_default=True,
dir_okay=False,
file_okay=True,
exists=False,
),
log_to_file: bool = typer.Option(
False,
"--log-to-file",
help="Log to file",
show_default=True,
),
verbosity: int = typer.Option(
0,
"-v",
"--verbose",
show_default=False,
help="""Set verbosity level (0=WARN, 1=INFO, 2=DEBUG, 3=TRACE)""",
count=True,
),
version: Optional[bool] = typer.Option(
None, "--version", help="Print version and exit", callback=version_callback, is_eager=True
),
) -> None:
r"""A script to make batch updates to metadata in an Obsidian vault.
[bold] [/]
[bold underline]Features:[/]
- [code]in-text tags:[/] delete every occurrence
- [code]in-text tags:[/] Rename tag ([dim]#tag1[/] -> [dim]#tag2[/])
- [code]frontmatter:[/] Delete a key matching a regex pattern and all associated values
- [code]frontmatter:[/] Rename a key
- [code]frontmatter:[/] Delete a value matching a regex pattern from a specified key
- [code]frontmatter:[/] Rename a value from a specified key
- [code]inline metadata:[/] Delete a key matching a regex pattern and all associated values
- [code]inline metadata:[/] Rename a key
- [code]inline metadata:[/] Delete a value matching a regex pattern from a specified key
- [code]inline metadata:[/] Rename a value from a specified key
- [code]vault:[/] Create a backup of the Obsidian vault.
[bold underline]Usage:[/]
Run [tan]obsidian-metadata[/] from the command line. The script will allow you to make batch updates to metadata in an Obsidian vault. Once you have made your changes, review them prior to committing them to the vault.
Configuration is specified in a configuration file. On First run, this file will be created at [tan]~/.{0}.env[/]. Any options specified on the command line will override the configuration file.
"""
# Instantiate logger
alerts.LoggerManager( # pragma: no cover
log_file,
verbosity,
log_to_file,
)
config: Config = Config(config_path=config_file, vault_path=vault_path)
application = Application(dry_run=dry_run, config=config)
banner = r"""
___ _ _ _ _
/ _ \| |__ ___(_) __| (_) __ _ _ __
| | | | '_ \/ __| |/ _` | |/ _` | '_ \
| |_| | |_) \__ \ | (_| | | (_| | | | |
\___/|_.__/|___/_|\__,_|_|\__,_|_| |_|
| \/ | ___| |_ __ _ __| | __ _| |_ __ _
| |\/| |/ _ \ __/ _` |/ _` |/ _` | __/ _` |
| | | | __/ || (_| | (_| | (_| | || (_| |
|_| |_|\___|\__\__,_|\__,_|\__,_|\__\__,_|
"""
print(banner)
application.main_app()
if __name__ == "__main__":
app()

View File

@@ -0,0 +1,24 @@
"""Shared models."""
from obsidian_metadata.models.patterns import Patterns # isort: skip
from obsidian_metadata.models.metadata import (
Frontmatter,
InlineMetadata,
InlineTags,
VaultMetadata,
)
from obsidian_metadata.models.notes import Note
from obsidian_metadata.models.vault import Vault
from obsidian_metadata.models.application import Application # isort: skip
__all__ = [
"Frontmatter",
"InlineMetadata",
"InlineTags",
"LoggerManager",
"Note",
"Patterns",
"Application",
"Vault",
"VaultMetadata",
]

View File

@@ -0,0 +1,370 @@
"""Questions for the cli."""
from typing import Any
import questionary
import typer
from rich import print
from obsidian_metadata._config import Config
from obsidian_metadata._utils import alerts, clear_screen
from obsidian_metadata._utils.alerts import logger as log
from obsidian_metadata.models import Patterns, Vault
PATTERNS = Patterns()
class Application:
"""Questions for use in the cli.
Contains methods which ask a series of questions to the user and return a dictionary with their answers.
More info: https://questionary.readthedocs.io/en/stable/pages/advanced.html#create-questions-from-dictionaries
"""
def __init__(self, config: Config, dry_run: bool) -> None:
self.config = config
self.dry_run = dry_run
self.custom_style = questionary.Style(
[
("separator", "bold fg:#6C6C6C"),
("instruction", "fg:#6C6C6C"),
("highlighted", "bold reverse"),
("pointer", "bold"),
]
)
clear_screen()
def load_vault(self, path_filter: str = None) -> None:
"""Load the vault.
Args:
path_filter (str, optional): Regex to filter notes by path.
"""
self.vault: Vault = Vault(config=self.config, dry_run=self.dry_run, path_filter=path_filter)
log.info(f"Indexed {self.vault.num_notes()} notes from {self.vault.vault_path}")
def main_app(self) -> None: # noqa: C901
"""Questions for the main application."""
self.load_vault()
while True:
self.vault.info()
operation = questionary.select(
"What do you want to do?",
choices=[
questionary.Separator("\n-- VAULT ACTIONS -----------------"),
{"name": "Backup vault", "value": "backup_vault"},
{"name": "Delete vault backup", "value": "delete_backup"},
{"name": "View all metadata", "value": "all_metadata"},
{"name": "List notes in scope", "value": "list_notes"},
{
"name": "Filter the notes being processed by their path",
"value": "filter_notes",
},
questionary.Separator("\n-- INLINE TAG ACTIONS ---------"),
questionary.Separator("Tags in the note body"),
{
"name": "Rename an inline tag",
"value": "rename_inline_tag",
},
{
"name": "Delete an inline tag",
"value": "delete_inline_tag",
},
questionary.Separator("\n-- METADATA ACTIONS -----------"),
questionary.Separator("Frontmatter or inline metadata"),
{"name": "Rename Key", "value": "rename_key"},
{"name": "Delete Key", "value": "delete_key"},
{"name": "Rename Value", "value": "rename_value"},
{"name": "Delete Value", "value": "delete_value"},
questionary.Separator("\n-- REVIEW/COMMIT CHANGES ------"),
{"name": "Review changes", "value": "review_changes"},
{"name": "Commit changes", "value": "commit_changes"},
questionary.Separator("-------------------------------"),
{"name": "Quit", "value": "abort"},
],
use_shortcuts=False,
style=self.custom_style,
).ask()
if operation == "filter_notes":
path_filter = questionary.text(
"Enter a regex to filter notes by path",
validate=lambda text: len(text) > 0,
).ask()
if path_filter is None:
continue
self.load_vault(path_filter=path_filter)
if operation == "all_metadata":
self.vault.metadata.print_metadata()
if operation == "backup_vault":
self.vault.backup()
if operation == "delete_backup":
self.vault.delete_backup()
if operation == "list_notes":
self.vault.list_editable_notes()
if operation == "rename_inline_tag":
self.rename_inline_tag()
if operation == "delete_inline_tag":
self.delete_inline_tag()
if operation == "rename_key":
self.rename_key()
if operation == "delete_key":
self.delete_key()
if operation == "rename_value":
self.rename_value()
if operation == "delete_value":
self.delete_value()
if operation == "review_changes":
self.review_changes()
if operation == "commit_changes":
self.commit_changes()
if operation == "abort":
break
print("Done!")
return
def rename_key(self) -> None:
"""Renames a key in the vault."""
def validate_key(text: str) -> bool:
"""Validate the key name."""
if self.vault.metadata.contains(text):
return True
return False
def validate_new_key(text: str) -> bool:
"""Validate the tag name."""
if PATTERNS.validate_key_text.search(text) is not None:
return False
if len(text) == 0:
return False
return True
original_key = questionary.text(
"Which key would you like to rename?",
validate=validate_key,
).ask()
if original_key is None:
return
new_key = questionary.text(
"New key name",
validate=validate_new_key,
).ask()
if new_key is None:
return
self.vault.rename_metadata(original_key, new_key)
def rename_inline_tag(self) -> None:
"""Rename an inline tag."""
def validate_new_tag(text: str) -> bool:
"""Validate the tag name."""
if PATTERNS.validate_tag_text.search(text) is not None:
return False
if len(text) == 0:
return False
return True
original_tag = questionary.text(
"Which tag would you like to rename?",
validate=lambda text: True
if self.vault.contains_inline_tag(text)
else "Tag not found in vault",
).ask()
if original_tag is None:
return
new_tag = questionary.text(
"New tag name",
validate=validate_new_tag,
).ask()
if new_tag is None:
return
self.vault.rename_inline_tag(original_tag, new_tag)
alerts.success(f"Renamed [reverse]{original_tag}[/] to [reverse]{new_tag}[/]")
return
def delete_inline_tag(self) -> None:
"""Delete an inline tag."""
tag = questionary.text(
"Which tag would you like to delete?",
validate=lambda text: True
if self.vault.contains_inline_tag(text)
else "Tag not found in vault",
).ask()
if tag is None:
return
self.vault.delete_inline_tag(tag)
alerts.success(f"Deleted inline tag: {tag}")
return
def delete_key(self) -> None:
"""Delete a key from the vault."""
while True:
key_to_delete = questionary.text("Regex for the key(s) you'd like to delete?").ask()
if key_to_delete is None:
return
if not self.vault.metadata.contains(key_to_delete, is_regex=True):
alerts.warning(f"No matching keys in the vault: {key_to_delete}")
continue
num_changed = self.vault.delete_metadata(key_to_delete)
if num_changed == 0:
alerts.warning(f"No notes found matching: [reverse]{key_to_delete}[/]")
return
alerts.success(
f"Deleted keys matching: [reverse]{key_to_delete}[/] from {num_changed} notes"
)
break
return
def rename_value(self) -> None:
"""Rename a value in the vault."""
key = questionary.text(
"Which key contains the value to rename?",
validate=lambda text: True
if self.vault.metadata.contains(text)
else "Key not found in vault",
).ask()
if key is None:
return
value = questionary.text(
"Which value would you like to rename?",
validate=lambda text: True
if self.vault.metadata.contains(key, text)
else f"Value not found in {key}",
).ask()
if value is None:
return
new_value = questionary.text(
"New value?",
validate=lambda text: True
if not self.vault.metadata.contains(key, text)
else f"Value already exists in {key}",
).ask()
if self.vault.rename_metadata(key, value, new_value):
alerts.success(f"Renamed [reverse]{key}: {value}[/] to [reverse]{key}: {new_value}[/]")
def delete_value(self) -> None:
"""Delete a value from the vault."""
while True:
key = questionary.text(
"Which key contains the value to delete?",
).ask()
if key is None:
return
if not self.vault.metadata.contains(key, is_regex=True):
alerts.warning(f"No keys in value match: {key}")
continue
break
while True:
value = questionary.text(
"Regex for the value to delete",
).ask()
if value is None:
return
if not self.vault.metadata.contains(key, value, is_regex=True):
alerts.warning(f"No matching key value pairs found in the vault: {key}: {value}")
continue
num_changed = self.vault.delete_metadata(key, value)
if num_changed == 0:
alerts.warning(f"No notes found matching: [reverse]{key}: {value}[/]")
return
alerts.success(
f"Deleted {num_changed} entries matching: [reverse]{key}[/]: [reverse]{value}[/]"
)
break
return
def review_changes(self) -> None:
"""Review all changes in the vault."""
changed_notes = self.vault.get_changed_notes()
if len(changed_notes) == 0:
alerts.info("No changes to review.")
return
print(f"\nFound {len(changed_notes)} changed notes in the vault.\n")
answer = questionary.confirm("View diffs of individual files?", default=False).ask()
if not answer:
return
choices: list[dict[str, Any] | questionary.Separator] = [questionary.Separator()]
for n, note in enumerate(changed_notes, start=1):
_selection = {
"name": f"{n}: {note.note_path.relative_to(self.vault.vault_path)}",
"value": n - 1,
}
choices.append(_selection)
choices.append(questionary.Separator())
choices.append({"name": "Return", "value": "skip"})
while True:
note_to_review = questionary.select(
"Select a new to view the diff.",
choices=choices,
use_shortcuts=False,
style=self.custom_style,
).ask()
if note_to_review is None or note_to_review == "skip":
break
changed_notes[note_to_review].print_diff()
def commit_changes(self) -> None:
"""Write all changes to disk."""
changed_notes = self.vault.get_changed_notes()
if len(changed_notes) == 0:
print("\n")
alerts.notice("No changes to commit.\n")
return
backup = questionary.confirm("Create backup before committing changes").ask()
if backup is None:
return
if backup:
self.vault.backup()
if questionary.confirm(f"Commit {len(changed_notes)} changed files to disk?").ask():
self.vault.write()
alerts.success("Changes committed to disk. Exiting.")
typer.Exit()
return

View File

@@ -0,0 +1,505 @@
"""Work with metadata items."""
import re
from io import StringIO
from rich import print
from rich.columns import Columns
from rich.console import Console
from rich.table import Table
from ruamel.yaml import YAML
from obsidian_metadata._utils import (
clean_dictionary,
dict_contains,
dict_values_to_lists_strings,
remove_markdown_sections,
)
from obsidian_metadata.models import Patterns # isort: ignore
PATTERNS = Patterns()
INLINE_TAG_KEY: str = "Inline Tags"
class VaultMetadata:
"""Representation of all Metadata in the Vault."""
def __init__(self) -> None:
self.dict: dict[str, list[str]] = {}
def __repr__(self) -> str:
"""Representation of all metadata."""
return str(self.dict)
def add_metadata(self, metadata: dict[str, list[str]]) -> None:
"""Add metadata to the vault. Takes a dictionary as input and merges it with the existing metadata. Does not overwrite existing keys.
Args:
metadata (dict): Metadata to add.
"""
existing_metadata = self.dict
new_metadata = clean_dictionary(metadata)
for k, v in new_metadata.items():
if k in existing_metadata:
if isinstance(v, list):
existing_metadata[k].extend(v)
else:
existing_metadata[k] = v
for k, v in existing_metadata.items():
if isinstance(v, list):
existing_metadata[k] = sorted(set(v))
elif isinstance(v, dict):
for kk, vv in v.items():
if isinstance(vv, list):
v[kk] = sorted(set(vv))
self.dict = dict(sorted(existing_metadata.items()))
def print_keys(self) -> None:
"""Print all metadata keys."""
columns = Columns(
sorted(self.dict.keys()),
equal=True,
expand=True,
title="All metadata keys in Obsidian vault",
)
print(columns)
def print_tags(self) -> None:
"""Print all tags."""
columns = Columns(
sorted(self.dict["tags"]),
equal=True,
expand=True,
title="All tags in Obsidian vault",
)
print(columns)
def print_metadata(self) -> None:
"""Print all metadata."""
table = Table(show_footer=False, show_lines=True)
table.add_column("Keys")
table.add_column("Values")
for key, value in sorted(self.dict.items()):
values: str | dict[str, list[str]] = (
"\n".join(sorted(value)) if isinstance(value, list) else value
)
table.add_row(f"[bold]{key}[/]", str(values))
Console().print(table)
def contains(self, key: str, value: str = None, is_regex: bool = False) -> bool:
"""Check if a key and/or a value exists in the metadata.
Args:
key (str): Key to check.
value (str, optional): Value to check.
is_regex (bool, optional): Use regex to check. Defaults to False.
Returns:
bool: True if the key exists.
"""
return dict_contains(self.dict, key, value, is_regex)
def delete(self, key: str, value_to_delete: str = None) -> bool:
"""Delete a key or a key's value from the metadata. Regex is supported to allow deleting more than one key or value.
Args:
key (str): Key to check.
value_to_delete (str, optional): Value to delete.
Returns:
bool: True if a value was deleted
"""
new_dict = self.dict.copy()
if value_to_delete is None:
for _k in list(new_dict):
if re.search(key, _k):
del new_dict[_k]
else:
for _k, _v in new_dict.items():
if re.search(key, _k):
new_values = [x for x in _v if not re.search(value_to_delete, x)]
new_dict[_k] = sorted(new_values)
if new_dict != self.dict:
self.dict = dict(new_dict)
return True
return False
def rename(self, key: str, value_1: str, value_2: str = None) -> bool:
"""Replace a value in the frontmatter.
Args:
key (str): Key to check.
value_1 (str): `With value_2` this is the value to rename. If `value_2` is None this is the renamed key
value_2 (str, Optional): New value.
bypass_check (bool, optional): Bypass the check if the key exists. Defaults to False.
Returns:
bool: True if a value was renamed
"""
if value_2 is None:
if key in self.dict and value_1 not in self.dict:
self.dict[value_1] = self.dict.pop(key)
return True
return False
if key in self.dict and value_1 in self.dict[key]:
self.dict[key] = sorted({value_2 if x == value_1 else x for x in self.dict[key]})
return True
return False
class Frontmatter:
"""Representation of frontmatter metadata."""
def __init__(self, file_content: str):
self.dict: dict[str, list[str]] = self._grab_note_frontmatter(file_content)
self.dict_original: dict[str, list[str]] = self.dict.copy()
def __repr__(self) -> str: # pragma: no cover
"""Representation of the frontmatter.
Returns:
str: frontmatter
"""
return f"Frontmatter(frontmatter={self.dict})"
def _grab_note_frontmatter(self, file_content: str) -> dict:
"""Grab metadata from a note.
Args:
note_path (Path): Path to the note file.
Returns:
dict: Metadata from the note.
"""
try:
frontmatter_block: str = PATTERNS.frontmatt_block_no_separators.search(
file_content
).group("frontmatter")
except AttributeError:
return {}
yaml = YAML(typ="safe")
frontmatter: dict = yaml.load(frontmatter_block)
for k in frontmatter:
if frontmatter[k] is None:
frontmatter[k] = []
return dict_values_to_lists_strings(frontmatter, strip_null_values=True)
def contains(self, key: str, value: str = None, is_regex: bool = False) -> bool:
"""Check if a key or value exists in the metadata.
Args:
key (str): Key to check.
value (str, optional): Value to check.
is_regex (bool, optional): Use regex to check. Defaults to False.
Returns:
bool: True if the key exists.
"""
return dict_contains(self.dict, key, value, is_regex)
def rename(self, key: str, value_1: str, value_2: str = None) -> bool:
"""Replace a value in the frontmatter.
Args:
key (str): Key to check.
value_1 (str): `With value_2` this is the value to rename. If `value_2` is None this is the renamed key
value_2 (str, Optional): New value.
Returns:
bool: True if a value was renamed
"""
if value_2 is None:
if key in self.dict and value_1 not in self.dict:
self.dict[value_1] = self.dict.pop(key)
return True
return False
if key in self.dict and value_1 in self.dict[key]:
self.dict[key] = sorted({value_2 if x == value_1 else x for x in self.dict[key]})
return True
return False
def delete(self, key: str, value_to_delete: str = None) -> bool:
"""Delete a value or key in the frontmatter. Regex is supported to allow deleting more than one key or value.
Args:
key (str): If no value, key to delete. If value, key containing the value.
value_to_delete (str, optional): Value to delete.
Returns:
bool: True if a value was deleted
"""
new_dict = dict(self.dict)
if value_to_delete is None:
for _k in list(new_dict):
if re.search(key, _k):
del new_dict[_k]
else:
for _k, _v in new_dict.items():
if re.search(key, _k):
new_values = [x for x in _v if not re.search(value_to_delete, x)]
new_dict[_k] = sorted(new_values)
if new_dict != self.dict:
self.dict = dict(new_dict)
return True
return False
def has_changes(self) -> bool:
"""Check if the frontmatter has changes.
Returns:
bool: True if the frontmatter has changes.
"""
return self.dict != self.dict_original
def to_yaml(self, sort_keys: bool = False) -> str:
"""Return the frontmatter as a YAML string.
Returns:
str: Frontmatter as a YAML string.
sort_keys (bool, optional): Sort the keys. Defaults to False.
"""
dict_to_dump = self.dict.copy()
for k in dict_to_dump:
if dict_to_dump[k] == []:
dict_to_dump[k] = None
if isinstance(dict_to_dump[k], list) and len(dict_to_dump[k]) == 1:
new_val = dict_to_dump[k][0]
dict_to_dump[k] = new_val # type: ignore [assignment]
# Converting stream to string from https://stackoverflow.com/questions/47614862/best-way-to-use-ruamel-yaml-to-dump-yaml-to-string-not-to-stream/63179923#63179923
if sort_keys:
dict_to_dump = dict(sorted(dict_to_dump.items()))
yaml = YAML()
yaml.indent(mapping=2, sequence=4, offset=2)
string_stream = StringIO()
yaml.dump(dict_to_dump, string_stream)
yaml_value = string_stream.getvalue()
string_stream.close()
return yaml_value
class InlineMetadata:
"""Representation of inline metadata in the form of `key:: value`."""
def __init__(self, file_content: str):
self.dict: dict[str, list[str]] = self._grab_inline_metadata(file_content)
self.dict_original: dict[str, list[str]] = self.dict.copy()
def __repr__(self) -> str: # pragma: no cover
"""Representation of inline metadata.
Returns:
str: inline metadata
"""
return f"InlineMetadata(inline_metadata={self.dict})"
def _grab_inline_metadata(self, file_content: str) -> dict[str, list[str]]:
"""Grab inline metadata from a note.
Returns:
dict[str, str]: Inline metadata from the note.
"""
content = remove_markdown_sections(
file_content,
strip_codeblocks=True,
strip_inlinecode=True,
strip_frontmatter=True,
)
all_results = PATTERNS.find_inline_metadata.findall(content)
stripped_null_values = [tuple(filter(None, x)) for x in all_results]
inline_metadata: dict[str, list[str]] = {}
for (k, v) in stripped_null_values:
if k in inline_metadata:
inline_metadata[k].append(str(v))
else:
inline_metadata[k] = [str(v)]
return clean_dictionary(inline_metadata)
def contains(self, key: str, value: str = None, is_regex: bool = False) -> bool:
"""Check if a key or value exists in the inline metadata.
Args:
key (str): Key to check.
value (str, Optional): Value to check.
is_regex (bool, optional): If True, key and value are treated as regex. Defaults to False.
Returns:
bool: True if the key exists.
"""
return dict_contains(self.dict, key, value, is_regex)
def rename(self, key: str, value_1: str, value_2: str = None) -> bool:
"""Replace a value in the inline metadata.
Args:
key (str): Key to check.
value_1 (str): `With value_2` this is the value to rename. If `value_2` is None this is the renamed key
value_2 (str, Optional): New value.
Returns:
bool: True if a value was renamed
"""
if value_2 is None:
if key in self.dict and value_1 not in self.dict:
self.dict[value_1] = self.dict.pop(key)
return True
return False
if key in self.dict and value_1 in self.dict[key]:
self.dict[key] = sorted({value_2 if x == value_1 else x for x in self.dict[key]})
return True
return False
def delete(self, key: str, value_to_delete: str = None) -> bool:
"""Delete a value or key in the inline metadata. Regex is supported to allow deleting more than one key or value.
Args:
key (str): If no value, key to delete. If value, key containing the value.
value_to_delete (str, optional): Value to delete.
Returns:
bool: True if a value was deleted
"""
new_dict = dict(self.dict)
if value_to_delete is None:
for _k in list(new_dict):
if re.search(key, _k):
del new_dict[_k]
else:
for _k, _v in new_dict.items():
if re.search(key, _k):
new_values = [x for x in _v if not re.search(value_to_delete, x)]
new_dict[_k] = sorted(new_values)
if new_dict != self.dict:
self.dict = dict(new_dict)
return True
return False
def has_changes(self) -> bool:
"""Check if the metadata has changes.
Returns:
bool: True if the metadata has changes.
"""
return self.dict != self.dict_original
class InlineTags:
"""Representation of inline tags."""
def __init__(self, file_content: str):
self.metadata_key = INLINE_TAG_KEY
self.list: list[str] = self._grab_inline_tags(file_content)
self.list_original: list[str] = self.list.copy()
def __repr__(self) -> str: # pragma: no cover
"""Representation of the inline tags.
Returns:
str: inline tags
"""
return f"InlineTags(tags={self.list})"
def _grab_inline_tags(self, file_content: str) -> list[str]:
"""Grab inline tags from a note.
Args:
file_content (str): Total contents of the note file (frontmatter and content).
Returns:
list[str]: Inline tags from the note.
"""
return sorted(
PATTERNS.find_inline_tags.findall(
remove_markdown_sections(
file_content,
strip_codeblocks=True,
strip_inlinecode=True,
)
)
)
def contains(self, tag: str, is_regex: bool = False) -> bool:
"""Check if a tag exists in the metadata.
Args:
tag (str): Tag to check.
is_regex (bool, optional): If True, tag is treated as regex. Defaults to False.
Returns:
bool: True if the tag exists.
"""
if is_regex is True:
return any(re.search(tag, _t) for _t in self.list)
if tag in self.list:
return True
return False
def rename(self, old_tag: str, new_tag: str) -> bool:
"""Replace an inline tag with another string.
Args:
old_tag (str): `With value_2` this is the value to rename. If `value_2` is None this is the renamed key
new_tag (str, Optional): New value.
Returns:
bool: True if a value was renamed
"""
if old_tag in self.list:
self.list = sorted([new_tag if i == old_tag else i for i in self.list])
return True
return False
def delete(self, tag_to_delete: str) -> bool:
"""Delete a specified inline tag. Regex is supported to allow deleting more than one tag.
Args:
tag_to_delete (str, optional): Value to delete.
Returns:
bool: True if a value was deleted
"""
new_list = sorted([x for x in self.list if re.search(tag_to_delete, x) is None])
if new_list != self.list:
self.list = new_list
return True
return False
def has_changes(self) -> bool:
"""Check if the metadata has changes.
Returns:
bool: True if the metadata has changes.
"""
return self.list != self.list_original

View File

@@ -0,0 +1,367 @@
"""Representation of notes and in the vault."""
import difflib
import re
from pathlib import Path
import rich.repr
import typer
from rich import print
from obsidian_metadata._utils import alerts
from obsidian_metadata._utils.alerts import logger as log
from obsidian_metadata.models import (
Frontmatter,
InlineMetadata,
InlineTags,
Patterns,
)
PATTERNS = Patterns()
@rich.repr.auto
class Note:
"""Representation of a note in the vault.
Args:
note_path (Path): Path to the note file.
Attributes:
note_path (Path): Path to the note file.
dry_run (bool): Whether to run in dry-run mode.
file_content (str): Total contents of the note file (frontmatter and content).
frontmatter (dict): Frontmatter of the note.
inline_tags (list): List of inline tags in the note.
inline_metadata (dict): Dictionary of inline metadata in the note.
"""
def __init__(self, note_path: Path, dry_run: bool = False):
log.trace(f"Creating Note object for {note_path}")
self.note_path: Path = Path(note_path)
self.dry_run: bool = dry_run
try:
with self.note_path.open():
self.file_content: str = self.note_path.read_text()
except FileNotFoundError as e:
alerts.error(f"Note {self.note_path} not found. Exiting")
raise typer.Exit(code=1) from e
self.frontmatter: Frontmatter = Frontmatter(self.file_content)
self.inline_tags: InlineTags = InlineTags(self.file_content)
self.inline_metadata: InlineMetadata = InlineMetadata(self.file_content)
self.original_file_content: str = self.file_content
def __rich_repr__(self) -> rich.repr.Result: # pragma: no cover
"""Define rich representation of Vault."""
yield "note_path", self.note_path
yield "dry_run", self.dry_run
yield "frontmatter", self.frontmatter
yield "inline_tags", self.inline_tags
yield "inline_metadata", self.inline_metadata
def append(self, string_to_append: str, allow_multiple: bool = False) -> None:
"""Appends a string to the end of a note.
Args:
string_to_append (str): String to append to the note.
allow_multiple (bool): Whether to allow appending the string if it already exists in the note.
"""
if allow_multiple:
self.file_content += f"\n{string_to_append}"
else:
if len(re.findall(re.escape(string_to_append), self.file_content)) == 0:
self.file_content += f"\n{string_to_append}"
def commit_changes(self) -> None:
"""Commits changes to the note to disk."""
# TODO: rewrite frontmatter if it has changed
pass
def contains_inline_tag(self, tag: str, is_regex: bool = False) -> bool:
"""Check if a note contains the specified inline tag.
Args:
tag (str): Tag to check for.
is_regex (bool, optional): Whether to use regex to match the tag.
Returns:
bool: Whether the note has inline tags.
"""
return self.inline_tags.contains(tag, is_regex=is_regex)
def contains_metadata(self, key: str, value: str = None, is_regex: bool = False) -> bool:
"""Check if a note has a key or a key-value pair in its metadata.
Args:
key (str): Key to check for.
value (str, optional): Value to check for.
is_regex (bool, optional): Whether to use regex to match the key/value.
Returns:
bool: Whether the note contains the key or key-value pair.
"""
if value is None:
if self.frontmatter.contains(key, is_regex=is_regex) or self.inline_metadata.contains(
key, is_regex=is_regex
):
return True
return False
if self.frontmatter.contains(
key, value, is_regex=is_regex
) or self.inline_metadata.contains(key, value, is_regex=is_regex):
return True
return False
def _delete_inline_metadata(self, key: str, value: str = None) -> None:
"""Deletes an inline metadata key/value pair from the text of the note. This method does not remove the key/value from the metadata attribute of the note.
Args:
key (str): Key to delete.
value (str, optional): Value to delete.
"""
all_results = PATTERNS.find_inline_metadata.findall(self.file_content)
stripped_null_values = [tuple(filter(None, x)) for x in all_results]
for (_k, _v) in stripped_null_values:
if re.search(key, _k):
if value is None:
_k = re.escape(_k)
_v = re.escape(_v)
self.sub(rf"\[?{_k}:: ?{_v}]?", "", is_regex=True)
return
if re.search(value, _v):
_k = re.escape(_k)
_v = re.escape(_v)
self.sub(rf"({_k}::) ?{_v}", r"\1", is_regex=True)
def delete_inline_tag(self, tag: str) -> bool:
"""Deletes an inline tag from the `inline_tags` attribute AND removes the tag from the text of the note if it exists.
Args:
tag (str): Tag to delete.
Returns:
bool: Whether the tag was deleted.
"""
new_list = self.inline_tags.list.copy()
for _t in new_list:
if re.search(tag, _t):
_t = re.escape(_t)
self.sub(rf"#{_t}([ \|,;:\*\(\)\[\]\\\.\n#&])", r"\1", is_regex=True)
self.inline_tags.delete(tag)
if new_list != self.inline_tags.list:
return True
return False
def delete_metadata(self, key: str, value: str = None) -> bool:
"""Deletes a key or key-value pair from the note's metadata. Regex is supported.
If no value is provided, will delete an entire key.
Args:
key (str): Key to delete.
value (str, optional): Value to delete.
Returns:
bool: Whether the key or key-value pair was deleted.
"""
changed_value: bool = False
if value is None:
if self.frontmatter.delete(key):
self.replace_frontmatter()
changed_value = True
if self.inline_metadata.delete(key):
self._delete_inline_metadata(key, value)
changed_value = True
else:
if self.frontmatter.delete(key, value):
self.replace_frontmatter()
changed_value = True
if self.inline_metadata.delete(key, value):
self._delete_inline_metadata(key, value)
changed_value = True
if changed_value:
return True
return False
def has_changes(self) -> bool:
"""Checks if the note has been updated.
Returns:
bool: Whether the note has been updated.
"""
if self.frontmatter.has_changes():
return True
if self.inline_tags.has_changes():
return True
if self.inline_metadata.has_changes():
return True
if self.file_content != self.original_file_content:
return True
return False
def print_note(self) -> None:
"""Prints the note to the console."""
print(self.file_content)
def print_diff(self) -> None:
"""Prints a diff of the note's original state and it's new state."""
a = self.original_file_content.splitlines()
b = self.file_content.splitlines()
diff = difflib.Differ()
result = list(diff.compare(a, b))
for line in result:
if line.startswith("+"):
print(f"[green]{line}[/]")
elif line.startswith("-"):
print(f"[red]{line}[/]")
def sub(self, pattern: str, replacement: str, is_regex: bool = False) -> None:
"""Substitutes text within the note.
Args:
pattern (str): The pattern to replace (plain text or regular expression).
replacement (str): What to replace the pattern with.
is_regex (bool): Whether the pattern is a regex pattern or plain text.
"""
if not is_regex:
pattern = re.escape(pattern)
self.file_content = re.sub(pattern, replacement, self.file_content, re.MULTILINE)
def _rename_inline_metadata(self, key: str, value_1: str, value_2: str = None) -> None:
"""Replaces the inline metadata in the note with the current inline metadata object.
Args:
key (str): Key to rename.
value_1 (str): Value to replace OR new key name (if value_2 is None).
value_2 (str, optional): New value.
"""
all_results = PATTERNS.find_inline_metadata.findall(self.file_content)
stripped_null_values = [tuple(filter(None, x)) for x in all_results]
for (_k, _v) in stripped_null_values:
if re.search(key, _k):
if value_2 is None:
if re.search(rf"{key}[^\w\d_-]+", _k):
key_text = re.split(r"[^\w\d_-]+$", _k)[0]
key_markdown = re.split(r"^[\w\d_-]+", _k)[1]
self.sub(
rf"{key_text}{key_markdown}::",
rf"{value_1}{key_markdown}::",
)
else:
self.sub(f"{_k}::", f"{value_1}::")
else:
if re.search(key, _k) and re.search(value_1, _v):
_k = re.escape(_k)
_v = re.escape(_v)
self.sub(f"{_k}:: ?{_v}", f"{_k}:: {value_2}", is_regex=True)
def rename_inline_tag(self, tag_1: str, tag_2: str) -> bool:
"""Renames an inline tag from the note ONLY if it's not in the metadata as well.
Args:
tag_1 (str): Tag to rename.
tag_2 (str): New tag name.
Returns:
bool: Whether the tag was renamed.
"""
if tag_1 in self.inline_tags.list:
self.sub(
rf"#{tag_1}([ \|,;:\*\(\)\[\]\\\.\n#&])",
rf"#{tag_2}\1",
is_regex=True,
)
self.inline_tags.rename(tag_1, tag_2)
return True
return False
def rename_metadata(self, key: str, value_1: str, value_2: str = None) -> bool:
"""Renames a key or key-value pair in the note's metadata.
If no value is provided, will rename an entire key.
Args:
key (str): Key to rename.
value_1 (str): Value to rename or new name of key if no value_2 is provided.
value_2 (str, optional): New value.
Returns:
bool: Whether the note was updated.
"""
changed_value: bool = False
if value_2 is None:
if self.frontmatter.rename(key, value_1):
self.replace_frontmatter()
changed_value = True
if self.inline_metadata.rename(key, value_1):
self._rename_inline_metadata(key, value_1)
changed_value = True
else:
if self.frontmatter.rename(key, value_1, value_2):
self.replace_frontmatter()
changed_value = True
if self.inline_metadata.rename(key, value_1, value_2):
self._rename_inline_metadata(key, value_1, value_2)
changed_value = True
if changed_value:
return True
return False
def replace_frontmatter(self, sort_keys: bool = False) -> None:
"""Replaces the frontmatter in the note with the current frontmatter object."""
try:
current_frontmatter = PATTERNS.frontmatt_block_with_separators.search(
self.file_content
).group("frontmatter")
except AttributeError:
current_frontmatter = None
if current_frontmatter is None and self.frontmatter.dict == {}:
return
new_frontmatter = self.frontmatter.to_yaml(sort_keys=sort_keys)
new_frontmatter = f"---\n{new_frontmatter}---\n"
if current_frontmatter is None:
self.file_content = new_frontmatter + self.file_content
return
self.sub(current_frontmatter, new_frontmatter)
def write(self, path: Path | None = None) -> None:
"""Writes the note's content to disk.
Args:
path (Path): Path to write the note to. Defaults to the note's path.
"""
p = self.note_path if path is None else path
try:
with open(p, "w") as f:
log.trace(f"Writing note {p} to disk")
f.write(self.file_content)
except FileNotFoundError as e:
alerts.error(f"Note {p} not found. Exiting")
raise typer.Exit(code=1) from e

View File

@@ -0,0 +1,41 @@
"""Regexes for parsing frontmatter and note content."""
import re
from dataclasses import dataclass
from typing import Pattern
@dataclass
class Patterns:
"""Regex patterns for parsing frontmatter and note content."""
find_inline_tags: Pattern[str] = re.compile(
r"""
(?:^|[ \|_,;:\*\(\)\[\]\\\.]) # Before tag is start of line or separator
\#([^ \|,;:\*\(\)\[\]\\\.\n#&]+) # Match tag until separator or end of line
""",
re.MULTILINE | re.X,
)
frontmatt_block_with_separators: Pattern[str] = re.compile(
r"^\s*(?P<frontmatter>---.*?---)", flags=re.DOTALL
)
frontmatt_block_no_separators: Pattern[str] = re.compile(
r"^\s*---(?P<frontmatter>.*?)---", flags=re.DOTALL
)
# This pattern will return a tuple of 4 values, two will be empty and will need to be stripped before processing further
find_inline_metadata: Pattern[str] = re.compile(
r""" # First look for in-text key values
(?:^\[| \[) # Find key with starting bracket
([-_\w\d\/\*\u263a-\U0001f645]+?)::[ ]? # Find key
(.*?)\] # Find value until closing bracket
| # Else look for key values at start of line
(?:^|[^ \w\d]+| \[) # Any non-word or non-digit character
([-_\w\d\/\*\u263a-\U0001f645]+?)::(?!\n)(?:[ ](?!\n))? # Capture the key if not a new line
(.*?)$ # Capture the value
""",
re.X | re.MULTILINE,
)
validate_tag_text: Pattern[str] = re.compile(r"[ \|,;:\*\(\)\[\]\\\.\n#&]")
validate_key_text: Pattern[str] = re.compile(r"[^-_\w\d\/\*\u263a-\U0001f645]")

View File

@@ -0,0 +1,302 @@
"""Obsidian vault representation."""
import re
import shutil
from pathlib import Path
import rich.repr
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.prompt import Confirm
from rich.table import Table
from obsidian_metadata._config import Config
from obsidian_metadata._utils import alerts
from obsidian_metadata._utils.alerts import logger as log
from obsidian_metadata.models import Note, VaultMetadata
@rich.repr.auto
class Vault:
"""Representation of the Obsidian vault.
Attributes:
vault (Path): Path to the vault.
dry_run (bool): Whether to perform a dry run.
backup_path (Path): Path to the backup of the vault.
new_vault (Path): Path to a new vault.
notes (list[Note]): List of all notes in the vault.
"""
def __init__(self, config: Config, dry_run: bool = False, path_filter: str = None):
self.vault_path: Path = config.vault_path
self.dry_run: bool = dry_run
self.backup_path: Path = self.vault_path.parent / f"{self.vault_path.name}.bak"
self.new_vault_path: Path = self.vault_path.parent / f"{self.vault_path.name}.new"
self.exclude_paths: list[Path] = []
self.metadata = VaultMetadata()
for p in config.exclude_paths:
self.exclude_paths.append(Path(self.vault_path / p))
self.path_filter = path_filter
self.note_paths = self._find_markdown_notes(path_filter)
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
transient=True,
) as progress:
progress.add_task(description="Processing notes...", total=None)
self.notes: list[Note] = [
Note(note_path=p, dry_run=self.dry_run) for p in self.note_paths
]
for _note in self.notes:
self.metadata.add_metadata(_note.frontmatter.dict)
self.metadata.add_metadata(_note.inline_metadata.dict)
self.metadata.add_metadata({_note.inline_tags.metadata_key: _note.inline_tags.list})
def __rich_repr__(self) -> rich.repr.Result:
"""Define rich representation of Vault."""
yield "vault_path", self.vault_path
yield "dry_run", self.dry_run
yield "backup_path", self.backup_path
yield "new_vault", self.new_vault_path
yield "num_notes", self.num_notes()
yield "exclude_paths", self.exclude_paths
def _find_markdown_notes(self, path_filter: str = None) -> list[Path]:
"""Build list of all markdown files in the vault.
Args:
path_filter (str, optional): Regex to filter notes by path.
Returns:
list[Path]: List of paths to all matching files in the vault.
"""
notes_list = [
p.resolve()
for p in self.vault_path.glob("**/*")
if p.suffix in [".md", ".MD", ".markdown", ".MARKDOWN"]
and not any(item in p.parents for item in self.exclude_paths)
]
if path_filter is not None:
notes_list = [
p for p in notes_list if re.search(path_filter, str(p.relative_to(self.vault_path)))
]
return notes_list
def backup(self) -> None:
"""Backup the vault."""
log.debug("Backing up vault")
if self.dry_run:
alerts.dryrun(f"Backup up vault to: {self.backup_path}")
return
try:
shutil.copytree(self.vault_path, self.backup_path)
except FileExistsError: # pragma: no cover
log.debug("Backup already exists")
if not Confirm.ask("Vault backup already exists. Overwrite?"):
alerts.info("Exiting backup not overwritten.")
return
log.debug("Overwriting backup")
shutil.rmtree(self.backup_path)
shutil.copytree(self.vault_path, self.backup_path)
alerts.success(f"Vault backed up to: {self.backup_path}")
def contains_inline_tag(self, tag: str, is_regex: bool = False) -> bool:
"""Check if vault contains the given inline tag.
Args:
tag (str): Tag to check for.
is_regex (bool, optional): Whether to use regex to match tag.
Returns:
bool: True if tag is found in vault.
"""
return any(_note.contains_inline_tag(tag) for _note in self.notes)
def contains_metadata(self, key: str, value: str = None, is_regex: bool = False) -> bool:
"""Check if vault contains the given metadata.
Args:
key (str): Key to check for. If value is None, will check vault for key.
value (str, optional): Value to check for.
is_regex (bool, optional): Whether to use regex to match key/value.
Returns:
bool: True if tag is found in vault.
"""
if value is None:
return self.metadata.contains(key, is_regex=is_regex)
return self.metadata.contains(key, value, is_regex=is_regex)
def delete_backup(self) -> None:
"""Delete the vault backup."""
log.debug("Deleting vault backup")
if self.backup_path.exists() and self.dry_run is False:
shutil.rmtree(self.backup_path)
alerts.success("Backup deleted")
elif self.backup_path.exists() and self.dry_run is True:
alerts.dryrun("Delete backup")
else:
alerts.info("No backup found")
def delete_inline_tag(self, tag: str) -> bool:
"""Delete an inline tag in the vault.
Args:
tag (str): Tag to delete.
Returns:
bool: True if tag was deleted.
"""
changes = False
for _note in self.notes:
if _note.delete_inline_tag(tag):
changes = True
if changes:
self.metadata.delete(self.notes[0].inline_tags.metadata_key, tag)
return True
return False
def delete_metadata(self, key: str, value: str = None) -> int:
"""Delete metadata in the vault.
Args:
key (str): Key to delete. Regex is supported
value (str, optional): Value to delete. Regex is supported
Returns:
int: Number of notes that had metadata deleted.
"""
num_changed = 0
for _note in self.notes:
if _note.delete_metadata(key, value):
num_changed += 1
if num_changed > 0:
self.metadata.delete(key, value)
return num_changed
return num_changed
def get_changed_notes(self) -> list[Note]:
"""Returns a list of notes that have changes.
Returns:
list[Note]: List of notes that have changes.
"""
changed_notes = []
for _note in self.notes:
if _note.has_changes():
changed_notes.append(_note)
changed_notes = sorted(changed_notes, key=lambda x: x.note_path)
return changed_notes
def info(self) -> None:
"""Print information about the vault."""
log.debug("Printing vault info")
table = Table(title="Vault Info", show_header=False)
table.add_row("Vault", str(self.vault_path))
table.add_row("Notes being edited", str(self.num_notes()))
table.add_row("Notes excluded from editing", str(self.num_excluded_notes()))
if self.backup_path.exists():
table.add_row("Backup path", str(self.backup_path))
else:
table.add_row("Backup", "None")
table.add_row("Active path filter", str(self.path_filter))
table.add_row("Notes with updates", str(len(self.get_changed_notes())))
Console().print(table)
def list_editable_notes(self) -> None:
"""Print a list of notes within the scope that are being edited."""
for _note in self.notes:
print(_note.note_path.relative_to(self.vault_path))
def num_excluded_notes(self) -> int:
"""Count number of excluded notes."""
excluded_notes = [
p.resolve()
for p in self.vault_path.glob("**/*")
if p.suffix in [".md", ".MD", ".markdown", ".MARKDOWN"] and p not in self.note_paths
]
return len(excluded_notes)
def num_notes(self) -> int:
"""Number of notes in the vault.
Returns:
int: Number of notes in the vault.
"""
return len(self.notes)
def rename_metadata(self, key: str, value_1: str, value_2: str = None) -> bool:
"""Renames a key or key-value pair in the note's metadata.
If no value is provided, will rename an entire key.
Args:
key (str): Key to rename.
value_1 (str): Value to rename or new name of key if no value_2 is provided.
value_2 (str, optional): New value.
Returns:
bool: True if metadata was renamed.
"""
changes = False
for _note in self.notes:
if _note.rename_metadata(key, value_1, value_2):
changes = True
if changes:
self.metadata.rename(key, value_1, value_2)
return True
return False
def rename_inline_tag(self, old_tag: str, new_tag: str) -> bool:
"""Rename an inline tag in the vault.
Args:
old_tag (str): Old tag name.
new_tag (str): New tag name.
Returns:
bool: True if tag was renamed.
"""
changes = False
for _note in self.notes:
if _note.rename_inline_tag(old_tag, new_tag):
changes = True
if changes:
self.metadata.rename(self.notes[0].inline_tags.metadata_key, old_tag, new_tag)
return True
return False
def write(self, new_vault: bool = False) -> None:
"""Write changes to the vault."""
log.debug("Writing changes to vault...")
if new_vault:
log.debug("Writing changes to backup")
for _note in self.notes:
_new_note_path: Path = Path(
self.new_vault_path / Path(_note.note_path).relative_to(self.vault_path)
)
log.debug(f"writing to {_new_note_path}")
_note.write(path=_new_note_path)
else:
for _note in self.notes:
log.debug(f"writing to {_note.note_path}")
_note.write()