feat: export metadata (#14)

* docs(readme): fix line breaks

* feat: export metadata to a CSV

* fix: finalize colors for questions

* feat: inspect frontmatter, inline, and tags separately

* feat: export metadata to JSON

* fix: do not count in-page links as tags

* ci(codecov): adjust patch target percentage down

* feat(metadata): export CSV or JSON from command line
This commit is contained in:
Nathaniel Landau
2023-02-02 17:09:31 -05:00
committed by GitHub
parent 4a29945de2
commit 8e040aeba4
20 changed files with 1668 additions and 1197 deletions

View File

@@ -2,7 +2,7 @@
from typing import Any
from pathlib import Path
import questionary
from rich import print
from rich import box
@@ -55,11 +55,7 @@ class Application:
case "review_changes":
self.review_changes()
case "commit_changes":
if self.commit_changes():
break
log.error("Commit failed. Please run with -vvv for more info.")
break
self.commit_changes()
case _:
break
@@ -221,13 +217,50 @@ class Application:
choices = [
{"name": "View all metadata", "value": "all_metadata"},
{"name": "View all frontmatter", "value": "all_frontmatter"},
{"name": "View all inline_metadata", "value": "all_inline"},
{"name": "View all keys", "value": "all_keys"},
{"name": "View all inline tags", "value": "all_tags"},
questionary.Separator(),
{"name": "Write all metadata to CSV", "value": "export_csv"},
{"name": "Write all metadata to JSON file", "value": "export_json"},
questionary.Separator(),
{"name": "Back", "value": "back"},
]
while True:
match self.questions.ask_selection(choices=choices, question="Select a vault action"):
case "all_metadata":
self.vault.metadata.print_metadata()
print("")
self.vault.metadata.print_metadata(area=MetadataType.ALL)
print("")
case "all_frontmatter":
print("")
self.vault.metadata.print_metadata(area=MetadataType.FRONTMATTER)
print("")
case "all_inline":
print("")
self.vault.metadata.print_metadata(area=MetadataType.INLINE)
print("")
case "all_keys":
print("")
self.vault.metadata.print_metadata(area=MetadataType.KEYS)
print("")
case "all_tags":
print("")
self.vault.metadata.print_metadata(area=MetadataType.TAGS)
print("")
case "export_csv":
path = self.questions.ask_path(question="Enter a path for the CSV file")
if path is None:
return
self.vault.export_metadata(path=path, format="csv")
alerts.success(f"Metadata written to {path}")
case "export_json":
path = self.questions.ask_path(question="Enter a path for the JSON file")
if path is None:
return
self.vault.export_metadata(path=path, format="json")
alerts.success(f"Metadata written to {path}")
case _:
return
@@ -316,12 +349,13 @@ class Application:
self.vault.backup()
if questionary.confirm(f"Commit {len(changed_notes)} changed files to disk?").ask():
self.vault.commit_changes()
self.vault.write()
if not self.dry_run:
alerts.success(f"{len(changed_notes)} changes committed to disk. Exiting")
return True
return False
return True
def delete_inline_tag(self) -> None:
"""Delete an inline tag."""
@@ -389,6 +423,18 @@ class Application:
)
self.questions = Questions(vault=self.vault)
def noninteractive_export_csv(self, path: Path) -> None:
    """Export the vault metadata to a CSV file without prompting the user.

    Loads the vault, writes its metadata to ``path`` and reports success.

    Args:
        path (Path): Destination of the CSV file.
    """
    self._load_vault()
    # This is the CSV entry point, so the "csv" format must be requested.
    self.vault.export_metadata(format="csv", path=str(path))
    alerts.success(f"Exported metadata to {path}")
def noninteractive_export_json(self, path: Path) -> None:
    """Write the vault's metadata to a JSON file without user interaction.

    Args:
        path (Path): Location of the JSON file to create.
    """
    self._load_vault()
    destination = str(path)
    self.vault.export_metadata(path=destination, format="json")
    alerts.success(f"Exported metadata to {path}")
def rename_key(self) -> None:
"""Renames a key in the vault."""

View File

@@ -9,3 +9,5 @@ class MetadataType(Enum):
FRONTMATTER = "Frontmatter"
INLINE = "Inline Metadata"
TAGS = "Inline Tags"
KEYS = "Metadata Keys Only"
ALL = "All Metadata"

View File

@@ -13,12 +13,14 @@ from obsidian_metadata._utils import (
clean_dictionary,
dict_contains,
dict_values_to_lists_strings,
merge_dictionaries,
remove_markdown_sections,
)
from obsidian_metadata.models import Patterns # isort: ignore
from obsidian_metadata.models.enums import MetadataType
PATTERNS = Patterns()
INLINE_TAG_KEY: str = "Inline Tags"
INLINE_TAG_KEY: str = "inline_tag"
class VaultMetadata:
@@ -26,50 +28,83 @@ class VaultMetadata:
def __init__(self) -> None:
self.dict: dict[str, list[str]] = {}
self.frontmatter: dict[str, list[str]] = {}
self.inline_metadata: dict[str, list[str]] = {}
self.tags: list[str] = []
def __repr__(self) -> str:
"""Representation of all metadata."""
return str(self.dict)
def index_metadata(self, metadata: dict[str, list[str]]) -> None:
def index_metadata(
self, area: MetadataType, metadata: dict[str, list[str]] | list[str]
) -> None:
"""Index pre-existing metadata in the vault. Takes a dictionary as input and merges it with the existing metadata. Does not overwrite existing keys.
Args:
area (MetadataType): Type of metadata.
metadata (dict): Metadata to add.
"""
existing_metadata = self.dict
if isinstance(metadata, dict):
new_metadata = clean_dictionary(metadata)
self.dict = merge_dictionaries(self.dict.copy(), new_metadata.copy())
new_metadata = clean_dictionary(metadata)
if area == MetadataType.FRONTMATTER:
self.frontmatter = merge_dictionaries(self.frontmatter.copy(), new_metadata.copy())
for k, v in new_metadata.items():
if k in existing_metadata:
if isinstance(v, list):
existing_metadata[k].extend(v)
else:
existing_metadata[k] = v
if area == MetadataType.INLINE:
self.inline_metadata = merge_dictionaries(
self.inline_metadata.copy(), new_metadata.copy()
)
for k, v in existing_metadata.items():
if isinstance(v, list):
existing_metadata[k] = sorted(set(v))
elif isinstance(v, dict):
for kk, vv in v.items():
if isinstance(vv, list):
v[kk] = sorted(set(vv))
if area == MetadataType.TAGS and isinstance(metadata, list):
self.tags.extend(metadata)
self.tags = sorted({s.strip("#") for s in self.tags})
self.dict = dict(sorted(existing_metadata.items()))
def contains(self, key: str, value: str = None, is_regex: bool = False) -> bool:
def contains(
self, area: MetadataType, key: str = None, value: str = None, is_regex: bool = False
) -> bool:
"""Check if a key and/or a value exists in the metadata.
Args:
key (str): Key to check.
area (MetadataType): Type of metadata to check.
key (str, optional): Key to check.
value (str, optional): Value to check.
is_regex (bool, optional): Use regex to check. Defaults to False.
Returns:
bool: True if the key exists.
Raises:
ValueError: Key must be provided when checking for a key's existence.
ValueError: Value must be provided when checking for a tag's existence.
"""
return dict_contains(self.dict, key, value, is_regex)
if area != MetadataType.TAGS and key is None:
raise ValueError("Key must be provided when checking for a key's existence.")
match area: # noqa: E999
case MetadataType.ALL:
if dict_contains(self.dict, key, value, is_regex):
return True
if key is None and value is not None:
if is_regex:
return any(re.search(value, tag) for tag in self.tags)
return value in self.tags
case MetadataType.FRONTMATTER:
return dict_contains(self.frontmatter, key, value, is_regex)
case MetadataType.INLINE:
return dict_contains(self.inline_metadata, key, value, is_regex)
case MetadataType.KEYS:
return dict_contains(self.dict, key, value, is_regex)
case MetadataType.TAGS:
if value is None:
raise ValueError("Value must be provided when checking for a tag's existence.")
if is_regex:
return any(re.search(value, tag) for tag in self.tags)
return value in self.tags
return False
def delete(self, key: str, value_to_delete: str = None) -> bool:
"""Delete a key or a key's value from the metadata. Regex is supported to allow deleting more than one key or value.
@@ -99,37 +134,55 @@ class VaultMetadata:
return False
def print_keys(self) -> None:
"""Print all metadata keys."""
columns = Columns(
sorted(self.dict.keys()),
equal=True,
expand=True,
title="All metadata keys in Obsidian vault",
)
print(columns)
def print_metadata(self, area: MetadataType) -> None:
"""Print metadata to the terminal.
def print_metadata(self) -> None:
"""Print all metadata."""
table = Table(show_footer=False, show_lines=True)
table.add_column("Keys")
table.add_column("Values")
for key, value in sorted(self.dict.items()):
values: str | dict[str, list[str]] = (
"\n".join(sorted(value)) if isinstance(value, list) else value
Args:
area (MetadataType): Type of metadata to print
"""
dict_to_print: dict[str, list[str]] = None
list_to_print: list[str] = None
match area:
case MetadataType.INLINE:
dict_to_print = self.inline_metadata.copy()
header = "All inline metadata"
case MetadataType.FRONTMATTER:
dict_to_print = self.frontmatter.copy()
header = "All frontmatter"
case MetadataType.TAGS:
list_to_print = []
for tag in self.tags:
list_to_print.append(f"#{tag}")
header = "All inline tags"
case MetadataType.KEYS:
list_to_print = sorted(self.dict.keys())
header = "All Keys"
case MetadataType.ALL:
dict_to_print = self.dict.copy()
list_to_print = []
for tag in self.tags:
list_to_print.append(f"#{tag}")
header = "All metadata"
if dict_to_print is not None:
table = Table(title=header, show_footer=False, show_lines=True)
table.add_column("Keys")
table.add_column("Values")
for key, value in sorted(dict_to_print.items()):
values: str | dict[str, list[str]] = (
"\n".join(sorted(value)) if isinstance(value, list) else value
)
table.add_row(f"[bold]{key}[/]", str(values))
Console().print(table)
if list_to_print is not None:
columns = Columns(
sorted(list_to_print),
equal=True,
expand=True,
title=header if area != MetadataType.ALL else "All inline tags",
)
table.add_row(f"[bold]{key}[/]", str(values))
Console().print(table)
def print_tags(self) -> None:
"""Print all tags."""
columns = Columns(
sorted(self.dict["tags"]),
equal=True,
expand=True,
title="All tags in Obsidian vault",
)
print(columns)
print(columns)
def rename(self, key: str, value_1: str, value_2: str = None) -> bool:
"""Replace a value in the frontmatter.

View File

@@ -392,6 +392,9 @@ class Note:
typer.Exit: If the note's path is not found.
"""
p = self.note_path if path is None else path
if self.dry_run:
log.trace(f"DRY RUN: Writing note {p} to disk")
return
try:
with open(p, "w") as f:

View File

@@ -1,8 +1,9 @@
"""Regexes for parsing frontmatter and note content."""
import re
from dataclasses import dataclass
from typing import Pattern
import regex as re
from regex import Pattern
@dataclass
@@ -11,8 +12,9 @@ class Patterns:
find_inline_tags: Pattern[str] = re.compile(
r"""
(?:^|[ \|_,;:\*\(\)\[\]\\\.]) # Before tag is start of line or separator
\#([^ \|,;:\*\(\)\[\]\\\.\n#&]+) # Match tag until separator or end of line
(?:^|[ \|_,;:\*\)\[\]\\\.]|(?<!\])\() # Before tag is start of line or separator
(?<!\/\/[\w\d_\.\(\)\/&_-]+) # Before tag is not a link
\#([^ \|,;:\*\(\)\[\]\\\.\n#&]+) # Match tag until separator or end of line
""",
re.MULTILINE | re.X,
)

View File

@@ -18,6 +18,15 @@ from obsidian_metadata.models.vault import Vault
PATTERNS = Patterns()
# Reset the default style of the questionary prompts qmark
# (one override per prompt module; the duplicated `confirm` override was dropped).
questionary.prompts.checkbox.DEFAULT_STYLE = questionary.Style([("qmark", "")])
questionary.prompts.common.DEFAULT_STYLE = questionary.Style([("qmark", "")])
questionary.prompts.confirm.DEFAULT_STYLE = questionary.Style([("qmark", "")])
questionary.prompts.path.DEFAULT_STYLE = questionary.Style([("qmark", "")])
questionary.prompts.select.DEFAULT_STYLE = questionary.Style([("qmark", "")])
questionary.prompts.text.DEFAULT_STYLE = questionary.Style([("qmark", "")])
class Questions:
"""Class for asking questions to the user and validating responses with questionary."""
@@ -64,13 +73,13 @@ class Questions:
"""
self.style = questionary.Style(
[
("qmark", "fg:#729fcf bold"),
("question", "fg:#729fcf bold"),
("qmark", "bold"),
("question", "bold"),
("separator", "fg:#808080"),
("instruction", "fg:#808080"),
("highlighted", "fg:#729fcf bold underline"),
("highlighted", "bold underline"),
("text", ""),
("pointer", "fg:#729fcf bold"),
("pointer", "bold"),
]
)
self.vault = vault
@@ -85,7 +94,7 @@ class Questions:
if len(text) < 1:
return "Tag cannot be empty"
if not self.vault.contains_inline_tag(text):
if not self.vault.metadata.contains(area=MetadataType.TAGS, value=text):
return f"'{text}' does not exist as a tag in the vault"
return True
@@ -99,7 +108,7 @@ class Questions:
if len(text) < 1:
return "Key cannot be empty"
if not self.vault.metadata.contains(text):
if not self.vault.metadata.contains(area=MetadataType.KEYS, key=text):
return f"'{text}' does not exist as a key in the vault"
return True
@@ -118,7 +127,7 @@ class Questions:
except re.error as error:
return f"Invalid regex: {error}"
if not self.vault.metadata.contains(text, is_regex=True):
if not self.vault.metadata.contains(area=MetadataType.KEYS, key=text, is_regex=True):
return f"'{text}' does not exist as a key in the vault"
return True
@@ -169,7 +178,9 @@ class Questions:
if len(text) < 1:
return "Value cannot be empty"
if self.key is not None and self.vault.metadata.contains(self.key, text):
if self.key is not None and self.vault.metadata.contains(
area=MetadataType.ALL, key=self.key, value=text
):
return f"{self.key}:{text} already exists"
return True
@@ -219,7 +230,9 @@ class Questions:
if len(text) == 0:
return True
if self.key is not None and not self.vault.metadata.contains(self.key, text):
if self.key is not None and not self.vault.metadata.contains(
area=MetadataType.ALL, key=self.key, value=text
):
return f"{self.key}:{text} does not exist"
return True
@@ -241,11 +254,42 @@ class Questions:
except re.error as error:
return f"Invalid regex: {error}"
if self.key is not None and not self.vault.metadata.contains(self.key, text, is_regex=True):
if self.key is not None and not self.vault.metadata.contains(
area=MetadataType.ALL, key=self.key, value=text, is_regex=True
):
return f"No values in {self.key} match regex: {text}"
return True
def ask_application_main(self) -> str: # pragma: no cover
"""Selectable list for the main application interface.
Args:
style (questionary.Style): The style to use for the question.
Returns:
str: The selected application.
"""
return questionary.select(
"What do you want to do?",
choices=[
{"name": "Vault Actions", "value": "vault_actions"},
{"name": "Inspect Metadata", "value": "inspect_metadata"},
{"name": "Filter Notes in Scope", "value": "filter_notes"},
{"name": "Add Metadata", "value": "add_metadata"},
{"name": "Rename Metadata", "value": "rename_metadata"},
{"name": "Delete Metadata", "value": "delete_metadata"},
questionary.Separator("-------------------------------"),
{"name": "Review Changes", "value": "review_changes"},
{"name": "Commit Changes", "value": "commit_changes"},
questionary.Separator("-------------------------------"),
{"name": "Quit", "value": "abort"},
],
use_shortcuts=False,
style=self.style,
qmark="INPUT |",
).ask()
def ask_area(self) -> MetadataType | str: # pragma: no cover
"""Ask the user for the metadata area to work on.
@@ -361,35 +405,6 @@ class Questions:
qmark="INPUT |",
).ask()
def ask_application_main(self) -> str: # pragma: no cover
"""Selectable list for the main application interface.
Args:
style (questionary.Style): The style to use for the question.
Returns:
str: The selected application.
"""
return questionary.select(
"What do you want to do?",
choices=[
{"name": "Vault Actions", "value": "vault_actions"},
{"name": "Inspect Metadata", "value": "inspect_metadata"},
{"name": "Filter Notes in Scope", "value": "filter_notes"},
{"name": "Add Metadata", "value": "add_metadata"},
{"name": "Rename Metadata", "value": "rename_metadata"},
{"name": "Delete Metadata", "value": "delete_metadata"},
questionary.Separator("-------------------------------"),
{"name": "Review Changes", "value": "review_changes"},
{"name": "Commit Changes", "value": "commit_changes"},
questionary.Separator("-------------------------------"),
{"name": "Quit", "value": "abort"},
],
use_shortcuts=False,
style=self.style,
qmark="INPUT |",
).ask()
def ask_new_key(self, question: str = "New key name") -> str: # pragma: no cover
"""Ask the user for a new metadata key.
@@ -422,7 +437,7 @@ class Questions:
question, validate=self._validate_new_value, style=self.style, qmark="INPUT |"
).ask()
def ask_number(self, question: str = "Enter a number") -> int:
def ask_number(self, question: str = "Enter a number") -> int: # pragma: no cover
"""Ask the user for a number.
Args:
@@ -435,6 +450,17 @@ class Questions:
question, validate=self._validate_number, style=self.style, qmark="INPUT |"
).ask()
def ask_path(self, question: str = "Enter a path") -> str:  # pragma: no cover
    """Prompt the user for a filesystem path.

    Args:
        question (str, optional): Prompt text shown to the user. Defaults to "Enter a path".

    Returns:
        str: The answer produced by the questionary path prompt.
    """
    prompt = questionary.path(question, style=self.style, qmark="INPUT |")
    return prompt.ask()
def ask_selection(
self, choices: list[Any], question: str = "Select an option"
) -> Any: # pragma: no cover

View File

@@ -1,10 +1,11 @@
"""Obsidian vault representation."""
import csv
import re
import shutil
from dataclasses import dataclass
from pathlib import Path
import json
import rich.repr
from rich import box
from rich.console import Console
@@ -46,6 +47,7 @@ class Vault:
filters: list[VaultFilter] = [],
):
self.vault_path: Path = config.path
self.name = self.vault_path.name
self.dry_run: bool = dry_run
self.backup_path: Path = self.vault_path.parent / f"{self.vault_path.name}.bak"
self.exclude_paths: list[Path] = []
@@ -132,10 +134,15 @@ class Vault:
) as progress:
progress.add_task(description="Processing notes...", total=None)
for _note in self.notes_in_scope:
self.metadata.index_metadata(_note.frontmatter.dict)
self.metadata.index_metadata(_note.inline_metadata.dict)
self.metadata.index_metadata(
{_note.inline_tags.metadata_key: _note.inline_tags.list}
area=MetadataType.FRONTMATTER, metadata=_note.frontmatter.dict
)
self.metadata.index_metadata(
area=MetadataType.INLINE, metadata=_note.inline_metadata.dict
)
self.metadata.index_metadata(
area=MetadataType.TAGS,
metadata=_note.inline_tags.list,
)
def add_metadata(self, area: MetadataType, key: str, value: str | list[str] = None) -> int:
@@ -183,33 +190,21 @@ class Vault:
alerts.success(f"Vault backed up to: {self.backup_path}")
def contains_inline_tag(self, tag: str, is_regex: bool = False) -> bool:
"""Check if vault contains the given inline tag.
def commit_changes(self) -> None:
"""Commit changes by writing to disk."""
log.debug("Writing changes to vault...")
if self.dry_run:
for _note in self.notes_in_scope:
if _note.has_changes():
alerts.dryrun(
f"writing changes to {_note.note_path.relative_to(self.vault_path)}"
)
return
Args:
tag (str): Tag to check for.
is_regex (bool, optional): Whether to use regex to match tag.
Returns:
bool: True if tag is found in vault.
"""
return any(_note.contains_inline_tag(tag) for _note in self.notes_in_scope)
def contains_metadata(self, key: str, value: str = None, is_regex: bool = False) -> bool:
"""Check if vault contains the given metadata.
Args:
key (str): Key to check for. If value is None, will check vault for key.
value (str, optional): Value to check for.
is_regex (bool, optional): Whether to use regex to match key/value.
Returns:
bool: True if tag is found in vault.
"""
if value is None:
return self.metadata.contains(key, is_regex=is_regex)
return self.metadata.contains(key, value, is_regex=is_regex)
for _note in self.notes_in_scope:
if _note.has_changes():
log.trace(f"writing to {_note.note_path}")
_note.write()
def delete_backup(self) -> None:
"""Delete the vault backup."""
@@ -348,10 +343,44 @@ class Vault:
return num_changed
def write(self) -> None:
"""Write changes to the vault."""
log.debug("Writing changes to vault...")
if self.dry_run is False:
for _note in self.notes_in_scope:
log.trace(f"writing to {_note.note_path}")
_note.write()
def export_metadata(self, path: str, format: str = "csv") -> None:
"""Write metadata to a csv file.
Args:
path (Path): Path to write csv file to.
export_as (str, optional): Export as 'csv' or 'json'. Defaults to "csv".
"""
export_file = Path(path).expanduser().resolve()
match format: # noqa: E999
case "csv":
with open(export_file, "w", encoding="UTF8") as f:
writer = csv.writer(f)
writer.writerow(["Metadata Type", "Key", "Value"])
for key, value in self.metadata.frontmatter.items():
if isinstance(value, list):
if len(value) > 0:
for v in value:
writer.writerow(["frontmatter", key, v])
else:
writer.writerow(["frontmatter", key, v])
for key, value in self.metadata.inline_metadata.items():
if isinstance(value, list):
if len(value) > 0:
for v in value:
writer.writerow(["inline_metadata", key, v])
else:
writer.writerow(["frontmatter", key, v])
for tag in self.metadata.tags:
writer.writerow(["tags", "", f"{tag}"])
case "json":
dict_to_dump = {
"frontmatter": self.metadata.dict,
"inline_metadata": self.metadata.inline_metadata,
"tags": self.metadata.tags,
}
with open(export_file, "w", encoding="UTF8") as f:
json.dump(dict_to_dump, f, indent=4, ensure_ascii=False, sort_keys=True)