mirror of
https://github.com/Alkaid-Benetnash/EmuBTHID.git
synced 2026-02-14 03:24:14 +01:00
116 lines
4.5 KiB
Python
116 lines
4.5 KiB
Python
import bluetooth # requires pybluez package
|
|
import dbus
|
|
import dbus.service
|
|
import os
|
|
import socket
|
|
|
|
|
|
# L2CAP PSM values (the Bluetooth "ports") used for the two HID channels.
# see section "2.5 PSMs and SPSMs": https://www.bluetooth.com/wp-content/uploads/Files/Specification/HTML/Assigned_Numbers/out/en/Assigned_Numbers.pdf
BLUETOOTH_PORT_HID_CONTROL = 0x0011
BLUETOOTH_PORT_HID_INTERRUPT = 0x0013

# UUID under which the HID profile is registered.
# see section "3.3 SDP Service Class and Profile Identifiers" of https://www.bluetooth.com/wp-content/uploads/Files/Specification/HTML/Assigned_Numbers/out/en/Assigned_Numbers.pdf
# and Vol 3, Part B, section 2.5.1 of https://www.bluetooth.com/specifications/specs/core-specification-5-3/
BLUETOOTH_PROFILE_HID = "00001124-0000-1000-8000-00805f9b34fb"

# Backward-compatible alias: the constant was first introduced under this
# misspelled name, and existing code still refers to it.
BLUETOOT_PROFILE_HID = BLUETOOTH_PROFILE_HID
|
|
|
|
|
|
class BluetoothHIDProfile(dbus.service.Object):
    """D-Bus object exposing the ``org.bluez.Profile1`` methods.

    The object remembers at most one file descriptor received through
    ``NewConnection``. ``self.fd`` is ``-1`` whenever no descriptor is held.
    """

    def __init__(self, bus, path):
        """Export the object on *bus* at object path *path*."""
        super().__init__(bus, path)
        self.fd = -1

    def _close_fd(self):
        """Close the held descriptor, if any, and reset ``self.fd`` to ``-1``."""
        # 0 is a valid descriptor number, so compare against the -1 sentinel
        # instead of testing for "> 0".
        if self.fd >= 0:
            try:
                os.close(self.fd)
            finally:
                # Never keep a stale number around, even if close() failed.
                self.fd = -1

    @dbus.service.method("org.bluez.Profile1", in_signature="", out_signature="")
    def Release(self):
        """Not implemented: always raises ``NotImplementedError``."""
        raise NotImplementedError("Release")

    @dbus.service.method("org.bluez.Profile1", in_signature="", out_signature="")
    def Cancel(self):
        """Not implemented: always raises ``NotImplementedError``."""
        raise NotImplementedError("Cancel")

    @dbus.service.method("org.bluez.Profile1", in_signature="oha{sv}", out_signature="")
    def NewConnection(self, path, fd, properties):
        """Take ownership of the descriptor for a new connection and log it.

        Args:
            path: object path passed by the caller (only printed).
            fd: D-Bus file-descriptor wrapper; ``take()`` is called on it and
                the resulting number is stored in ``self.fd``.
            properties: mapping of connection properties (only printed).
        """
        # Release a descriptor left over from an earlier connection so that
        # overwriting self.fd below does not leak it.
        self._close_fd()
        self.fd = fd.take()
        print("New Connection from (%s, %d)" % (path, self.fd))
        for k, v in properties.items():
            # These two properties are printed as 16-bit hex values.
            if k in ("Version", "Features"):
                print(" %s = 0x%04x " % (k, v))
            else:
                print(" %s = %s" % (k, v))

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="o", out_signature="")
    def RequestDisconnection(self, path):
        """Log the request and close the held descriptor, if there is one."""
        print("RequestDisconnection(%s)" % (path))
        self._close_fd()
|
|
|
|
|
|
def error_handler(e):
    """Raise a ``RuntimeError`` whose message is ``str(e)``."""
    message = str(e)
    raise RuntimeError(message)
|
|
|
|
|
|
class BluetoothHIDService(object):
    """Register the HID profile with BlueZ and accept the two HID channels.

    After construction ``self.ccontrol`` holds the accepted control-channel
    socket and ``self.cinter`` the accepted interrupt-channel socket.
    """

    PROFILE_PATH = "/org/bluez/emubthid_profile"

    # Indices into the address tuple returned by ``accept()``.
    HOST = 0
    PORT = 1

    def __init__(self, service_record, MAC):
        """Set up the profile and block until both channels are connected.

        Args:
            service_record: value passed to BlueZ as the ``ServiceRecord``
                option of ``RegisterProfile``.
            MAC: address of the local controller the listening sockets are
                bound to.
        """
        self.SELFMAC = MAC
        self.ccontrol = None
        self.cinter = None

        bus = dbus.SystemBus()
        bluez_obj = bus.get_object("org.bluez", "/org/bluez")
        manager = dbus.Interface(bluez_obj, "org.bluez.ProfileManager1")

        # The constructor exports the profile object on the bus.
        BluetoothHIDProfile(bus, self.PROFILE_PATH)

        # Every listening socket created below is tracked here so that it is
        # closed deterministically, also when setup fails part-way through.
        listeners = []
        try:
            sock_control = L2CAPSocket(True)
            listeners.append(sock_control)
            sock_inter = L2CAPSocket(True)
            listeners.append(sock_inter)

            # The PSM parameter sets the bluetooth "port" for L2CAP connections
            # "Valid values in the first range are assigned by the Bluetooth SIG and indicate protocols."
            # https://www.bluetooth.com/wp-content/uploads/Files/Specification/HTML/Core-54/out/en/host/logical-link-control-and-adaptation-protocol-specification.html
            sock_control.bind((self.SELFMAC, BLUETOOTH_PORT_HID_CONTROL))
            sock_inter.bind((self.SELFMAC, BLUETOOTH_PORT_HID_INTERRUPT))

            # IDEs don't see this function because bluetooth.BluetoothSocket imports them dynamically
            sock_control.listen(1)
            sock_inter.listen(1)

            manager.RegisterProfile(self.PROFILE_PATH, BLUETOOT_PROFILE_HID, {
                "Name": "Emulated Bluetooth keyboard",
                # "Service": "MY BTKBD", # this refers to the bluetooth spec service class uuid
                "Role": "server",
                "RequireAuthentication": True,
                "RequireAuthorization": True,
                "ServiceRecord": service_record,
                # note: it seems as if socket creation could be replaced with working through DBus (see PSM option and NewConnection function).
                # maybe currently the connection is somehow handled at 2 places at the same time?
                #
                # https://www.bluez.org/bluez-5-api-introduction-and-porting-guide/
                # "The new Profile1 interface (and removal of org.bluez.Service)"
            })

            print("Registered Bluetooth HID profile in Bluez")

            print(f"Waiting for connection at controller {MAC}, please double check with the MAC in bluetoothctl")

            # Both accept() calls block until a peer connects.
            self.ccontrol, cinfo = sock_control.accept()
            print("Control channel connected to " + cinfo[self.HOST])
            self.cinter, cinfo = sock_inter.accept()
            print("Interrupt channel connected to " + cinfo[self.HOST])
        except BaseException:
            # Construction failed (or the wait was interrupted): do not leave
            # an already accepted channel open on a half-built object.
            for channel in (self.ccontrol, self.cinter):
                if channel is not None:
                    channel.close()
            self.ccontrol = None
            self.cinter = None
            raise
        finally:
            # The listening sockets are not used after this point.
            for listener in listeners:
                listener.close()

    def send(self, bytes_buf):
        """Send *bytes_buf* over the interrupt channel."""
        self.cinter.send(bytes_buf)
|
|
|
|
|
|
# https://github.com/karulis/pybluez/blob/master/examples/simple/l2capserver.py
|
|
class L2CAPSocket(bluetooth.BluetoothSocket):
    """``bluetooth.BluetoothSocket`` created for the L2CAP protocol."""

    def __init__(self, reuseaddr: bool = False):
        """Create the L2CAP socket.

        Args:
            reuseaddr: when true, ``SO_REUSEADDR`` is set on the socket.
                Defaults to ``False``.
        """
        super().__init__(bluetooth.L2CAP)

        if reuseaddr:
            self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)