Blob: test-health
Blob id: f26def906fc09a5b37703a5bf4951f73fa9d3f8f
Size: 5.2 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 | #!/usr/bin/env python3 # SPDX-License-Identifier: LGPL-2.1-or-later from __future__ import absolute_import, print_function, unicode_literals # -*- coding: utf-8 -*- import sys import dbus import dbus.service from dbus.mainloop.glib import DBusGMainLoop try: from gi.repository import GObject except ImportError: import gobject as GObject BUS_NAME = 'org.bluez' PATH = '/org/bluez' ADAPTER_INTERFACE = 'org.bluez.Adapter1' HEALTH_MANAGER_INTERFACE = 'org.bluez.HealthManager1' HEALTH_DEVICE_INTERFACE = 'org.bluez.HealthDevice1' DBusGMainLoop(set_as_default=True) loop = GObject.MainLoop() bus = dbus.SystemBus() def sig_received(*args, **kwargs): if "member" not in kwargs: return if "path" not in kwargs: return; sig_name = kwargs["member"] path = kwargs["path"] print(sig_name) print(path) if sig_name == "PropertyChanged": k, v = args print(k) print(v) else: ob = args[0] print(ob) def enter_mainloop(): bus.add_signal_receiver(sig_received, bus_name=BUS_NAME, dbus_interface=HEALTH_DEVICE_INTERFACE, path_keyword="path", member_keyword="member", interface_keyword="interface") try: print("Entering main lopp, push Ctrl+C for finish") mainloop = GObject.MainLoop() mainloop.run() except KeyboardInterrupt: pass finally: print("Exiting, bye") hdp_manager = dbus.Interface(bus.get_object(BUS_NAME, PATH), HEALTH_MANAGER_INTERFACE) role = None while role == None: print("Select 1. source or 2. sink: ",) try: sel = int(sys.stdin.readline()) if sel == 1: role = "source" elif sel == 2: role = "sink" else: raise ValueError except (TypeError, ValueError): print("Wrong selection, try again: ",) except KeyboardInterrupt: sys.exit() dtype = None while dtype == None: print("Select a data type: ",) try: sel = int(sys.stdin.readline()) if (sel < 0) or (sel > 65535): raise ValueError dtype = sel; except (TypeError, ValueError): print("Wrong selection, try again: ",) except KeyboardInterrupt: sys.exit() pref = None if role == "source": while pref == None: try: print("Select a preferred data channel type 1.",) print("reliable 2. streaming: ",) sel = int(sys.stdin.readline()) if sel == 1: pref = "reliable" elif sel == 2: pref = "streaming" else: raise ValueError except (TypeError, ValueError): print("Wrong selection, try again") except KeyboardInterrupt: sys.exit() app_path = hdp_manager.CreateApplication({ "DataType": dbus.types.UInt16(dtype), "Role": role, "Description": "Test Source", "ChannelType": pref}) else: app_path = hdp_manager.CreateApplication({ "DataType": dbus.types.UInt16(dtype), "Description": "Test sink", "Role": role}) print("New application created:", app_path) con = None while con == None: try: print("Connect to a remote device (y/n)? ",) sel = sys.stdin.readline() if sel in ("y\n", "yes\n", "Y\n", "YES\n"): con = True elif sel in ("n\n", "no\n", "N\n", "NO\n"): con = False else: print("Wrong selection, try again.") except KeyboardInterrupt: sys.exit() if not con: enter_mainloop() sys.exit() manager = dbus.Interface(bus.get_object(BUS_NAME, "/"), "org.freedesktop.DBus.ObjectManager") objects = manager.GetManagedObjects() adapters = [] for path, ifaces in objects.items(): if ifaces.has_key(ADAPTER_INTERFACE): adapters.append(path) i = 1 for ad in adapters: print("%d. %s" % (i, ad)) i = i + 1 print("Select an adapter: ",) select = None while select == None: try: pos = int(sys.stdin.readline()) - 1 if pos < 0: raise TypeError select = adapters[pos] except (TypeError, IndexError, ValueError): print("Wrong selection, try again: ",) except KeyboardInterrupt: sys.exit() adapter = dbus.Interface(bus.get_object(BUS_NAME, select), ADAPTER_INTERFACE) devices = [] for path, interfaces in objects.items(): if "org.bluez.Device1" not in interfaces: continue properties = interfaces["org.bluez.Device1"] if properties["Adapter"] != select: continue; if HEALTH_DEVICE_INTERFACE not in interfaces: continue devices.append(path) if len(devices) == 0: print("No devices available") sys.exit() i = 1 for dev in devices: print("%d. %s" % (i, dev)) i = i + 1 print("Select a device: ",) select = None while select == None: try: pos = int(sys.stdin.readline()) - 1 if pos < 0: raise TypeError select = devices[pos] except (TypeError, IndexError, ValueError): print("Wrong selection, try again: ",) except KeyboardInterrupt: sys.exit() device = dbus.Interface(bus.get_object(BUS_NAME, select), HEALTH_DEVICE_INTERFACE) echo = None while echo == None: try: print("Perform an echo (y/n)? ",) sel = sys.stdin.readline() if sel in ("y\n", "yes\n", "Y\n", "YES\n"): echo = True elif sel in ("n\n", "no\n", "N\n", "NO\n"): echo = False else: print("Wrong selection, try again.") except KeyboardInterrupt: sys.exit() if echo: if device.Echo(): print("Echo was ok") else: print("Echo war wrong, exiting") sys.exit() print("Connecting to device %s" % (select)) if role == "source": chan = device.CreateChannel(app_path, "reliable") else: chan = device.CreateChannel(app_path, "any") print(chan) enter_mainloop() hdp_manager.DestroyApplication(app_path) |