Diff between 7eb0eeab689e0a9f6314b65b02ed19893a0fb365 and 2e82b4bc2f0ed281e9ab2de0bada2173871b90aa

Changed Files

File Additions Deletions Status
Makefile.tools +1 -1 modified
test/test-hfp +234 -0 added

Full Patch

diff --git a/Makefile.tools b/Makefile.tools
index cf87ef5..422c57f 100644
--- a/Makefile.tools
+++ b/Makefile.tools
@@ -209,4 +209,4 @@ EXTRA_DIST += test/sap_client.py test/hsplay test/hsmicro \
 		test/test-health-sink test/service-record.dtd \
 		test/service-did.xml test/service-spp.xml test/service-opp.xml \
 		test/service-ftp.xml test/simple-player test/test-nap \
-		test/test-heartrate test/test-alert
+		test/test-heartrate test/test-alert test/test-hfp
diff --git a/test/test-hfp b/test/test-hfp
new file mode 100755
index 0000000..9aa4ec0
--- /dev/null
+++ b/test/test-hfp
@@ -0,0 +1,234 @@
+#!/usr/bin/python
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+from gi.repository import GObject
+
+import os
+import sys
+import dbus
+import glib
+import dbus.service
+import dbus.mainloop.glib
+from optparse import OptionParser, make_option
+from socket import SOCK_SEQPACKET, socket
+
+mainloop = None
+audio_supported = True
+
+try:
+	from socket import AF_BLUETOOTH, BTPROTO_SCO
+except:
+	print("WARNING: python compiled without Bluetooth support"
+					" - audio will not be available")
+	audio_supported = False
+
+BUF_SIZE = 1024
+
+BDADDR_ANY = '00:00:00:00:00:00'
+
+HF_NREC			= 0x0001
+HF_3WAY			= 0x0002
+HF_CLI			= 0x0004
+HF_VOICE_RECOGNITION	= 0x0008
+HF_REMOTE_VOL		= 0x0010
+HF_ENHANCED_STATUS	= 0x0020
+HF_ENHANCED_CONTROL	= 0x0040
+HF_CODEC_NEGOTIATION	= 0x0080
+
+AG_3WAY			= 0x0001
+AG_NREC			= 0x0002
+AG_VOICE_RECOGNITION	= 0x0004
+AG_INBAND_RING		= 0x0008
+AG_VOICE_TAG		= 0x0010
+AG_REJECT_CALL		= 0x0020
+AG_ENHANCED_STATUS	= 0x0040
+AG_ENHANCED_CONTROL	= 0x0080
+AG_EXTENDED_RESULT	= 0x0100
+AG_CODEC_NEGOTIATION	= 0x0200
+
+HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION |
+			HF_REMOTE_VOL | HF_ENHANCED_STATUS |
+			HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION)
+
+AVAIL_CODECS = "1,2"
+
+class HfpConnection:
+	slc_complete = False
+	fd = None
+	io_id = 0
+	version = 0
+	features = 0
+	pending = None
+
+	def slc_completed(self):
+		print("SLC establisment complete")
+		self.slc_complete = True
+
+	def slc_next_cmd(self, cmd):
+		if not cmd:
+			self.send_cmd("AT+BRSF=%u" % (HF_FEATURES))
+		elif (cmd.startswith("AT+BRSF")):
+			if (self.features & AG_CODEC_NEGOTIATION and
+					HF_FEATURES & HF_CODEC_NEGOTIATION):
+				self.send_cmd("AT+BAC=%s" % (AVAIL_CODECS))
+			else:
+				self.send_cmd("AT+CIND=?")
+		elif (cmd.startswith("AT+BAC")):
+			self.send_cmd("AT+CIND=?")
+		elif (cmd.startswith("AT+CIND=?")):
+			self.send_cmd("AT+CIND?")
+		elif (cmd.startswith("AT+CIND?")):
+			self.send_cmd("AT+CMER=3,0,0,1")
+		elif (cmd.startswith("AT+CMER=")):
+			if (HF_FEATURES & HF_3WAY and self.features & AG_3WAY):
+				self.send_cmd("AT+CHLD=?")
+			else:
+				self.slc_completed()
+		elif (cmd.startswith("AT+CHLD=?")):
+			self.slc_completed()
+		else:
+			print("Unknown SLC command completed: %s" % (cmd))
+
+	def io_cb(self, fd, cond):
+		buf = os.read(fd, BUF_SIZE)
+		buf = buf.strip()
+
+		print("Received: %s" % (buf))
+
+		if (buf == "OK" or buf == "ERROR"):
+			cmd = self.pending
+			self.pending = None
+
+			if (not self.slc_complete):
+				self.slc_next_cmd(cmd)
+
+			return True
+
+		parts = buf.split(':')
+
+		if (parts[0] == "+BRSF"):
+			self.features = int(parts[1])
+
+		return True
+
+	def send_cmd(self, cmd):
+		if (self.pending):
+			print("ERROR: Another command is pending")
+			return
+
+		print("Sending: %s" % (cmd))
+
+		os.write(self.fd, cmd + "\r\n")
+		self.pending = cmd
+
+	def __init__(self, fd, version, features):
+		self.fd = fd
+		self.version = version
+		self.features = features
+
+		print("Version 0x%04x Features 0x%04x" % (version, features))
+
+		self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb)
+
+		self.slc_next_cmd(None)
+
+class HfpProfile(dbus.service.Object):
+	sco_socket = None
+	io_id = 0
+	conns = {}
+
+	def sco_cb(self, sock, cond):
+		(sco, peer) = sock.accept()
+		print("New SCO connection from %s" % (peer))
+
+	def init_sco(self, sock):
+		self.sco_socket = sock
+		self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb)
+
+	def __init__(self, bus, path, sco):
+		dbus.service.Object.__init__(self, bus, path)
+
+		if sco:
+			self.init_sco(sco)
+
+	@dbus.service.method("org.bluez.Profile1",
+					in_signature="", out_signature="")
+	def Release(self):
+		print("Release")
+		mainloop.quit()
+
+	@dbus.service.method("org.bluez.Profile1",
+					in_signature="", out_signature="")
+	def Cancel(self):
+		print("Cancel")
+
+	@dbus.service.method("org.bluez.Profile1",
+				in_signature="oha{sv}", out_signature="")
+	def NewConnection(self, path, fd, properties):
+		fd = fd.take()
+		version = 0x0105
+		features = 0
+		print("NewConnection(%s, %d)" % (path, fd))
+		for key in properties.keys():
+			if key == "Version":
+				version = properties[key]
+			elif key == "Features":
+				features = properties[key]
+
+		conn = HfpConnection(fd, version, features)
+
+		self.conns[path] = conn
+
+if __name__ == '__main__':
+	dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+	bus = dbus.SystemBus()
+
+	manager = dbus.Interface(bus.get_object("org.bluez",
+				"/org/bluez"), "org.bluez.ProfileManager1")
+
+	option_list = [
+			make_option("-p", "--path", action="store",
+					type="string", dest="path",
+					default="/bluez/test/hfp"),
+			make_option("-n", "--name", action="store",
+					type="string", dest="name",
+					default=None),
+			make_option("-C", "--channel", action="store",
+					type="int", dest="channel",
+					default=None),
+			]
+
+	parser = OptionParser(option_list=option_list)
+
+	(options, args) = parser.parse_args()
+
+	mainloop = GObject.MainLoop()
+
+	opts = {
+			"AutoConnect" :	True,
+			"Version" : dbus.UInt16(0x0106),
+			"Features" : dbus.UInt16(HF_FEATURES),
+		}
+
+	if (options.name):
+		opts["Name"] = options.name
+
+	if (options.channel is not None):
+		opts["Channel"] = dbus.UInt16(options.channel)
+
+	if audio_supported:
+		sco = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO)
+		sco.bind(BDADDR_ANY)
+		sco.listen()
+	else:
+		sco = None
+
+	profile = HfpProfile(bus, options.path, sco)
+
+	manager.RegisterProfile(options.path, "hfp-hf", opts)
+
+	print("Profile registered - waiting for connections")
+
+	mainloop.run()