Blob: example-battery-provider
Blob id: 1522a5e075ca2f0441cf752c553a5d4b10d8f7f4
Size: 6.8 KB
#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1-or-later

import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
try:
    from gi.repository import GObject
except ImportError:
    import gobject as GObject
import sys

# Module-level state shared between main() and the GLib/D-Bus callbacks.
mainloop = None
app = None
bus = None

BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'

BATTERY_PROVIDER_MANAGER_IFACE = 'org.bluez.BatteryProviderManager1'
BATTERY_PROVIDER_IFACE = 'org.bluez.BatteryProvider1'
BATTERY_PROVIDER_PATH = '/path/to/provider'

# Device address fragments; appended to '/dev_' to build object paths.
BATTERY_PATH1 = '11_11_11_11_11_11'
BATTERY_PATH2 = '22_22_22_22_22_22'
BATTERY_PATH3 = '33_33_33_33_33_33'


class InvalidArgsException(dbus.exceptions.DBusException):
    """D-Bus error raised when a method receives an unsupported argument."""
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'


class Application(dbus.service.Object):
    """
    Root object of the provider, exported at BATTERY_PROVIDER_PATH.

    Implements org.freedesktop.DBus.ObjectManager over the batteries that
    have been added with add_battery().
    """
    def __init__(self, bus):
        self.path = BATTERY_PROVIDER_PATH
        self.services = []
        self.batteries = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_path(self):
        """Return the object path of this application."""
        return dbus.ObjectPath(self.path)

    def add_battery(self, battery):
        """
        Track `battery`, emit InterfacesAdded for it and start a 1 second
        periodic timer that drains it.
        """
        self.batteries.append(battery)
        self.InterfacesAdded(battery.get_path(), battery.get_properties())
        GObject.timeout_add(1000, self._drain_tick, battery)

    def remove_battery(self, battery):
        """
        Stop tracking `battery` and emit InterfacesRemoved for it.

        Raises ValueError (from list.remove) if the battery was never added.
        """
        self.batteries.remove(battery)
        self.InterfacesRemoved(battery.get_path(), [BATTERY_PROVIDER_IFACE])

    def _drain_tick(self, battery):
        """
        Timer callback: drain `battery` one step.

        Returns False (which stops the GLib timer) once the battery is no
        longer tracked, so no PropertiesChanged is emitted for a battery
        that has already been announced as removed.
        """
        if battery not in self.batteries:
            return False
        return drain_battery(battery)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return {object path: {interface: properties}} for every battery."""
        print('GetManagedObjects called')
        return {battery.get_path(): battery.get_properties()
                for battery in self.batteries}

    @dbus.service.signal(DBUS_OM_IFACE, signature='oa{sa{sv}}')
    def InterfacesAdded(self, object_path, interfaces_and_properties):
        return

    @dbus.service.signal(DBUS_OM_IFACE, signature='oas')
    def InterfacesRemoved(self, object_path, interfaces):
        return


class Battery(dbus.service.Object):
    """
    org.bluez.BatteryProvider1 interface implementation
    """
    def __init__(self, bus, dev, percentage, source=None):
        """
        Export a battery object for the device address fragment `dev`.

        `percentage` and `source` may be None, in which case the matching
        property is omitted from the exposed properties.
        """
        self.path = BATTERY_PROVIDER_PATH + '/dev_' + dev
        self.dev_path = '/org/bluez/hci0/dev_' + dev
        self.bus = bus
        self.percentage = percentage
        self.source = source
        dbus.service.Object.__init__(self, bus, self.path)

    def get_battery_properties(self):
        """Return the BatteryProvider1 property dict of this battery."""
        properties = {}
        if self.percentage is not None:
            properties['Percentage'] = dbus.Byte(self.percentage)
        if self.source is not None:
            properties['Source'] = self.source
        properties['Device'] = dbus.ObjectPath(self.dev_path)
        return properties

    def get_properties(self):
        """Return the properties keyed by interface name."""
        return {BATTERY_PROVIDER_IFACE: self.get_battery_properties()}

    def get_path(self):
        """Return the object path of this battery."""
        return dbus.ObjectPath(self.path)

    def set_percentage(self, percentage):
        """
        Update the percentage and emit PropertiesChanged.

        Values outside 0..100 are rejected with a printed message and leave
        the battery untouched.
        """
        if percentage < 0 or percentage > 100:
            print('percentage not valid')
            return

        self.percentage = percentage
        print('battery %s percentage %d' % (self.path, self.percentage))
        self.PropertiesChanged(
                BATTERY_PROVIDER_IFACE, self.get_battery_properties())

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """
        Return all properties of `interface`.

        Raises InvalidArgsException for any interface other than
        BATTERY_PROVIDER_IFACE.
        """
        if interface != BATTERY_PROVIDER_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[BATTERY_PROVIDER_IFACE]

    @dbus.service.signal(DBUS_PROP_IFACE, signature='sa{sv}')
    def PropertiesChanged(self, interface, properties):
        return


def add_late_battery():
    """Timer callback: add a third battery (one-shot, returns None)."""
    app.add_battery(Battery(bus, BATTERY_PATH3, 70, 'Protocol 2'))


def drain_battery(battery):
    """
    Lower the battery percentage by 5, clamped at 0.

    A battery without a percentage is set to 100. Returns True while the
    battery still has charge and False once it reaches 0, so that a GLib
    timer using this as its callback stops at that point.
    """
    new_percentage = 100
    if battery.percentage is not None:
        new_percentage = max(battery.percentage - 5, 0)

    battery.set_percentage(new_percentage)

    return new_percentage > 0


def register_provider_cb():
    """Reply handler of RegisterBatteryProvider."""
    print('Battery Provider registered')

    # Battery added early right after RegisterBatteryProvider succeeds
    app.add_battery(Battery(bus, BATTERY_PATH2, None))
    # Battery added later
    GObject.timeout_add(5000, add_late_battery)


def register_provider_error_cb(error):
    """Error handler of RegisterBatteryProvider: report and quit the loop."""
    print('Failed to register Battery Provider: ' + str(error))
    mainloop.quit()


def find_manager(bus):
    """
    Return the first object path exposing BatteryProviderManager1 among the
    objects managed by BlueZ, or None if there is none.
    """
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()

    for o, props in objects.items():
        if BATTERY_PROVIDER_MANAGER_IFACE in props:
            return o

    return None


def unregister_provider_cb():
    """Reply handler of UnregisterBatteryProvider."""
    print('Battery Provider unregistered')


def unregister_provider_error_cb(error):
    """Error handler of UnregisterBatteryProvider."""
    print('Failed to unregister Battery Provider: ' + str(error))


def unregister_battery_provider(battery_provider_manager):
    """Timer callback: asynchronously unregister the provider (one-shot)."""
    battery_provider_manager.UnregisterBatteryProvider(
            BATTERY_PROVIDER_PATH,
            reply_handler=unregister_provider_cb,
            error_handler=unregister_provider_error_cb)


def remove_battery(app, battery):
    """Timer callback: remove `battery` from `app` (one-shot)."""
    app.remove_battery(battery)


def main():
    """
    Simulates an application registering to BlueZ as a Battery Provider
    providing fake batteries drained periodically.
    """
    global mainloop, bus, app

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    manager_path = find_manager(bus)
    if not manager_path:
        print('BatteryProviderManager1 interface not found')
        return

    print('BatteryProviderManager1 path = ', manager_path)

    battery_provider_manager = dbus.Interface(
            bus.get_object(BLUEZ_SERVICE_NAME, manager_path),
            BATTERY_PROVIDER_MANAGER_IFACE)

    app = Application(bus)

    # Battery pre-added before RegisterBatteryProvider
    battery1 = Battery(bus, BATTERY_PATH1, 87, 'Protocol 1')
    app.add_battery(battery1)

    mainloop = GObject.MainLoop()

    print('Registering Battery Provider...')

    battery_provider_manager.RegisterBatteryProvider(
            BATTERY_PROVIDER_PATH,
            reply_handler=register_provider_cb,
            error_handler=register_provider_error_cb)

    # Unregister the Battery Provider after an arbitrary amount of time
    GObject.timeout_add(
            12000, unregister_battery_provider, battery_provider_manager)
    # Simulate battery removal by a provider
    GObject.timeout_add(8000, remove_battery, app, battery1)

    mainloop.run()


if __name__ == '__main__':
    main()