因为 `MarkdownFence` 类的 `_block` 方法返回的 `Syntax` 对象默认没有启用自动换行(`word_wrap`),所以最简单的办法是直接修改 Textual 的源码(`textual/widgets/_markdown.py`):
class MarkdownFence(MarkdownBlock):
    ...  # the rest of the class needs no changes

    def _block(self) -> Syntax:
        """Return the Syntax renderable for this fence, with word wrap on."""
        return Syntax(
            self.code,
            lexer=self.lexer,
            word_wrap=True,  # the only change: set this to True
            indent_guides=True,
            padding=(1, 2),
            theme=self.theme,
        )
然后在使用的时候,需要添加以下CSS样式:
/* Give every direct child of a MarkdownFence 100% width. */
MarkdownFence > * {
    width: 100%;
}
如果不想每次更新 Textual 之后都重新修改库的源码,可以在自己程序的源代码开头添加以下补丁代码:
# patch is here
from textual.widgets import Markdown
from textual.widgets._markdown import (
MarkdownFence,MarkdownBlock,HEADINGS,
MarkdownBlockQuote,
MarkdownBulletList,
MarkdownHorizontalRule,
MarkdownParagraph,
MarkdownOrderedList,
MarkdownOrderedListItem,
MarkdownUnorderedListItem,
MarkdownTable,
MarkdownTD,MarkdownTBody,
MarkdownTH,MarkdownTHead,
MarkdownTR
)
from rich.syntax import Syntax
from textual.await_complete import AwaitComplete
import asyncio
from markdown_it import MarkdownIt
from typing import Iterable
class MarkdownFence(MarkdownFence):
    """Code-fence block whose Syntax renderable has word wrap enabled.

    The class deliberately reuses the name ``MarkdownFence`` so that the
    ``MarkdownFence`` selectors in ``DEFAULT_CSS`` and any lookup of that
    name in this module refer to this subclass.
    """

    # Styles for the fence itself plus the rule that sets every direct
    # child to 100% width (flagged below as the core of the patch).
    DEFAULT_CSS = """
    MarkdownFence {
        margin: 1 0;
        overflow: auto;
        width: 100%;
        height: auto;
        max-height: 20;
        color: rgb(210,210,210);
    }
    MarkdownFence > * {
        width: 100%; /* core of patch */
    }
    """

    def _block(self) -> Syntax:
        """Return the Syntax renderable for this fence's code.

        Returns:
            A ``Syntax`` built from ``self.code``, ``self.lexer`` and
            ``self.theme`` with ``word_wrap=True``.
        """
        return Syntax(
            self.code,
            lexer=self.lexer,
            word_wrap=True,  # core of patch
            indent_guides=True,
            padding=(1, 2),
            theme=self.theme,
        )
class Markdown(Markdown):
    """Markdown widget that builds its code fences from this module's ``MarkdownFence``.

    ``update`` is declared here so that the ``MarkdownFence`` name it
    instantiates is looked up in this module, where it is bound to the
    word-wrapping subclass defined earlier in the file.

    NOTE(review): the body appears to mirror Textual's own
    ``Markdown.update``; its nesting was reconstructed from a copy whose
    indentation had been lost. Diff it against the installed Textual
    version before relying on it, and re-sync after upgrades.
    """

    def update(self, markdown: str) -> AwaitComplete:
        """Update the document with new Markdown.

        Args:
            markdown: A string containing Markdown.

        Returns:
            An optionally awaitable object. Await this to ensure that all children have been mounted.
        """
        # Use the configured parser factory when there is one, otherwise a
        # "gfm-like" MarkdownIt parser.
        parser = (
            MarkdownIt("gfm-like")
            if self._parser_factory is None
            else self._parser_factory()
        )

        # Filled by parse_markdown with (level, heading text, block id) tuples.
        table_of_contents = []

        def parse_markdown(tokens) -> Iterable[MarkdownBlock]:
            """Create a stream of MarkdownBlock widgets from markdown.

            Args:
                tokens: List of tokens

            Yields:
                Widgets for mounting.
            """
            # Blocks that have been opened but not closed yet; the last
            # entry is the innermost one.
            stack: list[MarkdownBlock] = []
            stack_append = stack.append
            block_id: int = 0

            for token in tokens:
                token_type = token.type
                if token_type == "heading_open":
                    # Headings get sequential ids ("block1", "block2", ...).
                    block_id += 1
                    stack_append(HEADINGS[token.tag](self, id=f"block{block_id}"))
                elif token_type == "hr":
                    yield MarkdownHorizontalRule(self)
                elif token_type == "paragraph_open":
                    stack_append(MarkdownParagraph(self))
                elif token_type == "blockquote_open":
                    stack_append(MarkdownBlockQuote(self))
                elif token_type == "bullet_list_open":
                    stack_append(MarkdownBulletList(self))
                elif token_type == "ordered_list_open":
                    stack_append(MarkdownOrderedList(self))
                elif token_type == "list_item_open":
                    if token.info:
                        # A non-empty token.info selects an ordered item.
                        stack_append(MarkdownOrderedListItem(self, token.info))
                    else:
                        # Pick the bullet by how many unordered items are
                        # already open, cycling through self.BULLETS.
                        item_count = sum(
                            1
                            for block in stack
                            if isinstance(block, MarkdownUnorderedListItem)
                        )
                        stack_append(
                            MarkdownUnorderedListItem(
                                self,
                                self.BULLETS[item_count % len(self.BULLETS)],
                            )
                        )
                elif token_type == "table_open":
                    stack_append(MarkdownTable(self))
                elif token_type == "tbody_open":
                    stack_append(MarkdownTBody(self))
                elif token_type == "thead_open":
                    stack_append(MarkdownTHead(self))
                elif token_type == "tr_open":
                    stack_append(MarkdownTR(self))
                elif token_type == "th_open":
                    stack_append(MarkdownTH(self))
                elif token_type == "td_open":
                    stack_append(MarkdownTD(self))
                elif token_type.endswith("_close"):
                    # The innermost open block is complete.
                    block = stack.pop()
                    if token.type == "heading_close":
                        heading = block._text.plain
                        # The tag is "h<N>"; everything after the first
                        # character is the heading level.
                        level = int(token.tag[1:])
                        table_of_contents.append((level, heading, block.id))
                    if stack:
                        # Nested: attach to the enclosing block.
                        stack[-1]._blocks.append(block)
                    else:
                        # Top level: hand it out for mounting.
                        yield block
                elif token_type == "inline":
                    stack[-1].build_from_token(token)
                elif token_type in ("fence", "code_block"):
                    # MarkdownFence is resolved in this module's namespace.
                    fence = MarkdownFence(self, token.content.rstrip(), token.info)
                    if stack:
                        stack[-1]._blocks.append(fence)
                    else:
                        yield fence
                else:
                    # Anything else goes to the unhandled_token hook, which
                    # may return None.
                    external = self.unhandled_token(token)
                    if external is not None:
                        if stack:
                            stack[-1]._blocks.append(external)
                        else:
                            yield external

        # Query used below to remove the previously mounted blocks.
        markdown_block = self.query("MarkdownBlock")

        async def await_update() -> None:
            """Update in batches."""
            BATCH_SIZE = 200
            batch: list[MarkdownBlock] = []
            # Parse in the event loop's default executor (executor=None).
            tokens = await asyncio.get_running_loop().run_in_executor(
                None, parser.parse, markdown
            )

            # Lock so that you can't update with more than one document simultaneously
            async with self.lock:
                # Remove existing blocks for the first batch only
                removed: bool = False

                async def mount_batch(batch: list[MarkdownBlock]) -> None:
                    """Mount a single batch of blocks.

                    Args:
                        batch: A list of blocks to mount.
                    """
                    nonlocal removed
                    if removed:
                        await self.mount_all(batch)
                    else:
                        # First batch: remove the old blocks and mount the
                        # new ones inside one batch_update() context.
                        with self.app.batch_update():
                            await markdown_block.remove()
                            await self.mount_all(batch)
                        removed = True

                for block in parse_markdown(tokens):
                    batch.append(block)
                    if len(batch) == BATCH_SIZE:
                        await mount_batch(batch)
                        batch.clear()
                # Mount whatever is left over after the last full batch.
                if batch:
                    await mount_batch(batch)
                # No batch was ever mounted, so the old blocks are still
                # there and must be removed here.
                if not removed:
                    await markdown_block.remove()

            self._table_of_contents = table_of_contents

            # Post the rebuilt table of contents.
            self.post_message(
                Markdown.TableOfContentsUpdated(
                    self, self._table_of_contents
                ).set_sender(self)
            )

        return AwaitComplete(await_update())
# patch is over
完整示例如下:
from textual.app import App
# patch is here
from textual.widgets import Markdown
from textual.widgets._markdown import (
MarkdownFence,MarkdownBlock,HEADINGS,
MarkdownBlockQuote,
MarkdownBulletList,
MarkdownHorizontalRule,
MarkdownParagraph,
MarkdownOrderedList,
MarkdownOrderedListItem,
MarkdownUnorderedListItem,
MarkdownTable,
MarkdownTD,MarkdownTBody,
MarkdownTH,MarkdownTHead,
MarkdownTR
)
from rich.syntax import Syntax
from textual.await_complete import AwaitComplete
import asyncio
from markdown_it import MarkdownIt
from typing import Iterable
class MarkdownFence(MarkdownFence):
    """Code-fence block whose Syntax renderable has word wrap enabled.

    The class deliberately reuses the name ``MarkdownFence`` so that the
    ``MarkdownFence`` selectors in ``DEFAULT_CSS`` and any lookup of that
    name in this module refer to this subclass.
    """

    # Styles for the fence itself plus the rule that sets every direct
    # child to 100% width (flagged below as the core of the patch).
    DEFAULT_CSS = """
    MarkdownFence {
        margin: 1 0;
        overflow: auto;
        width: 100%;
        height: auto;
        max-height: 20;
        color: rgb(210,210,210);
    }
    MarkdownFence > * {
        width: 100%; /* core of patch */
    }
    """

    def _block(self) -> Syntax:
        """Return the Syntax renderable for this fence's code.

        Returns:
            A ``Syntax`` built from ``self.code``, ``self.lexer`` and
            ``self.theme`` with ``word_wrap=True``.
        """
        return Syntax(
            self.code,
            lexer=self.lexer,
            word_wrap=True,  # core of patch
            indent_guides=True,
            padding=(1, 2),
            theme=self.theme,
        )
class Markdown(Markdown):
    """Markdown widget that builds its code fences from this module's ``MarkdownFence``.

    ``update`` is declared here so that the ``MarkdownFence`` name it
    instantiates is looked up in this module, where it is bound to the
    word-wrapping subclass defined earlier in the file.

    NOTE(review): the body appears to mirror Textual's own
    ``Markdown.update``; its nesting was reconstructed from a copy whose
    indentation had been lost. Diff it against the installed Textual
    version before relying on it, and re-sync after upgrades.
    """

    def update(self, markdown: str) -> AwaitComplete:
        """Update the document with new Markdown.

        Args:
            markdown: A string containing Markdown.

        Returns:
            An optionally awaitable object. Await this to ensure that all children have been mounted.
        """
        # Use the configured parser factory when there is one, otherwise a
        # "gfm-like" MarkdownIt parser.
        parser = (
            MarkdownIt("gfm-like")
            if self._parser_factory is None
            else self._parser_factory()
        )

        # Filled by parse_markdown with (level, heading text, block id) tuples.
        table_of_contents = []

        def parse_markdown(tokens) -> Iterable[MarkdownBlock]:
            """Create a stream of MarkdownBlock widgets from markdown.

            Args:
                tokens: List of tokens

            Yields:
                Widgets for mounting.
            """
            # Blocks that have been opened but not closed yet; the last
            # entry is the innermost one.
            stack: list[MarkdownBlock] = []
            stack_append = stack.append
            block_id: int = 0

            for token in tokens:
                token_type = token.type
                if token_type == "heading_open":
                    # Headings get sequential ids ("block1", "block2", ...).
                    block_id += 1
                    stack_append(HEADINGS[token.tag](self, id=f"block{block_id}"))
                elif token_type == "hr":
                    yield MarkdownHorizontalRule(self)
                elif token_type == "paragraph_open":
                    stack_append(MarkdownParagraph(self))
                elif token_type == "blockquote_open":
                    stack_append(MarkdownBlockQuote(self))
                elif token_type == "bullet_list_open":
                    stack_append(MarkdownBulletList(self))
                elif token_type == "ordered_list_open":
                    stack_append(MarkdownOrderedList(self))
                elif token_type == "list_item_open":
                    if token.info:
                        # A non-empty token.info selects an ordered item.
                        stack_append(MarkdownOrderedListItem(self, token.info))
                    else:
                        # Pick the bullet by how many unordered items are
                        # already open, cycling through self.BULLETS.
                        item_count = sum(
                            1
                            for block in stack
                            if isinstance(block, MarkdownUnorderedListItem)
                        )
                        stack_append(
                            MarkdownUnorderedListItem(
                                self,
                                self.BULLETS[item_count % len(self.BULLETS)],
                            )
                        )
                elif token_type == "table_open":
                    stack_append(MarkdownTable(self))
                elif token_type == "tbody_open":
                    stack_append(MarkdownTBody(self))
                elif token_type == "thead_open":
                    stack_append(MarkdownTHead(self))
                elif token_type == "tr_open":
                    stack_append(MarkdownTR(self))
                elif token_type == "th_open":
                    stack_append(MarkdownTH(self))
                elif token_type == "td_open":
                    stack_append(MarkdownTD(self))
                elif token_type.endswith("_close"):
                    # The innermost open block is complete.
                    block = stack.pop()
                    if token.type == "heading_close":
                        heading = block._text.plain
                        # The tag is "h<N>"; everything after the first
                        # character is the heading level.
                        level = int(token.tag[1:])
                        table_of_contents.append((level, heading, block.id))
                    if stack:
                        # Nested: attach to the enclosing block.
                        stack[-1]._blocks.append(block)
                    else:
                        # Top level: hand it out for mounting.
                        yield block
                elif token_type == "inline":
                    stack[-1].build_from_token(token)
                elif token_type in ("fence", "code_block"):
                    # MarkdownFence is resolved in this module's namespace.
                    fence = MarkdownFence(self, token.content.rstrip(), token.info)
                    if stack:
                        stack[-1]._blocks.append(fence)
                    else:
                        yield fence
                else:
                    # Anything else goes to the unhandled_token hook, which
                    # may return None.
                    external = self.unhandled_token(token)
                    if external is not None:
                        if stack:
                            stack[-1]._blocks.append(external)
                        else:
                            yield external

        # Query used below to remove the previously mounted blocks.
        markdown_block = self.query("MarkdownBlock")

        async def await_update() -> None:
            """Update in batches."""
            BATCH_SIZE = 200
            batch: list[MarkdownBlock] = []
            # Parse in the event loop's default executor (executor=None).
            tokens = await asyncio.get_running_loop().run_in_executor(
                None, parser.parse, markdown
            )

            # Lock so that you can't update with more than one document simultaneously
            async with self.lock:
                # Remove existing blocks for the first batch only
                removed: bool = False

                async def mount_batch(batch: list[MarkdownBlock]) -> None:
                    """Mount a single batch of blocks.

                    Args:
                        batch: A list of blocks to mount.
                    """
                    nonlocal removed
                    if removed:
                        await self.mount_all(batch)
                    else:
                        # First batch: remove the old blocks and mount the
                        # new ones inside one batch_update() context.
                        with self.app.batch_update():
                            await markdown_block.remove()
                            await self.mount_all(batch)
                        removed = True

                for block in parse_markdown(tokens):
                    batch.append(block)
                    if len(batch) == BATCH_SIZE:
                        await mount_batch(batch)
                        batch.clear()
                # Mount whatever is left over after the last full batch.
                if batch:
                    await mount_batch(batch)
                # No batch was ever mounted, so the old blocks are still
                # there and must be removed here.
                if not removed:
                    await markdown_block.remove()

            self._table_of_contents = table_of_contents

            # Post the rebuilt table of contents.
            self.post_message(
                Markdown.TableOfContentsUpdated(
                    self, self._table_of_contents
                ).set_sender(self)
            )

        return AwaitComplete(await_update())
# patch is over
# Placeholder document: replace the line in braces with your own Markdown
# that contains a fenced code block.
TEXT = '''\
{这里自己写符合Markdown语法要求的代码块}
'''
class MyApp(App):
    """Minimal demo app that displays ``TEXT`` in a ``Markdown`` widget."""

    def on_mount(self) -> None:
        """Create the Markdown widget and mount it."""
        # Keep the widget list on the instance, then mount everything in it.
        self.widgets = [
            Markdown(TEXT)
        ]
        self.mount_all(self.widgets)
# Start the demo only when this file is executed as a script.
if __name__ == '__main__':
    app = MyApp()
    app.run()
爱飞的猫编辑:修正代码高亮