518 lines
14 KiB
Python
518 lines
14 KiB
Python
# -*- test-case-name: twisted.conch.test.test_helper -*-
|
|
# Copyright (c) Twisted Matrix Laboratories.
|
|
# See LICENSE for details.
|
|
|
|
"""
|
|
Partial in-memory terminal emulator
|
|
|
|
@author: Jp Calderone
|
|
"""
|
|
|
|
from __future__ import print_function
|
|
|
|
import re, string
|
|
|
|
from zope.interface import implementer
|
|
|
|
from incremental import Version
|
|
|
|
from twisted.internet import defer, protocol, reactor
|
|
from twisted.python import log, _textattributes
|
|
from twisted.python.compat import iterbytes
|
|
from twisted.python.deprecate import deprecated, deprecatedModuleAttribute
|
|
from twisted.conch.insults import insults
|
|
|
|
# Base values added to a colour index to form the numeric code that
# _FormattingState.toVT102 emits for foreground and background colours.
FOREGROUND = 30
BACKGROUND = 40

# Colour indices 0-7; N_COLORS (8) is the number of colours, not a colour.
(
    BLACK,
    RED,
    GREEN,
    YELLOW,
    BLUE,
    MAGENTA,
    CYAN,
    WHITE,
    N_COLORS,
) = range(9)
|
|
|
|
|
|
|
|
class _FormattingState(_textattributes._FormattingStateMixin):
    """
    The display attributes attached to one character cell.

    A character's attributes consist of its character set, whether it is
    bold, underlined, blinking or shown in reverse video, and its foreground
    and background colors.
    """
    compareAttributes = (
        'charset',
        'bold',
        'underline',
        'blink',
        'reverseVideo',
        'foreground',
        'background',
        '_subtracting',
    )

    def __init__(self, charset=insults.G0, bold=False, underline=False,
                 blink=False, reverseVideo=False, foreground=WHITE,
                 background=BLACK, _subtracting=False):
        # Character set first, then the boolean flags, then the colors.
        self.charset = charset
        self.bold, self.underline = bold, underline
        self.blink, self.reverseVideo = blink, reverseVideo
        self.foreground, self.background = foreground, background
        self._subtracting = _subtracting

    @deprecated(Version('Twisted', 13, 1, 0))
    def wantOne(self, **kw):
        """
        Add a character attribute to a copy of this formatting state.

        @param **kw: An optional attribute name and value can be provided with
            a keyword argument.

        @return: A formatting state instance with the new attribute.

        @see: L{DefaultFormattingState._withAttribute}.
        """
        # popitem() yields one (name, value) pair, which maps directly onto
        # _withAttribute's two arguments.
        return self._withAttribute(*kw.popitem())

    def toVT102(self):
        """
        Build the vt102 control sequence that sets up every attribute held
        here, except the character set.

        @return: The escape sequence as a string, or an empty string when no
            attribute code needs to be emitted.
        """
        # A "subtracting" state starts with code 0 before the other codes.
        codes = [0] if self._subtracting else []

        switches = (
            (self.bold, insults.BOLD),
            (self.underline, insults.UNDERLINE),
            (self.blink, insults.BLINK),
            (self.reverseVideo, insults.REVERSE_VIDEO),
        )
        codes.extend(code for enabled, code in switches if enabled)

        # White-on-black is the default, so only other colors are emitted.
        if self.foreground != WHITE:
            codes.append(FOREGROUND + self.foreground)
        if self.background != BLACK:
            codes.append(BACKGROUND + self.background)

        if not codes:
            return ''
        return '\x1b[' + ';'.join(map(str, codes)) + 'm'
|
|
|
|
# Public alias for _FormattingState; registered as deprecated just below.
CharacterAttribute = _FormattingState

deprecatedModuleAttribute(
    version=Version('Twisted', 13, 1, 0),
    message='Use twisted.conch.insults.text.assembleFormattedText instead.',
    moduleName='twisted.conch.insults.helper',
    name='CharacterAttribute',
)
|
|
|
|
|
|
|
|
# XXX - need to support scroll regions and scroll history
|
|
@implementer(insults.ITerminalTransport)
class TerminalBuffer(protocol.Protocol):
    """
    An in-memory terminal emulator.

    The screen is held in C{lines}: a list of C{height} rows, each a list of
    C{width} cells.  Every cell is a C{(character, formattingState)} tuple
    whose character is either a length-one L{bytes} object or L{void}.

    @cvar width: Number of columns in the buffer.
    @cvar height: Number of rows in the buffer.
    @cvar fill: The byte used in place of L{void} cells by L{__bytes__}.
    @cvar void: Sentinel marking a cell nothing has been written to.

    @ivar x: Current cursor column.
    @ivar y: Current cursor row.
    @ivar modes: Dictionary whose keys are the currently enabled modes.
    @ivar privateModes: Dictionary whose keys are the currently enabled
        private modes.
    @ivar keypadMode: C{'app'} or C{'num'}.
    """
    # Define one unique sentinel object per special key as a class attribute
    # (UP_ARROW, DOWN_ARROW, ...).  The names are built as bytes and decoded
    # before being executed in the class namespace.
    for keyID in (b'UP_ARROW', b'DOWN_ARROW', b'RIGHT_ARROW', b'LEFT_ARROW',
                  b'HOME', b'INSERT', b'DELETE', b'END', b'PGUP', b'PGDN',
                  b'F1', b'F2', b'F3', b'F4', b'F5', b'F6', b'F7', b'F8', b'F9',
                  b'F10', b'F11', b'F12'):
        execBytes = keyID + b" = object()"
        execStr = execBytes.decode("ascii")
        exec(execStr)

    TAB = b'\t'
    BACKSPACE = b'\x7f'

    width = 80
    height = 24

    fill = b' '
    void = object()

    # Bytes that insertAtCursor stores in the buffer; computed once instead
    # of once per inserted byte.
    _printable = string.printable.encode("ascii")

    def getCharacter(self, x, y):
        """
        Return the C{(character, formattingState)} cell at column C{x} of
        row C{y}.
        """
        return self.lines[y][x]

    def connectionMade(self):
        """
        Put the buffer into its initial state (see L{reset}).
        """
        self.reset()

    def write(self, data):
        """
        Add the given printable bytes to the terminal.

        Line feeds in L{bytes} will be replaced with carriage return / line
        feed pairs.
        """
        for b in iterbytes(data.replace(b'\n', b'\r\n')):
            self.insertAtCursor(b)

    def _currentFormattingState(self):
        """
        Return a new L{_FormattingState} built from the active character set
        and the current graphic rendition.
        """
        return _FormattingState(self.activeCharset, **self.graphicRendition)

    def insertAtCursor(self, b):
        """
        Add one byte to the terminal at the cursor and make consequent state
        updates.

        If b is a carriage return, move the cursor to the beginning of the
        current row.

        If b is a line feed, move the cursor to the next row or scroll down if
        the cursor is already in the last row.

        Otherwise, if b is printable, put it at the cursor position (inserting
        or overwriting as dictated by the current mode) and move the cursor.
        """
        if b == b'\r':
            self.x = 0
        elif b == b'\n':
            self._scrollDown()
        elif b in self._printable:
            # The cursor may rest one past the last column; wrap before
            # writing the next character.
            if self.x >= self.width:
                self.nextLine()
            ch = (b, self._currentFormattingState())
            if self.modes.get(insults.modes.IRM):
                # Insert mode: shift the rest of the row right and drop the
                # cell pushed off the end so the row keeps its width.
                self.lines[self.y][self.x:self.x] = [ch]
                self.lines[self.y].pop()
            else:
                self.lines[self.y][self.x] = ch
            self.x += 1

    def _emptyLine(self, width):
        """
        Return a list of C{width} void cells carrying the current formatting
        state.
        """
        return [(self.void, self._currentFormattingState())
                for i in range(width)]

    def _scrollDown(self):
        """
        Move the cursor down one row.  If it was already on the last row,
        leave it there, discard the top row and append an empty bottom row.
        """
        self.y += 1
        if self.y >= self.height:
            self.y -= 1
            del self.lines[0]
            self.lines.append(self._emptyLine(self.width))

    def _scrollUp(self):
        """
        Move the cursor up one row.  If it was already on the first row,
        leave it there, discard the bottom row and insert an empty top row.
        """
        self.y -= 1
        if self.y < 0:
            self.y = 0
            del self.lines[-1]
            self.lines.insert(0, self._emptyLine(self.width))

    def cursorUp(self, n=1):
        """
        Move the cursor up C{n} rows, stopping at the first row.
        """
        self.y = max(0, self.y - n)

    def cursorDown(self, n=1):
        """
        Move the cursor down C{n} rows, stopping at the last row.
        """
        self.y = min(self.height - 1, self.y + n)

    def cursorBackward(self, n=1):
        """
        Move the cursor left C{n} columns, stopping at the first column.
        """
        self.x = max(0, self.x - n)

    def cursorForward(self, n=1):
        """
        Move the cursor right C{n} columns, stopping at C{width} (one past
        the last column).
        """
        self.x = min(self.width, self.x + n)

    def cursorPosition(self, column, line):
        """
        Set the cursor to the given column and line.  The values are stored
        as given, without range checking.
        """
        self.x = column
        self.y = line

    def cursorHome(self):
        """
        Move the cursor to the home position stored in C{self.home}.
        """
        self.x = self.home.x
        self.y = self.home.y

    def index(self):
        """
        Move the cursor down one row, scrolling if it is on the last row.
        """
        self._scrollDown()

    def reverseIndex(self):
        """
        Move the cursor up one row, scrolling if it is on the first row.
        """
        self._scrollUp()

    def nextLine(self):
        """
        Update the cursor position attributes and scroll down if appropriate.
        """
        self.x = 0
        self._scrollDown()

    def saveCursor(self):
        """
        Remember the current cursor position for L{restoreCursor}.
        """
        self._savedCursor = (self.x, self.y)

    def restoreCursor(self):
        """
        Move the cursor to the position stored by L{saveCursor} and forget
        that stored position.

        @raise AttributeError: If no position is currently saved.
        """
        self.x, self.y = self._savedCursor
        del self._savedCursor

    def setModes(self, modes):
        """
        Mark each of the given modes as enabled.
        """
        for m in modes:
            self.modes[m] = True

    def resetModes(self, modes):
        """
        Mark each of the given modes as disabled.  Modes that are not
        currently enabled are ignored.
        """
        for m in modes:
            self.modes.pop(m, None)

    def setPrivateModes(self, modes):
        """
        Enable the given modes.

        Track which modes have been enabled so that the implementations of
        other L{insults.ITerminalTransport} methods can be properly implemented
        to respect these settings.

        @see: L{resetPrivateModes}
        @see: L{insults.ITerminalTransport.setPrivateModes}
        """
        for m in modes:
            self.privateModes[m] = True

    def resetPrivateModes(self, modes):
        """
        Disable the given modes.

        @see: L{setPrivateModes}
        @see: L{insults.ITerminalTransport.resetPrivateModes}
        """
        for m in modes:
            self.privateModes.pop(m, None)

    def applicationKeypadMode(self):
        """
        Record that the keypad is in application mode.
        """
        self.keypadMode = 'app'

    def numericKeypadMode(self):
        """
        Record that the keypad is in numeric mode.
        """
        self.keypadMode = 'num'

    def selectCharacterSet(self, charSet, which):
        """
        Assign the character set C{charSet} to the slot C{which}.
        """
        self.charsets[which] = charSet

    def shiftIn(self):
        """
        Make G0 the active character set.
        """
        self.activeCharset = insults.G0

    def shiftOut(self):
        """
        Make G1 the active character set.
        """
        self.activeCharset = insults.G1

    def _singleShift(self, charset):
        """
        Make C{charset} the active character set for exactly one call to
        L{insertAtCursor}, then restore the previously active one.

        This works by shadowing C{insertAtCursor} with an instance attribute
        that removes itself after its first invocation.
        """
        oldActiveCharset = self.activeCharset
        self.activeCharset = charset
        f = self.insertAtCursor

        def insertAtCursor(b):
            f(b)
            del self.insertAtCursor
            self.activeCharset = oldActiveCharset

        self.insertAtCursor = insertAtCursor

    def singleShift2(self):
        """
        Use G2 as the character set for the next inserted byte only.
        """
        self._singleShift(insults.G2)

    def singleShift3(self):
        """
        Use G3 as the character set for the next inserted byte only.
        """
        self._singleShift(insults.G3)

    def _defaultGraphicRendition(self):
        """
        Return a fresh dictionary describing the default rendition: no
        attributes set, white on black.
        """
        return {
            'bold': False,
            'underline': False,
            'blink': False,
            'reverseVideo': False,
            'foreground': WHITE,
            'background': BLACK}

    def selectGraphicRendition(self, *attributes):
        """
        Apply each of the given attributes, in order, to the rendition used
        for subsequently written characters.

        C{insults.NORMAL} resets everything to the default.  Any other value
        is converted with C{int} and treated as a foreground or background
        color code; values outside both color ranges, or not convertible, are
        logged and otherwise ignored.
        """
        for a in attributes:
            if a == insults.NORMAL:
                self.graphicRendition = self._defaultGraphicRendition()
            elif a == insults.BOLD:
                self.graphicRendition['bold'] = True
            elif a == insults.UNDERLINE:
                self.graphicRendition['underline'] = True
            elif a == insults.BLINK:
                self.graphicRendition['blink'] = True
            elif a == insults.REVERSE_VIDEO:
                self.graphicRendition['reverseVideo'] = True
            else:
                try:
                    v = int(a)
                except ValueError:
                    log.msg("Unknown graphic rendition attribute: " + repr(a))
                else:
                    # The upper bounds are exclusive: N_COLORS is the number
                    # of colors, so BASE + N_COLORS is not itself a color.
                    if FOREGROUND <= v < FOREGROUND + N_COLORS:
                        self.graphicRendition['foreground'] = v - FOREGROUND
                    elif BACKGROUND <= v < BACKGROUND + N_COLORS:
                        self.graphicRendition['background'] = v - BACKGROUND
                    else:
                        log.msg("Unknown graphic rendition attribute: " + repr(a))

    def eraseLine(self):
        """
        Replace the cursor's row with an empty row.
        """
        self.lines[self.y] = self._emptyLine(self.width)

    def eraseToLineEnd(self):
        """
        Blank the cursor's row from the cursor column to the end of the row.
        """
        width = self.width - self.x
        self.lines[self.y][self.x:] = self._emptyLine(width)

    def eraseToLineBeginning(self):
        """
        Blank the cursor's row from its first column up to and including the
        cursor column.
        """
        # The cursor can sit at x == width after the last column is written;
        # clamp so the row is never made longer than width.
        count = min(self.x + 1, self.width)
        self.lines[self.y][:count] = self._emptyLine(count)

    def eraseDisplay(self):
        """
        Replace the whole screen with C{height} empty rows.
        """
        self.lines = [self._emptyLine(self.width) for i in range(self.height)]

    def eraseToDisplayEnd(self):
        """
        Blank from the cursor to the end of its row, and every row below it.
        """
        self.eraseToLineEnd()
        height = self.height - self.y - 1
        self.lines[self.y + 1:] = [self._emptyLine(self.width) for i in range(height)]

    def eraseToDisplayBeginning(self):
        """
        Blank from the start of the cursor's row through the cursor, and
        every row above it.
        """
        self.eraseToLineBeginning()
        self.lines[:self.y] = [self._emptyLine(self.width) for i in range(self.y)]

    def deleteCharacter(self, n=1):
        """
        Delete C{n} cells starting at the cursor, shifting the remainder of
        the row left and padding the end of the row with empty cells.
        """
        del self.lines[self.y][self.x:self.x+n]
        self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n)))

    def insertLine(self, n=1):
        """
        Insert C{n} empty rows at the cursor's row, pushing later rows down
        and discarding any rows pushed past C{height}.
        """
        self.lines[self.y:self.y] = [self._emptyLine(self.width) for i in range(n)]
        del self.lines[self.height:]

    def deleteLine(self, n=1):
        """
        Delete up to C{n} rows starting at the cursor's row, moving later
        rows up and appending one empty row for each row deleted.
        """
        before = len(self.lines)
        del self.lines[self.y:self.y+n]
        # Fewer than n rows may exist below the cursor; append only as many
        # as were actually removed so the screen keeps its height.
        removed = before - len(self.lines)
        self.lines.extend([self._emptyLine(self.width) for i in range(removed)])

    def reportCursorPosition(self):
        """
        Return the cursor position as an C{(x, y)} tuple.
        """
        return (self.x, self.y)

    def reset(self):
        """
        Restore the initial state: cursor and home at the origin, no modes
        set apart from the AUTO_WRAP and CURSOR_MODE private modes, default
        rendition and character sets, and an empty screen.
        """
        self.home = insults.Vector(0, 0)
        self.x = self.y = 0
        self.modes = {}
        self.privateModes = {}
        self.setPrivateModes([insults.privateModes.AUTO_WRAP,
                              insults.privateModes.CURSOR_MODE])
        # applicationKeypadMode/numericKeypadMode store their state in
        # keypadMode; numericKeypad is the name this method historically set
        # and is kept for any code still reading it.
        self.keypadMode = self.numericKeypad = 'app'
        self.activeCharset = insults.G0
        self.graphicRendition = self._defaultGraphicRendition()
        self.charsets = {
            insults.G0: insults.CS_US,
            insults.G1: insults.CS_US,
            insults.G2: insults.CS_ALTERNATE,
            insults.G3: insults.CS_ALTERNATE_SPECIAL}
        self.eraseDisplay()

    def unhandledControlSequence(self, buf):
        """
        Report a control sequence this emulator does not implement by
        printing it.
        """
        print('Could not handle', repr(buf))

    def __bytes__(self):
        """
        Render the screen as bytes: rows joined with line feeds, void cells
        shown as L{fill}, and each row cut off after its last non-void cell.
        """
        lines = []
        for L in self.lines:
            buf = []
            # Length of buf up to and including the last real character.
            length = 0
            for (ch, attr) in L:
                if ch is not self.void:
                    buf.append(ch)
                    length = len(buf)
                else:
                    buf.append(self.fill)
            lines.append(b''.join(buf[:length]))
        return b'\n'.join(lines)
|
|
|
|
|
|
|
|
class ExpectationTimeout(Exception):
    """
    The failure delivered to a Deferred returned by
    L{ExpectableBuffer.expect} when its timeout fires.
    """
|
|
|
|
|
|
|
|
class ExpectableBuffer(TerminalBuffer):
    """
    A L{TerminalBuffer} on which callers can wait for output matching a
    regular expression.

    @ivar _mark: Offset into the rendered buffer contents before which
        nothing is searched; advanced past each match and set back to zero
        by L{cursorHome}.
    @ivar _expecting: Pending expectations in the order they were made, each
        a C{(compiledPattern, timer, deferred)} tuple.
    """
    _mark = 0

    def connectionMade(self):
        """
        Initialize the buffer and start with no pending expectations.
        """
        TerminalBuffer.connectionMade(self)
        self._expecting = []

    def write(self, data):
        """
        Write C{data} to the buffer, then check pending expectations.
        """
        TerminalBuffer.write(self, data)
        self._checkExpected()

    def cursorHome(self):
        """
        Move the cursor home and set the search offset back to zero.
        """
        TerminalBuffer.cursorHome(self)
        self._mark = 0

    def _timeoutExpected(self, d):
        """
        Fail C{d} with L{ExpectationTimeout}, then check the remaining
        expectations.
        """
        d.errback(ExpectationTimeout())
        self._checkExpected()

    def _checkExpected(self):
        """
        Resolve pending expectations, oldest first, against the buffer
        contents after the current mark.  Stop at the first expectation that
        does not match.
        """
        remaining = self.__bytes__()[self._mark:]
        while self._expecting:
            pattern, timer, deferred = self._expecting[0]

            # An expectation whose timer is no longer active is discarded.
            if timer and not timer.active():
                del self._expecting[0]
                continue

            found = pattern.search(remaining)
            if found is None:
                return

            if timer:
                timer.cancel()
            del self._expecting[0]

            # Consume everything up to the end of the match before firing
            # the Deferred.
            consumed = found.end()
            self._mark += consumed
            remaining = remaining[consumed:]
            deferred.callback(found)

    def expect(self, expression, timeout=None, scheduler=reactor):
        """
        Wait for output matching C{expression}.

        @param expression: A pattern, compiled with C{re.compile}.
        @param timeout: If true, the delay passed to C{scheduler.callLater}
            after which the returned Deferred fails with
            L{ExpectationTimeout}.
        @param scheduler: Object providing C{callLater}, used for the
            timeout.

        @return: A Deferred which fires with the match object.
        """
        result = defer.Deferred()
        if timeout:
            timer = scheduler.callLater(timeout, self._timeoutExpected, result)
        else:
            timer = None
        self._expecting.append((re.compile(expression), timer, result))
        self._checkExpected()
        return result
|
|
|
|
# Names exported by "from twisted.conch.insults.helper import *".
__all__ = [
    'CharacterAttribute',
    'TerminalBuffer',
    'ExpectableBuffer',
]
|