【Textual】 Markdown标记文本组件中的代码块默认不支持自动换行,如何修改为自动...

查看 5|回复 0
作者:我心飞翔1995   
Markdown标记文本组件中的代码块默认不支持自动换行,如何修改为自动换行?
因为MarkdownFence类的_block方法返回的Syntax对象默认没有启用自动换行,在源码中修改是最简单的:
class MarkdownFence(MarkdownBlock):
    ... # 其他代码无需修改
    def _block(self) -> Syntax:
        return Syntax(
            self.code,
            lexer=self.lexer,
            word_wrap=True, # 只修改这里为True
            indent_guides=True,
            padding=(1, 2),
            theme=self.theme,
        )
然后在使用的时候,需要添加以下CSS样式:
MarkdownFence > * {
    width: 100%;
}
如果不想每次更新Textual之后修改,可以在源代码开头添加以下补丁代码:
# patch is here
from textual.widgets import Markdown
from textual.widgets._markdown import (
    MarkdownFence,MarkdownBlock,HEADINGS,
    MarkdownBlockQuote,
    MarkdownBulletList,
    MarkdownHorizontalRule,
    MarkdownParagraph,
    MarkdownOrderedList,
    MarkdownOrderedListItem,
    MarkdownUnorderedListItem,
    MarkdownTable,
    MarkdownTD,MarkdownTBody,
    MarkdownTH,MarkdownTHead,
    MarkdownTR
)
from rich.syntax import Syntax
from textual.await_complete import AwaitComplete
import asyncio
from markdown_it import MarkdownIt
from typing import Iterable
class MarkdownFence(MarkdownFence):
    DEFAULT_CSS = """
    MarkdownFence {
        margin: 1 0;
        overflow: auto;
        width: 100%;
        height: auto;
        max-height: 20;
        color: rgb(210,210,210);
    }
    MarkdownFence > * {
        width: 100%; /* core of patch */
    }
    """
    def _block(self) -> Syntax:
        return Syntax(
            self.code,
            lexer=self.lexer,
            word_wrap=True, # core of patch
            indent_guides=True,
            padding=(1, 2),
            theme=self.theme,
        )
class Markdown(Markdown):
    def update(self, markdown: str) -> AwaitComplete:
        """Update the document with new Markdown.
        Args:
            markdown: A string containing Markdown.
        Returns:
            An optionally awaitable object. Await this to ensure that all children have been mounted.
        """
        parser = (
            MarkdownIt("gfm-like")
            if self._parser_factory is None
            else self._parser_factory()
        )
        table_of_contents = []
        def parse_markdown(tokens) -> Iterable[MarkdownBlock]:
            """Create a stream of MarkdownBlock widgets from markdown.
            Args:
                tokens: List of tokens
            Yields:
                Widgets for mounting.
            """
            stack: list[MarkdownBlock] = []
            stack_append = stack.append
            block_id: int = 0
            for token in tokens:
                token_type = token.type
                if token_type == "heading_open":
                    block_id += 1
                    stack_append(HEADINGS[token.tag](self, id=f"block{block_id}"))
                elif token_type == "hr":
                    yield MarkdownHorizontalRule(self)
                elif token_type == "paragraph_open":
                    stack_append(MarkdownParagraph(self))
                elif token_type == "blockquote_open":
                    stack_append(MarkdownBlockQuote(self))
                elif token_type == "bullet_list_open":
                    stack_append(MarkdownBulletList(self))
                elif token_type == "ordered_list_open":
                    stack_append(MarkdownOrderedList(self))
                elif token_type == "list_item_open":
                    if token.info:
                        stack_append(MarkdownOrderedListItem(self, token.info))
                    else:
                        item_count = sum(
                            1
                            for block in stack
                            if isinstance(block, MarkdownUnorderedListItem)
                        )
                        stack_append(
                            MarkdownUnorderedListItem(
                                self,
                                self.BULLETS[item_count % len(self.BULLETS)],
                            )
                        )
                elif token_type == "table_open":
                    stack_append(MarkdownTable(self))
                elif token_type == "tbody_open":
                    stack_append(MarkdownTBody(self))
                elif token_type == "thead_open":
                    stack_append(MarkdownTHead(self))
                elif token_type == "tr_open":
                    stack_append(MarkdownTR(self))
                elif token_type == "th_open":
                    stack_append(MarkdownTH(self))
                elif token_type == "td_open":
                    stack_append(MarkdownTD(self))
                elif token_type.endswith("_close"):
                    block = stack.pop()
                    if token.type == "heading_close":
                        heading = block._text.plain
                        level = int(token.tag[1:])
                        table_of_contents.append((level, heading, block.id))
                    if stack:
                        stack[-1]._blocks.append(block)
                    else:
                        yield block
                elif token_type == "inline":
                    stack[-1].build_from_token(token)
                elif token_type in ("fence", "code_block"):
                    fence = MarkdownFence(self, token.content.rstrip(), token.info)
                    if stack:
                        stack[-1]._blocks.append(fence)
                    else:
                        yield fence
                else:
                    external = self.unhandled_token(token)
                    if external is not None:
                        if stack:
                            stack[-1]._blocks.append(external)
                        else:
                            yield external
        markdown_block = self.query("MarkdownBlock")
        async def await_update() -> None:
            """Update in batches."""
            BATCH_SIZE = 200
            batch: list[MarkdownBlock] = []
            tokens = await asyncio.get_running_loop().run_in_executor(
                None, parser.parse, markdown
            )
            # Lock so that you can't update with more than one document simultaneously
            async with self.lock:
                # Remove existing blocks for the first batch only
                removed: bool = False
                async def mount_batch(batch: list[MarkdownBlock]) -> None:
                    """Mount a single match of blocks.
                    Args:
                        batch: A list of blocks to mount.
                    """
                    nonlocal removed
                    if removed:
                        await self.mount_all(batch)
                    else:
                        with self.app.batch_update():
                            await markdown_block.remove()
                            await self.mount_all(batch)
                        removed = True
                for block in parse_markdown(tokens):
                    batch.append(block)
                    if len(batch) == BATCH_SIZE:
                        await mount_batch(batch)
                        batch.clear()
                if batch:
                    await mount_batch(batch)
                if not removed:
                    await markdown_block.remove()
            self._table_of_contents = table_of_contents
            self.post_message(
                Markdown.TableOfContentsUpdated(
                    self, self._table_of_contents
                ).set_sender(self)
            )
        return AwaitComplete(await_update())
# patch is over
完整示例如下:
from textual.app import App
# patch is here
from textual.widgets import Markdown
from textual.widgets._markdown import (
    MarkdownFence,MarkdownBlock,HEADINGS,
    MarkdownBlockQuote,
    MarkdownBulletList,
    MarkdownHorizontalRule,
    MarkdownParagraph,
    MarkdownOrderedList,
    MarkdownOrderedListItem,
    MarkdownUnorderedListItem,
    MarkdownTable,
    MarkdownTD,MarkdownTBody,
    MarkdownTH,MarkdownTHead,
    MarkdownTR
)
from rich.syntax import Syntax
from textual.await_complete import AwaitComplete
import asyncio
from markdown_it import MarkdownIt
from typing import Iterable
class MarkdownFence(MarkdownFence):
    DEFAULT_CSS = """
    MarkdownFence {
        margin: 1 0;
        overflow: auto;
        width: 100%;
        height: auto;
        max-height: 20;
        color: rgb(210,210,210);
    }
    MarkdownFence > * {
        width: 100%; /* core of patch */
    }
    """
    def _block(self) -> Syntax:
        return Syntax(
            self.code,
            lexer=self.lexer,
            word_wrap=True, # core of patch
            indent_guides=True,
            padding=(1, 2),
            theme=self.theme,
        )
class Markdown(Markdown):
    def update(self, markdown: str) -> AwaitComplete:
        """Update the document with new Markdown.
        Args:
            markdown: A string containing Markdown.
        Returns:
            An optionally awaitable object. Await this to ensure that all children have been mounted.
        """
        parser = (
            MarkdownIt("gfm-like")
            if self._parser_factory is None
            else self._parser_factory()
        )
        table_of_contents = []
        def parse_markdown(tokens) -> Iterable[MarkdownBlock]:
            """Create a stream of MarkdownBlock widgets from markdown.
            Args:
                tokens: List of tokens
            Yields:
                Widgets for mounting.
            """
            stack: list[MarkdownBlock] = []
            stack_append = stack.append
            block_id: int = 0
            for token in tokens:
                token_type = token.type
                if token_type == "heading_open":
                    block_id += 1
                    stack_append(HEADINGS[token.tag](self, id=f"block{block_id}"))
                elif token_type == "hr":
                    yield MarkdownHorizontalRule(self)
                elif token_type == "paragraph_open":
                    stack_append(MarkdownParagraph(self))
                elif token_type == "blockquote_open":
                    stack_append(MarkdownBlockQuote(self))
                elif token_type == "bullet_list_open":
                    stack_append(MarkdownBulletList(self))
                elif token_type == "ordered_list_open":
                    stack_append(MarkdownOrderedList(self))
                elif token_type == "list_item_open":
                    if token.info:
                        stack_append(MarkdownOrderedListItem(self, token.info))
                    else:
                        item_count = sum(
                            1
                            for block in stack
                            if isinstance(block, MarkdownUnorderedListItem)
                        )
                        stack_append(
                            MarkdownUnorderedListItem(
                                self,
                                self.BULLETS[item_count % len(self.BULLETS)],
                            )
                        )
                elif token_type == "table_open":
                    stack_append(MarkdownTable(self))
                elif token_type == "tbody_open":
                    stack_append(MarkdownTBody(self))
                elif token_type == "thead_open":
                    stack_append(MarkdownTHead(self))
                elif token_type == "tr_open":
                    stack_append(MarkdownTR(self))
                elif token_type == "th_open":
                    stack_append(MarkdownTH(self))
                elif token_type == "td_open":
                    stack_append(MarkdownTD(self))
                elif token_type.endswith("_close"):
                    block = stack.pop()
                    if token.type == "heading_close":
                        heading = block._text.plain
                        level = int(token.tag[1:])
                        table_of_contents.append((level, heading, block.id))
                    if stack:
                        stack[-1]._blocks.append(block)
                    else:
                        yield block
                elif token_type == "inline":
                    stack[-1].build_from_token(token)
                elif token_type in ("fence", "code_block"):
                    fence = MarkdownFence(self, token.content.rstrip(), token.info)
                    if stack:
                        stack[-1]._blocks.append(fence)
                    else:
                        yield fence
                else:
                    external = self.unhandled_token(token)
                    if external is not None:
                        if stack:
                            stack[-1]._blocks.append(external)
                        else:
                            yield external
        markdown_block = self.query("MarkdownBlock")
        async def await_update() -> None:
            """Update in batches."""
            BATCH_SIZE = 200
            batch: list[MarkdownBlock] = []
            tokens = await asyncio.get_running_loop().run_in_executor(
                None, parser.parse, markdown
            )
            # Lock so that you can't update with more than one document simultaneously
            async with self.lock:
                # Remove existing blocks for the first batch only
                removed: bool = False
                async def mount_batch(batch: list[MarkdownBlock]) -> None:
                    """Mount a single match of blocks.
                    Args:
                        batch: A list of blocks to mount.
                    """
                    nonlocal removed
                    if removed:
                        await self.mount_all(batch)
                    else:
                        with self.app.batch_update():
                            await markdown_block.remove()
                            await self.mount_all(batch)
                        removed = True
                for block in parse_markdown(tokens):
                    batch.append(block)
                    if len(batch) == BATCH_SIZE:
                        await mount_batch(batch)
                        batch.clear()
                if batch:
                    await mount_batch(batch)
                if not removed:
                    await markdown_block.remove()
            self._table_of_contents = table_of_contents
            self.post_message(
                Markdown.TableOfContentsUpdated(
                    self, self._table_of_contents
                ).set_sender(self)
            )
        return AwaitComplete(await_update())
# patch is over
TEXT = '''\
{这里自己写符合Markdown语法要求的代码块}
'''
class MyApp(App):
    def on_mount(self):
        self.widgets = [
            Markdown(TEXT)
        ]
        self.mount_all(self.widgets)
if __name__ == '__main__':
    app = MyApp()
    app.run()
爱飞的猫编辑:修正代码高亮

代码, 换行

您需要登录后才可以回帖 登录 | 立即注册

返回顶部