Source code for bibmanager.bib_manager.browser

# Copyright (c) 2018-2024 Patricio Cubillos.
# bibmanager is open-source software under the MIT license (see LICENSE).

__all__ = [
    'browse',
]

from asyncio import Future, ensure_future
from contextlib import redirect_stdout
import io
import itertools
import os
import re
import textwrap
import webbrowser


from prompt_toolkit import print_formatted_text, search
from prompt_toolkit.application import Application
from prompt_toolkit.application.current import get_app
from prompt_toolkit.buffer import Buffer
from prompt_toolkit.completion import PathCompleter, WordCompleter
from prompt_toolkit.filters import Condition
from prompt_toolkit.formatted_text import PygmentsTokens, FormattedText
from prompt_toolkit.formatted_text.utils import fragment_list_to_text
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.key_binding.key_bindings import merge_key_bindings
from prompt_toolkit.key_binding.bindings.auto_suggest import \
    load_auto_suggest_bindings
from prompt_toolkit.key_binding.bindings.scroll import (
    scroll_half_page_down,
    scroll_half_page_up,
)
from prompt_toolkit.layout.containers import (
    Float, FloatContainer,
    HSplit, VSplit,
    Window, WindowAlign, ConditionalContainer,
)
from prompt_toolkit.layout.controls import FormattedTextControl, BufferControl
from prompt_toolkit.layout.dimension import D
from prompt_toolkit.layout.layout import Layout
from prompt_toolkit.layout.menus import CompletionsMenu
from prompt_toolkit.layout.processors import (
    Transformation,
    Processor,
    AppendAutoSuggestion,
)
from prompt_toolkit.layout.utils import explode_text_fragments
from prompt_toolkit.lexers import PygmentsLexer
from prompt_toolkit.selection import PasteMode
from prompt_toolkit.styles import Style, style_from_pygments_cls, merge_styles
from prompt_toolkit.widgets import (
    Button, Dialog, Label, SearchToolbar, TextArea,)
import pygments
from pygments.lexers.bibtex import BibTeXLexer


from . import bib_manager as bm
from .. import config_manager as cm
from .. import pdf_manager as pm
from .. import utils as u
from ..__init__ import __version__ as ver


help_message = f"""\
h       Show this message
enter   Select/unselect entry for saving
s       Save selected entries to file or screen output
f,/,?   Find literal string in the main text
e       Expand/collapse content of current entry
E       Expand/collapse all entries
o       Open PDF of entry (ask to fetch if needed)
b       Open entry in ADS through the web browser
k       Search entries by keywords (authors/years/tags/etc),
          displaying only matching entries
K       Clear keyword-search selection
q       Quit

Navigation
Arrow keys  Move up, down, left, and right
g/G         Go to first/last line
u/d         Scroll up/down
n/N         Go to next/previous search occurrence

This is bibmanager version {ver}
Created by Patricio Cubillos."""

class TextInputDialog:
    """Hello, this is doc"""
    def __init__(self, title="", label_text="", completer=None):
        self.future = Future()

        def accept_text(buf):
            buf.complete_state = None
            self.future.set_result(self.text_area.text)
            if self.text_area.text == "":
                get_app().exit()

        self.text_area = TextArea(
            completer=completer,
            multiline=False,
            width=D(preferred=40),
            accept_handler=accept_text,
        )

        self.dialog = Dialog(
            title=title,
            body=HSplit([
                Label(text=label_text), self.text_area, Label(text="")]),
            width=D(preferred=75),
            modal=True,
        )

    def __pt_container__(self):
        return self.dialog


class MessageDialog:
    def __init__(self, title, text, asking=False):
        self.future = Future()

        def set_done():
            self.future.set_result(None)
        def accept():
            self.future.set_result(True)
        def cancel():
            self.future.set_result(False)

        if asking:
            buttons = [
                Button(text="Yes", handler=accept),
                Button(text="No", handler=cancel),
            ]
        else:
            buttons = [Button(text="OK", handler=set_done)]

        text = "\n".join([
            textwrap.fill(line, width=71)
            for line in text.splitlines()
            ])
        self.dialog = Dialog(
            title=title,
            body=HSplit([Label(text=text)]),
            buttons=buttons,
            width=D(preferred=75),
            modal=True,
            )

    def __pt_container__(self):
        return self.dialog


def show_message(title, text):
    async def coroutine():
        dialog = MessageDialog(title, text)
        await show_dialog_as_float(dialog)

    ensure_future(coroutine())


async def show_dialog_as_float(dialog):
    """Coroutine."""
    app = get_app()
    root_container = app.layout.container
    float_ = Float(content=dialog)
    root_container.floats.insert(0, float_)

    focused_before = app.layout.current_window
    app.layout.focus(dialog)
    app.layout.current_window.dialog = dialog
    result = await dialog.future
    if hasattr(app.layout.current_window, 'dialog'):
        del(app.layout.current_window.dialog)
    app.layout.focus(focused_before)

    if float_ in root_container.floats:
        root_container.floats.remove(float_)

    return result


class HighlightEntryProcessor(Processor):
    """Processor to highlight a list of texts in the document."""
    match_fragment = " class:search "
    selected_entries = []

    def toggle_selected_entry(self, entry_key):
        """Select/deselect entry_key from the list of highlighted texts."""
        if entry_key in self.selected_entries:
            self.selected_entries.remove(entry_key)
        else:
            self.selected_entries.append(entry_key)

    def apply_transformation(self, transformation_input):
        (
            buffer_control,
            document,
            lineno,
            source_to_display,
            fragments,
            _,
            _,
        ) = transformation_input.unpack()

        if self.selected_entries and not get_app().is_done:
            # For each search match, replace the style string.
            line_text = fragment_list_to_text(fragments)
            fragments = explode_text_fragments(fragments)

            pattern = "|".join(re.escape(key) for key in self.selected_entries)
            matches = re.finditer(pattern, line_text, flags=re.RegexFlag(0))

            for match in matches:
                for i in range(match.start(), match.end()):
                    old_fragment, text, *_ = fragments[i]
                    fragments[i] = (
                        old_fragment + self.match_fragment,
                        fragments[i][1],
                    )

        return Transformation(fragments)


def get_current_key(doc, keys, get_start_end=False, get_expanded=False):
    """
    Get the key for the bibtex entry currently under the cursor.
    """
    position = doc.cursor_position
    if doc.current_line in keys:
        is_expanded = False
        key = doc.current_line
        if get_start_end:
            start_pos = position + doc.get_start_of_line_position()
            end_pos = position + doc.get_end_of_line_position()
    else:
        is_expanded = True
        start_pos = position + doc.start_of_paragraph()
        end_pos = position + doc.end_of_paragraph()
        # Cursor is in between entries:
        if doc.current_line == '':
            row = doc.cursor_position_row
            # If prev line is key take lower entry, else take upper entry:
            if doc.lines[row-1] in keys:
                start_pos = position + 1
            else:
                end_pos = position - 1
        # Get the key:
        start_bib = doc.text.find('@', start_pos)
        key_start = doc.text.find('{', start_bib)
        key_end = doc.text.find(',', start_bib)
        key = doc.text[key_start+1:key_end].strip()

    if not (get_start_end or get_expanded):
        return key

    output = [key]
    if get_start_end:
        output.append([start_pos, end_pos])
    if get_expanded:
        output.append(is_expanded)
    return tuple(output)


[docs] def browse(): """ A browser for the bibmanager database. """ # Content of the text buffer: bibs = bm.load() keys = [bib.key for bib in bibs] all_compact_text = "\n".join(keys) all_expanded_text = "\n\n".join(bib.meta() + bib.content for bib in bibs) # A list object, since I want this to be a global variable selected_content = [None] lex_style = style_from_pygments_cls( pygments.styles.get_style_by_name(cm.get('style'))) custom_style = Style.from_dict({ "status": "reverse", "status.position": "#aaaa00", "status.key": "#ffaa00", "shadow": "bg:#440044", "not-searching": "#888888", }) style = merge_styles([lex_style, custom_style]) def get_menubar_text(): return [ ("class:status", " ("), ("class:status.key", "enter"), ("class:status", ")select entry ("), ("class:status.key", "e"), ("class:status", ")xpand entry ("), ("class:status.key", "f"), ("class:status", ")ind ("), ("class:status.key", "s"), ("class:status", ")ave ("), ("class:status.key", "h"), ("class:status", ")elp ("), ("class:status.key", "q"), ("class:status", ")uit"), ] def get_menubar_right_text(): """Get index of entry under cursor.""" key = get_current_key(text_field.buffer.document, keys) return f" {keys.index(key) + 1} " def get_infobar_text(): """Get author-year-title of entry under cursor.""" key = get_current_key(text_field.buffer.document, keys) bib = bibs[keys.index(key)] year = '' if bib.year is None else bib.year title = 'NO_TITLE' if bib.title is None else bib.title return f"{bib.get_authors('ushort')}{year}: {title}" search_buffer = Buffer( completer=WordCompleter(keys), complete_while_typing=False, multiline=False) literal_search_field = SearchToolbar( search_buffer=search_buffer, forward_search_prompt = "Text search: ", backward_search_prompt = "Text search backward: ", ignore_case=False) # Entry search bar: authors_list = [bib.authors for bib in bibs] firsts = sorted(set([ u.get_authors([authors[0]], format='ushort') for authors in authors_list if authors is not None])) firsts = [ '^{'+first+'}' if ' ' in first else '^'+first for first in firsts] lasts = sorted(set([ u.get_authors([author], format='ushort') for authors in authors_list if authors is not None for author in authors])) lasts = [ '{'+last+'}' if ' ' in last else last for last in lasts] bibkeys = [bib.key for bib in bibs] bibcodes = [bib.bibcode for bib in bibs if bib.bibcode is not None] bibyears = sorted(set([ str(bib.year) for bib in bibs if bib.year is not None])) titles = [bib.title for bib in bibs] tags = sorted(set(itertools.chain( *[bib.tags for bib in bibs if bib.tags is not None]))) key_words = { 'author:"^"': firsts, 'author:""': lasts, 'year:': bibyears, 'title:""': titles, 'key:': bibkeys, 'bibcode:': bibcodes, 'tags:': tags, } completer = u.DynamicKeywordCompleter(key_words) suggester = u.DynamicKeywordSuggester() auto_suggest_bindings = load_auto_suggest_bindings() # Searcher: entry_search_buffer = Buffer( completer=completer, complete_while_typing=False, auto_suggest=suggester, ) def get_line_prefix(lineno, wrap_count): return FormattedText([('bold', 'Entry search: '),]) entry_search_field = Window( BufferControl( buffer=entry_search_buffer, input_processors=[AppendAutoSuggestion()], ), get_line_prefix=get_line_prefix, height=1) # Wrap in conditional container to display it only when focused: entry_search_focus = Condition( lambda: get_app().layout.current_window == entry_search_field) entry_search_container = ConditionalContainer( content=entry_search_field, filter=entry_search_focus, ) text_field = TextArea( text=all_compact_text, lexer=PygmentsLexer(BibTeXLexer), scrollbar=True, line_numbers=False, read_only=True, search_field=literal_search_field, input_processors=[HighlightEntryProcessor()], ) text_field.buffer.name = 'text_area_buffer' text_field.is_expanded = False text_field.compact_text = all_compact_text text_field.expanded_text = all_expanded_text # Shortcut to HighlightEntryProcessor: for processor in text_field.control.input_processors: if processor.__class__.__name__ == 'HighlightEntryProcessor': text_field.bm_processor = processor # Do not highlight searched text: sp = text_field.control.default_input_processors[0] sp._classname = ' ' sp._classname_current = ' ' menu_bar = VSplit([ Window( FormattedTextControl(get_menubar_text), style="class:status"), Window( FormattedTextControl(get_menubar_right_text), style="class:status.right", width=9, align=WindowAlign.RIGHT), ], height=1, ) info_bar = ConditionalContainer( content=Window( content=FormattedTextControl(get_infobar_text), height=D.exact(1), style="class:status", ), filter=~entry_search_focus, ) body = HSplit([ menu_bar, text_field, literal_search_field, entry_search_container, info_bar, ]) root_container = FloatContainer( content=body, floats=[ Float( xcursor=True, ycursor=True, content=CompletionsMenu(max_height=16, scroll_offset=1), ), ], ) # Key bindings: bindings = KeyBindings() text_focus = Condition( lambda: get_app().layout.current_window == text_field.window) dialog_focus = Condition( lambda: hasattr(get_app().layout.current_window, 'dialog')) @bindings.add("q", filter=text_focus) def _quit(event): event.app.exit() # Navigation: @bindings.add("g", filter=text_focus) def _go_to_first_line(event): event.current_buffer.cursor_position = 0 @bindings.add("G", filter=text_focus) def _go_to_last_line(event) -> None: event.current_buffer.cursor_position = len(event.current_buffer.text) @bindings.add("d", filter=text_focus) def _scroll_down(event): scroll_half_page_down(event) @bindings.add("u", filter=text_focus) def _scroll_up(event): scroll_half_page_up(event) @bindings.add("n", filter=text_focus) def _find_next(event): search_state = event.app.current_search_state event.current_buffer.apply_search( search_state, include_current_position=False, count=event.arg) @bindings.add("N", filter=text_focus) def _find_previous(event): search_state = event.app.current_search_state event.current_buffer.apply_search( ~search_state, include_current_position=False, count=event.arg) @bindings.add("h", filter=text_focus) def _show_help(event): show_message("Shortcuts", help_message) @bindings.add("f", filter=text_focus) def _start_literal_search(event): search.start_search(direction=search.SearchDirection.FORWARD) @bindings.add("k", filter=text_focus) def _start_entry_search(event): text_field.current_key = get_current_key( event.current_buffer.document, keys) event.app.layout.focus(entry_search_field) @bindings.add("b", filter=text_focus) def _open_in_browser(event): key = get_current_key(event.current_buffer.document, keys) bib = bm.find(key=key, bibs=bibs) if bib.adsurl is not None: webbrowser.open(bib.adsurl, new=2) else: show_message("Message", f"Entry '{key}' does not have an ADS url.") @bindings.add("c-c", filter=dialog_focus) def _close_dialog(event): get_app().layout.current_window.dialog.future.set_result(None) @bindings.add("s", filter=text_focus) def _save_selected_to_file(event): selected = text_field.bm_processor.selected_entries if len(selected) == 0: show_message("Message", "Nothing to save.") return async def coroutine(): dialog = TextInputDialog( title="Save to File", label_text="\nEnter a file path or leave blank to quit " "and print to screen:\n(press Control-c to cancel)\n", completer=PathCompleter(), ) path = await show_dialog_as_float(dialog) content = '\n\n'.join( bibs[keys.index(key)].content for key in selected) if path == "": selected_content[0] = content # The program termination is in TextInputDialog() since I # need to close this coroutine first. return if path is not None: try: with open(path, "w") as f: f.write(content) except IOError as e: show_message("Error", str(e)) ensure_future(coroutine()) @bindings.add("enter", filter=text_focus) def _toggle_selected_entry(event): "Select/deselect entry pointed by the cursor." key = get_current_key(event.current_buffer.document, keys) text_field.bm_processor.toggle_selected_entry(key) @bindings.add("enter", filter=entry_search_focus) def _select_entries(event): "Parse the input tag text and send focus back to main text." # Reset tag text to '': doc = event.current_buffer.document start_pos = doc.cursor_position + doc.get_start_of_line_position() event.current_buffer.cursor_position = start_pos event.current_buffer.delete( doc.get_end_of_line_position() - doc.get_start_of_line_position()) # Catch text and parse search text: matches = u.parse_search(doc.current_line) if len(matches) == 0: text_field.compact_text = all_compact_text[:] text_field.expanded_text = all_expanded_text[:] search_buffer.completer.words = keys else: text_field.compact_text = "\n".join([bib.key for bib in matches]) text_field.expanded_text = "\n\n".join( bib.meta() + bib.content for bib in matches) search_buffer.completer.words = [bib.key for bib in matches] # Return focus to main text: event.app.layout.focus(text_field.window) # Update main text with selected tag: buffer = event.current_buffer text_field.text = text_field.compact_text if text_field.current_key in search_buffer.completer.words: buffer_position = text_field.text.index(text_field.current_key) else: buffer_position = 0 buffer.cursor_position = buffer_position text_field.is_expanded = False @bindings.add("K", filter=text_focus) def _deselect_tags(event): buffer = event.current_buffer key = get_current_key(buffer.document, keys) text_field.compact_text = all_compact_text[:] text_field.expanded_text = all_expanded_text[:] search_buffer.completer.words = keys # Update main text: text_field.text = text_field.compact_text buffer.cursor_position = buffer.text.index(key) text_field.is_expanded = False @bindings.add("e", filter=text_focus) def _expand_collapse_entry(event): "Expand/collapse current entry." doc = event.current_buffer.document key, start_end, is_expanded = get_current_key( doc, keys, get_start_end=True, get_expanded=True) bib = bm.find(key=key, bibs=bibs) if is_expanded: # Remove blank lines around if surrounded by keys: start_row, _ = doc._find_line_start_index(start_end[0]) if start_row > 0 and doc.lines[start_row-2] in keys: start_end[0] -= 1 end_row, _ = doc._find_line_start_index(start_end[1]) if end_row < doc.line_count-1 and doc.lines[end_row+2] in keys: start_end[1] += 1 event.app.clipboard.set_text(bib.key) else: expanded_content = bib.meta() + bib.content row = doc.cursor_position_row # Add blank lines around if surrounded by keys: if row > 0 and doc.lines[row-1] != '': expanded_content = '\n' + expanded_content if row < doc.line_count-1 and doc.lines[row+1] != '': expanded_content = expanded_content + '\n' event.app.clipboard.set_text(expanded_content) text_field.read_only = False event.current_buffer.cursor_position = start_end[0] event.current_buffer.delete(count=start_end[1] - start_end[0]) event.current_buffer.paste_clipboard_data( event.app.clipboard.get_data(), count=event.arg, paste_mode=PasteMode.VI_BEFORE) text_field.read_only = True if is_expanded: event.current_buffer.cursor_position = start_end[0] @bindings.add("E", filter=text_focus) def _expand_collapse_all(event): "Expand/collapse all entries." buffer = event.current_buffer key = get_current_key(buffer.document, keys) if text_field.is_expanded: text_field.text = text_field.compact_text else: text_field.text = text_field.expanded_text buffer.cursor_position = buffer.text.index(key) text_field.is_expanded = not text_field.is_expanded @bindings.add("o", filter=text_focus) def _open_pdf(event): buffer = event.current_buffer key = get_current_key(buffer.document, keys) bib = bm.find(key=key, bibs=bibs) has_pdf = bib.pdf is not None has_bibcode = bib.bibcode is not None is_missing = has_pdf and not os.path.exists(f'{u.BM_PDF()}{bib.pdf}') if not has_pdf and not has_bibcode: show_message("Message", f"BibTeX entry '{key}' does not have a PDF.") return if has_pdf and not is_missing: pm.open(key=key) return if has_pdf and is_missing and not has_bibcode: show_message("Message", f"BibTeX entry has a PDF file: {bib.pdf}, but the file " "could not be found.") return # Need to fetch before opening: async def coroutine(): dialog = MessageDialog( "PDF file not found", "Fetch from ADS?\n(might take a few seconds ...)", asking=True) fetch = await show_dialog_as_float(dialog) if fetch: with io.StringIO() as buf, redirect_stdout(buf): fetched = pm.fetch(bib.bibcode, replace=True) fetch_output = buf.getvalue() if fetched is None: show_message("PDF fetch failed", fetch_output) else: show_message("PDF fetch succeeded.", fetch_output) pm.open(key=key) ensure_future(coroutine()) key_bindings = merge_key_bindings([ auto_suggest_bindings, bindings, ]) application = Application( layout=Layout(root_container, focused_element=text_field), key_bindings=key_bindings, enable_page_navigation_bindings=True, style=style, full_screen=True, ) application.run() if selected_content[0] is not None: tokens = list(pygments.lex(selected_content[0], lexer=BibTeXLexer())) print_formatted_text( PygmentsTokens(tokens), end="", style=lex_style, #output=create_output(sys.stdout), )