implement mouse hid

This commit is contained in:
Alkaid 2018-01-05 15:24:21 +08:00
parent 93bcbf6e64
commit cb2886283a
7 changed files with 461 additions and 158 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
.idea

85
BluetoothHID.py Normal file
View file

@ -0,0 +1,85 @@
import dbus
import dbus.service
import os
import socket
class BluetoothHIDProfile(dbus.service.Object):
def __init__(self, bus, path):
super(BluetoothHIDProfile, self).__init__(bus, path)
self.fd = -1
@dbus.service.method("org.bluez.Profile1", in_signature="", out_signature="")
def Release(self):
raise NotImplementedError("Release")
@dbus.service.method("org.bluez.Profile1", in_signature="", out_signature="")
def Cancel(self):
raise NotImplementedError("Cancel")
@dbus.service.method("org.bluez.Profile1", in_signature="oha{sv}", out_signature="")
def NewConnection(self, path, fd, properties):
self.fd = fd.take()
print("New Connection from (%s, %d)" % (path, self.fd))
for k, v in properties.items():
if k == "Version" or k == "Features":
print(" %s = 0x%04x " % (k, v))
else:
print(" %s = %s" % (k, v))
@dbus.service.method("org.bluez.Profile1",
in_signature="o", out_signature="")
def RequestDisconnection(self, path):
print("RequestDisconnection(%s)" % (path))
if (self.fd > 0):
os.close(self.fd)
self.fd = -1
def error_handler(e):
raise RuntimeError(str(e))
class BluetoothHIDService(object):
PROFILE_PATH = "/org/bluez/bthid_profile"
HOST = 0
PORT = 1
def __init__(self, service_record):
self.P_CTRL = 0x0011
self.P_INTR = 0x0013
self.SELFMAC = "7C:67:A2:94:6B:B8"
bus = dbus.SystemBus()
bluez_obj = bus.get_object("org.bluez", "/org/bluez")
manager = dbus.Interface(bluez_obj, "org.bluez.ProfileManager1")
BluetoothHIDProfile(bus, self.PROFILE_PATH)
opts = {
"ServiceRecord": service_record,
"Name": "BTKeyboardProfile",
"RequireAuthentication": False,
"RequireAuthorization": False,
"Service": "MY BTKBD",
"Role": "server"
}
sock_control = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
sock_control.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock_inter = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
sock_inter.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock_control.bind((self.SELFMAC, self.P_CTRL))
sock_inter.bind((self.SELFMAC, self.P_INTR))
manager.RegisterProfile(self.PROFILE_PATH, "00001124-0000-1000-8000-00805f9b34fb", opts)
print("Registered")
sock_control.listen(1)
sock_inter.listen(1)
print("waiting for connection")
self.ccontrol, cinfo = sock_control.accept()
print("Control channel connected to " + cinfo[self.HOST])
self.cinter, cinfo = sock_inter.accept()
print("Interrupt channel connected to " + cinfo[self.HOST])
def send(self, bytes_buf):
self.cinter.send(bytes_buf)

View file

@ -1,155 +0,0 @@
import subprocess
import keymap
import dbus
import dbus.service
import shlex
from gi.repository import GObject
from dbus.mainloop.glib import DBusGMainLoop
from bluetooth import *
DBusGMainLoop(set_as_default=True)
from Bluetooth import *
class Keyboard:
def __init__(self):
self.state = bytearray([
0xA1,
0x01, # Report ID
0x00, # Modifier keys
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00
])
print("Calling libinput")
try:
self.libinput = subprocess.Popen(shlex.split("stdbuf -oL libinput debug-events --show-keycode"), stdout=subprocess.PIPE, bufsize=0, universal_newlines=True)
except Exception as ex:
sys.exit("Can't listen to libinput")
self.parser = {"KEYBOARD_KEY": self.parser_key}
print("Keyboard initialized")
def parser_key(self, line_str_list):
"""
event0 KEYBOARD_KEY +2.82s KEY_F (33) release/pressed
0 1 2 3 4 5
:param line_str_list:
:return:
"""
key_sym = line_str_list[3]
action = line_str_list[5]
hid_keycode = keymap.keytable.get(key_sym, None)
if hid_keycode is None:
hid_modkeys = keymap.modkeys.get(key_sym, None)
if (hid_modkeys is not None):
if action == "pressed":
self.state[3] &= hid_modkeys
elif action == "released":
self.state[3] &= ~hid_modkeys
else:
raise NotImplemented("Unknown action " + action)
else:
raise NotImplementedError("Unknown key " + key_sym)
else:
for i in range(4, 10):
if action == "released" and self.state[i] == hid_keycode:
self.state[i] = 0x00
break
elif action == "pressed" and self.state[i] == 0x00:
self.state[i] = hid_keycode
break
def event_loop(self, send_call_back):
for line in self.libinput.stdout:
line_str_list = line.split()
event_type = line_str_list[1]
handler = self.parser.get(event_type, None)
print("READ: " + line)
if handler is not None:
handler(line_str_list)
send_call_back(bytes(self.state))
class BTKeyboardProfile(dbus.service.Object):
def __init__(self, bus, path, input_dev):
super(BTKeyboardProfile, self).__init__(bus, path)
self.input_dev = input_dev
self.fd = -1
@dbus.service.method("org.bluez.Profile1", in_signature="", out_signature="")
def Release(self):
raise NotImplementedError("Release")
@dbus.service.method("org.bluez.Profile1", in_signature="", out_signature="")
def Cancel(self):
raise NotImplementedError("Cancel")
@dbus.service.method("org.bluez.Profile1", in_signature="oha{sv}", out_signature="")
def NewConnection(self, path, fd, properties):
import pdb
pdb.set_trace()
self.fd = fd.take()
print("New Connection from (%s, %d)" % (path, self.fd))
for k, v in properties.items():
if k == "Version" or k == "Features":
print(" %s = 0x%04x " % (k, v))
else:
print(" %s = %s" % (k, v))
@dbus.service.method("org.bluez.Profile1",
in_signature="o", out_signature="")
def RequestDisconnection(self, path):
print("RequestDisconnection(%s)" % (path))
if (self.fd > 0):
os.close(self.fd)
self.fd = -1
def error_handler(e):
mainloop.quit()
raise RuntimeError(str(e))
if __name__ == "__main__":
P_CTRL = 0x0011
P_INTR = 0x0013
HOST = 0
PORT = 1
SELFMAC = "7C:67:A2:94:6B:B8"
DBusGMainLoop(set_as_default=True)
profile_path = "/org/bluez/btk_profile"
bus = dbus.SystemBus()
bluez_obj = bus.get_object("org.bluez", "/org/bluez")
manager = dbus.Interface(bluez_obj, "org.bluez.ProfileManager1")
kbd = Keyboard()
# kbd.event_loop(print)
BTKP = BTKeyboardProfile(bus, profile_path, kbd)
service_record = open("sdp_record_kbd.xml").read()
opts = {
"ServiceRecord": service_record,
"Name": "BTKeyboardProfile",
"RequireAuthentication" : False,
"RequireAuthorization" : False,
"Service" : "MY BTKBD",
"Role" : "server"
}
soccontrol = BluetoothSocket(L2CAP)
sockinter = BluetoothSocket(L2CAP)
soccontrol.bind((SELFMAC, P_CTRL))
sockinter.bind((SELFMAC, P_INTR))
manager.RegisterProfile(profile_path, "00001124-0000-1000-8000-00805f9b34fb", opts)
print("Registered")
soccontrol.listen(1)
sockinter.listen(1)
print("waiting for connection")
ccontrol, cinfo = soccontrol.accept()
print("Control channel connected to " + cinfo[HOST])
cinter, cinfo = sockinter.accept()
print("Interrupt channel connected to " + cinfo[HOST])
kbd.event_loop(cinter.send)
mainloop = GObject.MainLoop()
mainloop.run()

8
TODO Normal file
View file

@ -0,0 +1,8 @@
# TODO
1. modularize main X application window and the key/mouse event parsing.
2. modify sdp record so we won't need the privilege to bind low L2CAP port.
3. make the wheel (touchpad swipe gesture) work
4. write a readme
5. summarize keyboard shortcut, [reference](https://forum.xda-developers.com/showthread.php?t=1672281)
6. low resolution of pointer events in Xlib

130
evdev_xkb_map.py Normal file
View file

@ -0,0 +1,130 @@
evdev_xkb_map = {
94: 100, # Non-US \ NonUS Backslash
49: 53, # ` Grave
10: 30, # 1
11: 31, # 2
12: 32, # 3
13: 33, # 4
14: 34, # 5
15: 35, # 6
16: 36, # 7
17: 37, # 8
18: 38, # 9
19: 39, # 0
20: 45, # - Minus
21: 46, # = Equals
22: 42, # Delete
23: 43, # Tab
24: 20, # Q
25: 26, # W
26: 8, # E
27: 21, # R
28: 23, # T
29: 28, # Y
30: 24, # U
31: 12, # I
32: 18, # O
33: 19, # P
34: 47, # [ Left Bracket
35: 48, # ] Right Bracket
51: 49, # \ Backslash
36: 40, # Enter
66: 57, # Caps Lock
38: 4, # A
39: 22, # S
40: 7, # D
41: 9, # F
42: 10, # G
43: 11, # H
44: 13, # J
45: 14, # K
46: 15, # L
47: 51, # ; Semicolon
48: 52, # ' Quote
50: 225, # Left Shift
52: 29, # Z
53: 27, # X
54: 6, # C
55: 25, # V
56: 5, # B
57: 17, # N
58: 16, # M
59: 54, # , Comma
60: 55, # . Period
61: 56, # / Slash
62: 229, # Right Shift
64: 226, # Left Alt
37: 224, # Left Control
65: 44, # Space
105: 228, # Right Control
108: 230, # Right Alt
133: 227, # Left GUI
134: 231, # Right GUI
9: 41, # Escape
67: 58, # F1
68: 59, # F2
69: 60, # F3
70: 61, # F4
71: 62, # F5
72: 63, # F6
73: 64, # F7
74: 65, # F8
75: 66, # F9
76: 67, # F10
95: 68, # F11
96: 69, # F12
107: 70, # Print Screen
78: 71, # Scroll Lock
127: 72, # Pause
118: 73, # Insert
110: 74, # Home
112: 75, # Page Up
119: 76, # Delete Forward
115: 77, # End
117: 78, # Page Down
111: 82, # Up
113: 80, # Left
116: 81, # Down
114: 79, # Right
77: 83, # KP NumLock
106: 84, # KP / KP Divide
63: 85, # KP * KP Multiply
82: 86, # KP - KP Subtract
79: 95, # KP 7
80: 96, # KP 8
81: 97, # KP 9
86: 87, # KP + KP Add
83: 92, # KP 4
84: 93, # KP 5
85: 94, # KP 6
87: 89, # KP 1
88: 90, # KP 2
89: 91, # KP 3
104: 88, # KP Enter
90: 98, # KP 0
91: 99, # KP . KP Point
125: 103, # KP = KP Equals
191: 104, # F13
192: 105, # F14
193: 106, # F15
194: 107, # F16
195: 108, # F17
196: 109, # F18
197: 110, # F19
198: 111, # F20
199: 112, # F21
200: 113, # F22
201: 114, # F23
202: 115, # F24
}
modkeys = {
231: 1 << 7, # KEY_RIGHTMETA
230: 1 << 6, # KEY_RIGHTALT
229: 1 << 5, # KEY_RIGHTSHIFT
228: 1 << 4, # KEY_RIGHTCTRL
227: 1 << 3, # KEY_LEFTMETA
226: 1 << 2, # KEY_LEFTALT
225: 1 << 1, # KEY_LEFTSHIFT
224: 1 << 0 # KEY_LEFTCTRL
}

234
main.py Executable file
View file

@ -0,0 +1,234 @@
#!/usr/bin/python3
import sys
import os
import time
from BluetoothHID import BluetoothHIDService
from evdev_xkb_map import evdev_xkb_map, modkeys
import keymap
from Xlib import X, display, Xutil
from dbus.mainloop.glib import DBusGMainLoop
usbhid_map = {}
with open("keycode.txt") as f:
for line in f.read().splitlines():
if not line.startswith(";") and len(line) > 1:
l = line.split(maxsplit=1)
usbhid_keycode = int(l[0])
usbhid_keyname = l[1]
usbhid_map[usbhid_keycode] = usbhid_keyname
# Application window (only one)
class Window(object):
def __init__(self, display):
self.d = display
self.objects = []
# Find which screen to open the window on
self.screen = self.d.screen()
self.window = self.screen.root.create_window(
50, 50, 640, 480, 2,
self.screen.root_depth,
X.InputOutput,
X.CopyFromParent,
# special attribute values
background_pixel=self.screen.white_pixel,
event_mask=(X.ExposureMask |
X.StructureNotifyMask |
X.ButtonPressMask |
X.ButtonReleaseMask |
X.Button1MotionMask) |
X.KeyPressMask |
X.KeyReleaseMask,
colormap=X.CopyFromParent,
)
self.gc = self.window.create_gc(
foreground=self.screen.black_pixel,
background=self.screen.white_pixel,
)
# Set some WM info
self.WM_DELETE_WINDOW = self.d.intern_atom('WM_DELETE_WINDOW')
self.WM_PROTOCOLS = self.d.intern_atom('WM_PROTOCOLS')
self.window.set_wm_name('EmuBTHID')
self.window.set_wm_icon_name('EmuBTHID')
self.window.set_wm_protocols([self.WM_DELETE_WINDOW])
self.window.set_wm_hints(flags=Xutil.StateHint,
initial_state=Xutil.NormalState)
self.window.set_wm_normal_hints(flags=(Xutil.PPosition | Xutil.PSize
| Xutil.PMinSize),
min_width=20,
min_height=20)
# Map the window, making it visible
self.window.map()
def grab(self):
print("Grab!")
ret = self.window.grab_pointer(False, X.ButtonReleaseMask | X.ButtonPressMask | X.PointerMotionMask,
X.GrabModeAsync, X.GrabModeAsync, self.window, X.NONE, X.CurrentTime)
ret = self.window.grab_keyboard(False, X.GrabModeAsync, X.GrabModeAsync, X.CurrentTime)
def ungrab(self):
print("UnGrab!")
self.d.ungrab_pointer(X.CurrentTime)
self.d.ungrab_keyboard(X.CurrentTime)
# Main loop, handling events
def loop(self, send_call_back):
kbd_state = bytearray([
0xA1,
0x01, # Report ID
0x00, # Modifier keys
0x00, # preserve
0x00, # 6 key
0x00,
0x00,
0x00,
0x00,
0x00
])
mouse_state = bytearray([
0xA1,
0x02, # Report ID
0x00, # mouse button, in this byte XXXXX(button2)(button1)(button0)
0x00, # X displacement
0x00, # Y displacement
])
expose_count = 0
grab_trigger_hint = ('KEY_LEFTCTRL', 'KEY_LEFTALT', 'KEY_LEFTSHIFT', 'KEY_B')
grab_trigger = set(keymap.keytable[k] for k in grab_trigger_hint)
grab_cnt = len(grab_trigger)
grabbed = False
geometry = self.window.get_geometry()
prev_x = None
prev_y = None
hint_x = geometry.width // 5
hint_y = geometry.height // 5
hint_str = 'Press Ctrl+Alt+Shift+B to Grab'.encode()
while 1:
e = self.d.next_event()
# Window has been destroyed, quit
if e.type == X.DestroyNotify:
print("Destroy")
sys.exit(0)
if e.type == X.KeyPress:
usbhid_keycode = evdev_xkb_map[e.detail]
# print("key pressed: {}".format(usbhid_map[usbhid_keycode]))
if usbhid_keycode in modkeys:
kbd_state[2] |= modkeys[usbhid_keycode]
# import ipdb
# ipdb.set_trace()
else:
for i in range(4, 10):
if kbd_state[i] == 0x00:
kbd_state[i] = usbhid_keycode
break
send_call_back(bytes(kbd_state))
if usbhid_keycode in grab_trigger:
grab_cnt -= 1
print(grab_cnt)
if (grab_cnt == 0):
if grabbed:
self.ungrab()
grabbed = False
hint_str = 'Press Ctrl+Alt+Shift+B to Grab'.encode()
self.window.image_text(self.gc, hint_x, hint_y, hint_str)
else:
self.grab()
grabbed = True
hint_str = 'Press Ctrl+Alt+Shift+B to UnGrab'.encode()
self.window.image_text(self.gc, hint_x, hint_y, hint_str)
if e.type == X.KeyRelease:
usbhid_keycode = evdev_xkb_map[e.detail]
# print("key released: {}".format(usbhid_map[evdev_xkb_map[e.detail]]))
if usbhid_keycode in modkeys:
kbd_state[2] &= ~modkeys[usbhid_keycode]
else:
for i in range(4, 10):
if kbd_state[i] == usbhid_keycode:
kbd_state[i] = 0x00
break
if usbhid_keycode in grab_trigger:
grab_cnt += 1
print(grab_cnt)
send_call_back(bytes(kbd_state))
# Some part of the window has been exposed,
# redraw all the objects.
if e.type == X.Expose:
expose_count += 1
print("Exposed : {}".format(expose_count))
geometry = self.window.get_geometry()
hint_x = geometry.width // 5
hint_y = geometry.height // 5
self.window.image_text(self.gc, hint_x, hint_y, hint_str)
# Left button pressed, start to draw
if e.type == X.ButtonPress:
# print("Button press: {}".format(e.detail))
if (e.detail <= 3):
mouse_state[2] |= 1 << (e.detail - 1)
send_call_back(bytes(mouse_state))
if e.type == X.ButtonRelease:
# print("Button release: {}".format(e.detail))
mouse_state[2] &= ~(1 << (e.detail - 1))
send_call_back(bytes(mouse_state))
if e.type == X.ClientMessage:
if e.client_type == self.WM_PROTOCOLS:
fmt, data = e.data
if fmt == 32 and data[0] == self.WM_DELETE_WINDOW:
sys.exit(0)
if e.type == X.MotionNotify:
#print("Motion: ({x},{y})".format(x=e.event_x, y=e.event_y))
if prev_x is not None and prev_y is not None:
pos_x = max(-128, min(int((e.event_x - prev_x) * 2), 127))
mouse_state[3] = pos_x if pos_x >= 0 else (256 + pos_x)
pos_y = max(-128, min(int((e.event_y - prev_y) * 2), 127))
mouse_state[4] = pos_y if pos_y >= 0 else (256 + pos_y)
#print(" ({},{})".format(mouse_state[3], mouse_state[4]))
send_call_back(bytes(mouse_state))
if e.event_x == geometry.width - 1:
self.window.warp_pointer(1, e.event_y)
prev_x = 1
elif e.event_x == 0:
self.window.warp_pointer(geometry.width - 2, e.event_y)
prev_x = geometry.width - 2
else:
prev_x = e.event_x
if e.event_y == geometry.height - 1:
self.window.warp_pointer(e.event_x, 1)
prev_y = 1
elif e.event_y == 0:
self.window.warp_pointer(e.event_x, geometry.height - 2)
prev_y = geometry.height - 2
else:
prev_y = e.event_y
if __name__ == '__main__':
DBusGMainLoop(set_as_default=True)
service_record = open("sdp_record_kbd.xml").read()
d = display.Display()
d.change_keyboard_control(auto_repeat_mode=X.AutoRepeatModeOff)
try:
bthid_srv = BluetoothHIDService(service_record)
Window(d).loop(bthid_srv.send)
#Window(d).loop(print)
finally:
d.change_keyboard_control(auto_repeat_mode=X.AutoRepeatModeOn)
d.get_keyboard_control()
d.ungrab_keyboard(X.CurrentTime)
d.ungrab_pointer(X.CurrentTime)
print("Exit")

View file

@ -10,7 +10,7 @@
<sequence>
<sequence>
<uuid value="0x0100" />
<uint16 value="0x0011" />
<uint16 value="0x1011" />
</sequence>
<sequence>
<uuid value="0x0011" />
@ -42,7 +42,7 @@
<sequence>
<sequence>
<uuid value="0x0100" />
<uint16 value="0x0013" />
<uint16 value="0x1013" />
</sequence>
<sequence>
<uuid value="0x0011" />
@ -81,7 +81,7 @@
<sequence>
<sequence>
<uint8 value="0x22" />
<text encoding="hex" value="05010906a101850175019508050719e029e715002501810295017508810395057501050819012905910295017503910395067508150026ff000507190029ff8100c0050c0901a1018503150025017501950b0a23020a21020ab10109b809b609cd09b509e209ea09e9093081029501750d8103c0" />
<text encoding="hex" value="05010906a101850175019508050719e029e715002501810295017508810395057501050819012905910295017503910395067508150026ff000507190029ff8100c0050c0901a1018503150025017501950b0a23020a21020ab10109b809b609cd09b509e209ea09e9093081029501750d8103c005010902a1010901a100850295037501050919012903150025018102950175058103750895020501093009311581257f46670c3699f36513550c8106c0c0" />
</sequence>
</sequence>
</attribute>