diff --git a/test/map-client b/test/map-client
index e29e6e4..62a075f 100755
--- a/test/map-client
+++ b/test/map-client
from pprint import pformat
+BUS_NAME='org.bluez.obex'
+PATH = '/org/bluez/obex'
+CLIENT_INTERFACE = 'org.bluez.obex.Client1'
+SESSION_INTERFACE = 'org.bluez.obex.Session1'
+MESSAGE_ACCESS_INTERFACE = 'org.bluez.obex.MessageAccess1'
+MESSAGE_INTERFACE = 'org.bluez.obex.Message1'
+TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1'
+
def unwrap(x):
"""Hack to unwrap D-Bus values, so that they're easier to read when
printed. Taken from d-feet """
self.verbose = verbose
self.path = session_path
bus = dbus.SessionBus()
- obj = bus.get_object("org.bluez.obex", session_path)
- self.session = dbus.Interface(obj, "org.bluez.obex.Session")
- self.map = dbus.Interface(obj, "org.bluez.obex.MessageAccess")
- bus.add_signal_receiver(self.transfer_complete,
- dbus_interface="org.bluez.obex.Transfer",
- signal_name="Complete",
- path_keyword="path")
- bus.add_signal_receiver(self.transfer_error,
- dbus_interface="org.bluez.obex.Transfer",
- signal_name="Error",
- path_keyword="path")
+ obj = bus.get_object(BUS_NAME, session_path)
+ self.session = dbus.Interface(obj, SESSION_INTERFACE)
+ self.map = dbus.Interface(obj, MESSAGE_ACCESS_INTERFACE)
+ bus.add_signal_receiver(self.properties_changed,
+ dbus_interface="org.freedesktop.DBus.Properties",
+ signal_name="PropertiesChanged",
+ path_keyword="path")
def create_transfer_reply(self, reply):
(path, properties) = reply
mainloop.quit()
def transfer_complete(self, path):
- if path != self.transfer_path:
- return
if self.verbose:
print "Transfer finished"
properties = self.props.get(path)
os.remove(properties["Filename"])
print f.readlines()
- def transfer_error(self, code, message, path):
- if path != self.transfer_path:
- return
- print "Transfer finished with error %s: %s" % (code, message)
+ def transfer_error(self, path):
+ print "Transfer %s error" % path
mainloop.quit()
+ def properties_changed(self, interface, properties, invalidated, path):
+ req = self.props.get(path)
+ if req == None:
+ return
+
+ if properties['Status'] == 'complete':
+ self.transfer_complete(path)
+ return
+
+ if properties['Status'] == 'error':
+ self.transfer_error(path)
+ return
+
def set_folder(self, new_dir):
self.map.SetFolder(new_dir)
def get_message(self, handle):
self.map.ListMessages("", dict())
path = self.path + "/message" + handle
- obj = bus.get_object("org.bluez.obex", path)
- msg = dbus.Interface(obj, "org.bluez.obex.Message")
+ obj = bus.get_object(BUS_NAME, path)
+ msg = dbus.Interface(obj, MESSAGE_INTERFACE)
msg.Get("", True, reply_handler=self.create_transfer_reply,
error_handler=self.error)
def get_message_properties(self, handle):
self.map.ListMessages("", dict())
path = self.path + "/message" + handle
- obj = bus.get_object("org.bluez.obex", path)
+ obj = bus.get_object(BUS_NAME, path)
msg = dbus.Interface(obj, "org.freedesktop.DBus.Properties")
- ret = msg.GetAll("org.bluez.obex.Message")
+ ret = msg.GetAll(MESSAGE_INTERFACE)
print pformat(unwrap(ret))
def set_message_property(self, handle, prop, flag):
self.map.ListMessages("", dict())
path = self.path + "/message" + handle
- obj = bus.get_object("org.bluez.obex", path)
- msg = dbus.Interface(obj, "org.bluez.obex.Message")
+ obj = bus.get_object(BUS_NAME, path)
+ msg = dbus.Interface(obj, MESSAGE_INTERFACE)
msg.SetProperty (prop, flag);
def update_inbox(self):
bus = dbus.SessionBus()
mainloop = gobject.MainLoop()
- client = dbus.Interface(bus.get_object("org.bluez.obex",
- "/org/bluez/obex"),
- "org.bluez.obex.Client")
+ client = dbus.Interface(bus.get_object(BUS_NAME, PATH),
+ CLIENT_INTERFACE)
print "Creating Session"
path = client.CreateSession(options.device, { "Target": "map" })