refactor(application): refactor questions to separate class (#7)

* refactor(application): refactor questions to separate class
* test(application): add tests for`Application` class
This commit is contained in:
Nathaniel Landau
2023-01-25 12:20:59 -05:00
committed by GitHub
parent 1e4fbcb4e2
commit 455a2c9e86
11 changed files with 1169 additions and 304 deletions

View File

@@ -7,9 +7,10 @@ import questionary
from rich import print
from obsidian_metadata._config import VaultConfig
from obsidian_metadata._utils import alerts
from obsidian_metadata._utils.alerts import logger as log
from obsidian_metadata.models import Patterns, Vault
from obsidian_metadata._utils import alerts
from obsidian_metadata.models.questions import Questions
PATTERNS = Patterns()
@@ -25,14 +26,7 @@ class Application:
def __init__(self, config: VaultConfig, dry_run: bool) -> None:
self.config = config
self.dry_run = dry_run
self.custom_style = questionary.Style(
[
("separator", "bold fg:#6C6C6C"),
("instruction", "fg:#6C6C6C"),
("highlighted", "bold reverse"),
("pointer", "bold"),
]
)
self.questions = Questions()
def load_vault(self, path_filter: str = None) -> None:
"""Load the vault.
@@ -42,98 +36,52 @@ class Application:
"""
self.vault: Vault = Vault(config=self.config, dry_run=self.dry_run, path_filter=path_filter)
log.info(f"Indexed {self.vault.num_notes()} notes from {self.vault.vault_path}")
self.questions = Questions(vault=self.vault)
def main_app(self) -> None: # noqa: C901
def main_app(self) -> None:
"""Questions for the main application."""
self.load_vault()
while True:
print("\n")
self.vault.info()
operation = questionary.select(
"What do you want to do?",
choices=[
questionary.Separator("\n-- VAULT ACTIONS -----------------"),
{"name": "Backup vault", "value": "backup_vault"},
{"name": "Delete vault backup", "value": "delete_backup"},
{"name": "View all metadata", "value": "all_metadata"},
{"name": "List notes in scope", "value": "list_notes"},
{
"name": "Filter the notes being processed by their path",
"value": "filter_notes",
},
questionary.Separator("\n-- INLINE TAG ACTIONS ---------"),
questionary.Separator("Tags in the note body"),
{
"name": "Rename an inline tag",
"value": "rename_inline_tag",
},
{
"name": "Delete an inline tag",
"value": "delete_inline_tag",
},
questionary.Separator("\n-- METADATA ACTIONS -----------"),
questionary.Separator("Frontmatter or inline metadata"),
{"name": "Rename Key", "value": "rename_key"},
{"name": "Delete Key", "value": "delete_key"},
{"name": "Rename Value", "value": "rename_value"},
{"name": "Delete Value", "value": "delete_value"},
questionary.Separator("\n-- REVIEW/COMMIT CHANGES ------"),
{"name": "Review changes", "value": "review_changes"},
{"name": "Commit changes", "value": "commit_changes"},
questionary.Separator("-------------------------------"),
{"name": "Quit", "value": "abort"},
],
use_shortcuts=False,
style=self.custom_style,
).ask()
if operation == "filter_notes":
path_filter = questionary.text(
"Enter a regex to filter notes by path",
validate=lambda text: len(text) > 0,
).ask()
if path_filter is None:
continue
self.load_vault(path_filter=path_filter)
match self.questions.ask_main_application(): # noqa: E999
case None:
break
case "filter_notes":
self.load_vault(path_filter=self.questions.ask_for_filter_path())
case "all_metadata":
self.vault.metadata.print_metadata()
case "backup_vault":
self.vault.backup()
case "delete_backup":
self.vault.delete_backup()
case "list_notes":
self.vault.list_editable_notes()
case "rename_inline_tag":
self.rename_inline_tag()
case "delete_inline_tag":
self.delete_inline_tag()
case "rename_key":
self.rename_key()
case "delete_key":
self.delete_key()
case "rename_value":
self.rename_value()
case "delete_value":
self.delete_value()
case "review_changes":
self.review_changes()
case "commit_changes":
if self.commit_changes():
break
if operation == "all_metadata":
self.vault.metadata.print_metadata()
log.error("Commit failed. Please run with -vvv for more info.")
break
if operation == "backup_vault":
self.vault.backup()
if operation == "delete_backup":
self.vault.delete_backup()
if operation == "list_notes":
self.vault.list_editable_notes()
if operation == "rename_inline_tag":
self.rename_inline_tag()
if operation == "delete_inline_tag":
self.delete_inline_tag()
if operation == "rename_key":
self.rename_key()
if operation == "delete_key":
self.delete_key()
if operation == "rename_value":
self.rename_value()
if operation == "delete_value":
self.delete_value()
if operation == "review_changes":
self.review_changes()
if operation == "commit_changes" and self.commit_changes():
break
if operation == "abort":
break
case "abort":
break
print("Done!")
return
@@ -141,170 +89,126 @@ class Application:
def rename_key(self) -> None:
"""Renames a key in the vault."""
def validate_key(text: str) -> bool:
"""Validate the key name."""
if self.vault.metadata.contains(text):
return True
return False
def validate_new_key(text: str) -> bool:
"""Validate the tag name."""
if PATTERNS.validate_key_text.search(text) is not None:
return False
if len(text) == 0:
return False
return True
original_key = questionary.text(
"Which key would you like to rename?",
validate=validate_key,
).ask()
original_key = self.questions.ask_for_existing_key(
question="Which key would you like to rename?"
)
if original_key is None:
return
new_key = questionary.text(
"New key name",
validate=validate_new_key,
).ask()
new_key = self.questions.ask_for_new_key()
if new_key is None:
return
self.vault.rename_metadata(original_key, new_key)
num_changed = self.vault.rename_metadata(original_key, new_key)
if num_changed == 0:
alerts.warning(f"No notes were changed")
return
alerts.success(
f"Renamed [reverse]{original_key}[/] to [reverse]{new_key}[/] in {num_changed} notes"
)
def rename_inline_tag(self) -> None:
"""Rename an inline tag."""
def validate_new_tag(text: str) -> bool:
"""Validate the tag name."""
if PATTERNS.validate_tag_text.search(text) is not None:
return False
if len(text) == 0:
return False
return True
original_tag = questionary.text(
"Which tag would you like to rename?",
validate=lambda text: True
if self.vault.contains_inline_tag(text)
else "Tag not found in vault",
).ask()
original_tag = self.questions.ask_for_existing_inline_tag(question="Which tag to rename?")
if original_tag is None:
return
new_tag = questionary.text(
"New tag name",
validate=validate_new_tag,
).ask()
new_tag = self.questions.ask_for_new_tag("New tag")
if new_tag is None:
return
self.vault.rename_inline_tag(original_tag, new_tag)
alerts.success(f"Renamed [reverse]{original_tag}[/] to [reverse]{new_tag}[/]")
num_changed = self.vault.rename_inline_tag(original_tag, new_tag)
if num_changed == 0:
alerts.warning(f"No notes were changed")
return
alerts.success(
f"Renamed [reverse]{original_tag}[/] to [reverse]{new_tag}[/] in {num_changed} notes"
)
return
def delete_inline_tag(self) -> None:
"""Delete an inline tag."""
tag = questionary.text(
"Which tag would you like to delete?",
validate=lambda text: True
if self.vault.contains_inline_tag(text)
else "Tag not found in vault",
).ask()
if tag is None:
tag = self.questions.ask_for_existing_inline_tag(
question="Which tag would you like to delete?"
)
num_changed = self.vault.delete_inline_tag(tag)
if num_changed == 0:
alerts.warning(f"No notes were changed")
return
self.vault.delete_inline_tag(tag)
alerts.success(f"Deleted inline tag: {tag}")
alerts.success(f"Deleted inline tag: {tag} in {num_changed} notes")
return
def delete_key(self) -> None:
"""Delete a key from the vault."""
while True:
key_to_delete = questionary.text("Regex for the key(s) you'd like to delete?").ask()
if key_to_delete is None:
return
key_to_delete = self.questions.ask_for_existing_keys_regex(
question="Regex for the key(s) you'd like to delete?"
)
if key_to_delete is None:
return
if not self.vault.metadata.contains(key_to_delete, is_regex=True):
alerts.warning(f"No matching keys in the vault: {key_to_delete}")
continue
num_changed = self.vault.delete_metadata(key_to_delete)
if num_changed == 0:
alerts.warning(f"No notes found with a key matching: [reverse]{key_to_delete}[/]")
return
num_changed = self.vault.delete_metadata(key_to_delete)
if num_changed == 0:
alerts.warning(f"No notes found matching: [reverse]{key_to_delete}[/]")
return
alerts.success(
f"Deleted keys matching: [reverse]{key_to_delete}[/] from {num_changed} notes"
)
break
alerts.success(
f"Deleted keys matching: [reverse]{key_to_delete}[/] from {num_changed} notes"
)
return
def rename_value(self) -> None:
"""Rename a value in the vault."""
key = questionary.text(
"Which key contains the value to rename?",
validate=lambda text: True
if self.vault.metadata.contains(text)
else "Key not found in vault",
).ask()
key = self.questions.ask_for_existing_key(
question="Which key contains the value to rename?"
)
if key is None:
return
value = questionary.text(
"Which value would you like to rename?",
validate=lambda text: True
if self.vault.metadata.contains(key, text)
else f"Value not found in {key}",
).ask()
question_key = Questions(vault=self.vault, key=key)
value = question_key.ask_for_existing_value(
question="Which value would you like to rename?"
)
if value is None:
return
new_value = questionary.text(
"New value?",
validate=lambda text: True
if not self.vault.metadata.contains(key, text)
else f"Value already exists in {key}",
).ask()
new_value = question_key.ask_for_new_value()
if new_value is None:
return
if self.vault.rename_metadata(key, value, new_value):
alerts.success(f"Renamed [reverse]{key}: {value}[/] to [reverse]{key}: {new_value}[/]")
num_changes = self.vault.rename_metadata(key, value, new_value)
if num_changes == 0:
alerts.warning(f"No notes were changed")
return
alerts.success(f"Renamed '{key}:{value}' to '{key}:{new_value}' in {num_changes} notes")
def delete_value(self) -> None:
"""Delete a value from the vault."""
while True:
key = questionary.text(
"Which key contains the value to delete?",
).ask()
if key is None:
return
if not self.vault.metadata.contains(key, is_regex=True):
alerts.warning(f"No keys in value match: {key}")
continue
break
key = self.questions.ask_for_existing_key(
question="Which key contains the value to delete?"
)
if key is None:
return
while True:
value = questionary.text(
"Regex for the value to delete",
).ask()
if value is None:
return
if not self.vault.metadata.contains(key, value, is_regex=True):
alerts.warning(f"No matching key value pairs found in the vault: {key}: {value}")
continue
questions2 = Questions(vault=self.vault, key=key)
value = questions2.ask_for_existing_value_regex(question="Regex for the value to delete")
if value is None:
return
num_changed = self.vault.delete_metadata(key, value)
if num_changed == 0:
alerts.warning(f"No notes found matching: [reverse]{key}: {value}[/]")
return
num_changed = self.vault.delete_metadata(key, value)
if num_changed == 0:
alerts.warning(f"No notes found matching: {key}: {value}")
return
alerts.success(
f"Deleted {num_changed} entries matching: [reverse]{key}[/]: [reverse]{value}[/]"
)
break
alerts.success(
f"Deleted value [reverse]{value}[/] from key [reverse]{key}[/] in {num_changed} notes"
)
return
@@ -317,7 +221,9 @@ class Application:
return
print(f"\nFound {len(changed_notes)} changed notes in the vault.\n")
answer = questionary.confirm("View diffs of individual files?", default=False).ask()
answer = self.questions.ask_confirm(
question="View diffs of individual files?", default=False
)
if not answer:
return
@@ -330,16 +236,14 @@ class Application:
choices.append(_selection)
choices.append(questionary.Separator())
choices.append({"name": "Return", "value": "skip"})
choices.append({"name": "Return", "value": "return"})
while True:
note_to_review = questionary.select(
"Select a new to view the diff.",
note_to_review = self.questions.ask_for_selection(
choices=choices,
use_shortcuts=False,
style=self.custom_style,
).ask()
if note_to_review is None or note_to_review == "skip":
question="Select a new to view the diff",
)
if note_to_review is None or note_to_review == "return":
break
changed_notes[note_to_review].print_diff()