feat(app): limit scope of notes with one or more filters (#13)

* style: rename `VaultMetadata.add_metadata` to `VaultMetadata.index_metadata`

* refactor(vault): refactor filtering notes

* fix(application): improve usage display

* fix(application): improve colors of questions

* feat(application): limit the scope of notes to be processed with one or more filters

* build(deps): update identify
This commit is contained in:
Nathaniel Landau
2023-02-01 15:00:57 -05:00
committed by GitHub
parent 6909738218
commit 4a29945de2
15 changed files with 418 additions and 171 deletions

View File

@@ -2,6 +2,7 @@
import re
import shutil
from dataclasses import dataclass
from pathlib import Path
import rich.repr
@@ -17,6 +18,16 @@ from obsidian_metadata._utils.alerts import logger as log
from obsidian_metadata.models import MetadataType, Note, VaultMetadata
@dataclass
class VaultFilter:
"""Vault filters."""
path_filter: str = None
key_filter: str = None
value_filter: str = None
tag_filter: str = None
@rich.repr.auto
class Vault:
"""Representation of the Obsidian vault.
@@ -28,7 +39,12 @@ class Vault:
notes (list[Note]): List of all notes in the vault.
"""
def __init__(self, config: VaultConfig, dry_run: bool = False, path_filter: str = None):
def __init__(
self,
config: VaultConfig,
dry_run: bool = False,
filters: list[VaultFilter] = [],
):
self.vault_path: Path = config.path
self.dry_run: bool = dry_run
self.backup_path: Path = self.vault_path.parent / f"{self.vault_path.name}.bak"
@@ -37,8 +53,8 @@ class Vault:
for p in config.exclude_paths:
self.exclude_paths.append(Path(self.vault_path / p))
self.path_filter = path_filter
self.note_paths = self._find_markdown_notes(path_filter)
self.filters = filters
self.all_note_paths = self._find_markdown_notes()
with Progress(
SpinnerColumn(),
@@ -46,9 +62,10 @@ class Vault:
transient=True,
) as progress:
progress.add_task(description="Processing notes...", total=None)
self.notes: list[Note] = [
Note(note_path=p, dry_run=self.dry_run) for p in self.note_paths
self.all_notes: list[Note] = [
Note(note_path=p, dry_run=self.dry_run) for p in self.all_note_paths
]
self.notes_in_scope = self._filter_notes()
self._rebuild_vault_metadata()
@@ -57,33 +74,54 @@ class Vault:
yield "vault_path", self.vault_path
yield "dry_run", self.dry_run
yield "backup_path", self.backup_path
yield "num_notes", self.num_notes()
yield "num_notes", len(self.all_notes)
yield "num_notes_in_scope", len(self.notes_in_scope)
yield "exclude_paths", self.exclude_paths
def _find_markdown_notes(self, path_filter: str = None) -> list[Path]:
"""Build list of all markdown files in the vault.
def _filter_notes(self) -> list[Note]:
"""Filter notes by path and metadata using the filters defined in self.filters.
Args:
path_filter (str, optional): Regex to filter notes by path.
Returns:
list[Note]: List of notes matching the filters.
"""
notes_list = self.all_notes.copy()
for _filter in self.filters:
if _filter.path_filter is not None:
notes_list = [
n
for n in notes_list
if re.search(_filter.path_filter, str(n.note_path.relative_to(self.vault_path)))
]
if _filter.tag_filter is not None:
notes_list = [n for n in notes_list if n.contains_inline_tag(_filter.tag_filter)]
if _filter.key_filter is not None and _filter.value_filter is not None:
notes_list = [
n
for n in notes_list
if n.contains_metadata(_filter.key_filter, _filter.value_filter)
]
if _filter.key_filter is not None and _filter.value_filter is None:
notes_list = [n for n in notes_list if n.contains_metadata(_filter.key_filter)]
return notes_list
def _find_markdown_notes(self) -> list[Path]:
"""Build list of all markdown files in the vault.
Returns:
list[Path]: List of paths to all matching files in the vault.
"""
notes_list = [
return [
p.resolve()
for p in self.vault_path.glob("**/*")
if p.suffix in [".md", ".MD", ".markdown", ".MARKDOWN"]
and not any(item in p.parents for item in self.exclude_paths)
]
if path_filter is not None:
notes_list = [
p for p in notes_list if re.search(path_filter, str(p.relative_to(self.vault_path)))
]
return notes_list
def _rebuild_vault_metadata(self) -> None:
"""Rebuild vault metadata."""
self.metadata = VaultMetadata()
@@ -93,10 +131,12 @@ class Vault:
transient=True,
) as progress:
progress.add_task(description="Processing notes...", total=None)
for _note in self.notes:
self.metadata.add_metadata(_note.frontmatter.dict)
self.metadata.add_metadata(_note.inline_metadata.dict)
self.metadata.add_metadata({_note.inline_tags.metadata_key: _note.inline_tags.list})
for _note in self.notes_in_scope:
self.metadata.index_metadata(_note.frontmatter.dict)
self.metadata.index_metadata(_note.inline_metadata.dict)
self.metadata.index_metadata(
{_note.inline_tags.metadata_key: _note.inline_tags.list}
)
def add_metadata(self, area: MetadataType, key: str, value: str | list[str] = None) -> int:
"""Add metadata to all notes in the vault.
@@ -111,7 +151,7 @@ class Vault:
"""
num_changed = 0
for _note in self.notes:
for _note in self.notes_in_scope:
if _note.add_metadata(area, key, value):
num_changed += 1
@@ -153,7 +193,7 @@ class Vault:
Returns:
bool: True if tag is found in vault.
"""
return any(_note.contains_inline_tag(tag) for _note in self.notes)
return any(_note.contains_inline_tag(tag) for _note in self.notes_in_scope)
def contains_metadata(self, key: str, value: str = None, is_regex: bool = False) -> bool:
"""Check if vault contains the given metadata.
@@ -193,7 +233,7 @@ class Vault:
"""
num_changed = 0
for _note in self.notes:
for _note in self.notes_in_scope:
if _note.delete_inline_tag(tag):
num_changed += 1
@@ -214,7 +254,7 @@ class Vault:
"""
num_changed = 0
for _note in self.notes:
for _note in self.notes_in_scope:
if _note.delete_metadata(key, value):
num_changed += 1
@@ -230,7 +270,7 @@ class Vault:
list[Note]: List of notes that have changes.
"""
changed_notes = []
for _note in self.notes:
for _note in self.notes_in_scope:
if _note.has_changes():
changed_notes.append(_note)
@@ -245,36 +285,23 @@ class Vault:
table.add_row("Backup path", str(self.backup_path))
else:
table.add_row("Backup", "None")
table.add_row("Notes in scope", str(self.num_notes()))
table.add_row("Notes in scope", str(len(self.notes_in_scope)))
table.add_row("Notes excluded from scope", str(self.num_excluded_notes()))
table.add_row("Active path filter", str(self.path_filter))
table.add_row("Notes with updates", str(len(self.get_changed_notes())))
table.add_row("Active filters", str(len(self.filters)))
table.add_row("Notes with changes", str(len(self.get_changed_notes())))
Console().print(table)
def list_editable_notes(self) -> None:
"""Print a list of notes within the scope that are being edited."""
table = Table(title="Notes in current scope", show_header=False, box=box.HORIZONTALS)
for _n, _note in enumerate(self.notes, start=1):
for _n, _note in enumerate(self.notes_in_scope, start=1):
table.add_row(str(_n), str(_note.note_path.relative_to(self.vault_path)))
Console().print(table)
def num_excluded_notes(self) -> int:
"""Count number of excluded notes."""
excluded_notes = [
p.resolve()
for p in self.vault_path.glob("**/*")
if p.suffix in [".md", ".MD", ".markdown", ".MARKDOWN"] and p not in self.note_paths
]
return len(excluded_notes)
def num_notes(self) -> int:
"""Number of notes in the vault.
Returns:
int: Number of notes in the vault.
"""
return len(self.notes)
return len(self.all_notes) - len(self.notes_in_scope)
def rename_metadata(self, key: str, value_1: str, value_2: str = None) -> int:
"""Renames a key or key-value pair in the note's metadata.
@@ -291,7 +318,7 @@ class Vault:
"""
num_changed = 0
for _note in self.notes:
for _note in self.notes_in_scope:
if _note.rename_metadata(key, value_1, value_2):
num_changed += 1
@@ -312,7 +339,7 @@ class Vault:
"""
num_changed = 0
for _note in self.notes:
for _note in self.notes_in_scope:
if _note.rename_inline_tag(old_tag, new_tag):
num_changed += 1
@@ -325,6 +352,6 @@ class Vault:
"""Write changes to the vault."""
log.debug("Writing changes to vault...")
if self.dry_run is False:
for _note in self.notes:
for _note in self.notes_in_scope:
log.trace(f"writing to {_note.note_path}")
_note.write()