audioworkstation.libs.audio.btaudiosink のソースコード
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Assists in connecting to Bluetooth device."""
import bluetooth as BT
import logging as LBT
from subprocess import Popen, run, PIPE
# Logger
logger = LBT.getLogger(__name__)
logger.setLevel(LBT.DEBUG)
_logger_formatter = LBT.Formatter("%(asctime)s %(levelname)s %(message)s")
# Logger StreamHandler
_logger_sh = LBT.StreamHandler()
_logger_sh.setFormatter(_logger_formatter)
logger.addHandler(_logger_sh)
def _audiosink(address: str) -> tuple:
"""Get information on "audio sink" service.
:param str address: device address
:return: (address, port)
"""
ass = BT.find_service(uuid=BT.AUDIO_SINK_CLASS, address=address)
if not ass:
return ("", 0)
return (ass[0]["host"], ass[0]["port"])
def _paired_devices() -> dict[str, str]:
"""Returns a list of paired devices.
:return: name, address
"""
devices: dict[str, str] = dict()
command = ["bluetoothctl", "--", "paired-devices"]
result = run(args=command, capture_output=True, text=True)
if result.returncode:
return devices
for line in result.stdout.splitlines():
# line: Devices <addres> <name>
parts = line.split(sep=" ", maxsplit=2)
devices[parts[2]] = parts[1]
return devices
def _isnearby_audiosink(address: str) -> bool:
"""Find out if the paired device with Audio Sink is nearby.
:param str address: paired device address
:return: True is nearby, False is otherwise
"""
if not BT.lookup_name(address=address):
return False
pre_pipe = ["bluetoothctl", "--", "info", address]
post_pipe = ["grep", "Audio Sink"]
result = Popen(args=pre_pipe, stdout=PIPE, text=True)
result = Popen(args=post_pipe, stdin=result.stdout, stdout=PIPE, text=True)
return True if result.communicate()[0] else False
def _isconnected(address: str, audio_sink: bool = True) -> bool:
"""Check the connection of paired devices nearby.
:param str address: address of paired devices nearby
:return: True is connected, False is otherwise
"""
pre_pipe = ["bluetoothctl", "--", "info", address]
post_pipe = ["grep", "Connected"]
result = Popen(args=pre_pipe, stdout=PIPE, text=True)
result = Popen(args=post_pipe, stdin=result.stdout, stdout=PIPE, text=True)
return True if result.communicate()[0].split()[1] == "yes" else False
def _connect(address: str) -> bool:
"""Attempts to connect to a Bluetooth device.
:param str address: device address
:return: True is success, False is otherwise
"""
command = ["bluetoothctl", "--", "connect", address]
result = run(args=command, capture_output=True, text=True)
return False if result.returncode else True
[ドキュメント]def device_info() -> dict[str, str]:
"""Get information on connected Bluetooth devices.
:return: {name: address}
:examples: {"name": "00:00:00:00:00:00"}
:examples: {"": ""} if failed.
"""
devices = _paired_devices()
for name, address in devices.items():
if _isnearby_audiosink(address):
if _isconnected(address):
return {name: address}
elif _connect(address):
return {name: address}
logger.info("Bluetooth Audio Sink Device: Not found.")
return {"": ""}
if __name__ == "__main__":
print(__file__)