PyBonjour
30 June 2009
Comments
pybonjour to moduł Pythona udostępniający API na usługę Bonjour opracowaną przez Apple (i kompatybilne implementacje jak Avahi pod Linuksem). Za jego pomocą można rejestrować i wykrywać usługi sieciowe w sieci lokalnej i zewnętrznej. PyBonjour napisany jest w czystym Pythonie i zadziała wszędzie tam, gdzie działa cPython. Pod Mac OS X wystarczy zainstalować moduł. Pod MS Windows należy zainstalować "Bonjour for Windows", a pod Linuksem Avahi lub mDNSResponder.
Poniżej przykładowy kod rejestrujący lokalną usługę:
import select
import sys
import pybonjour
name = sys.argv[1]
regtype = sys.argv[2]
port = int(sys.argv[3])
def register_callback(sdRef, flags, errorCode, name, regtype, domain):
if errorCode == pybonjour.kDNSServiceErr_NoError:
print 'Registered service:'
print ' name =', name
print ' regtype =', regtype
print ' domain =', domain
sdRef = pybonjour.DNSServiceRegister(name = name,
regtype = regtype,
port = port,
callBack = register_callback)
try:
try:
while True:
ready = select.select([sdRef], [], [])
if sdRef in ready[0]:
pybonjour.DNSServiceProcessResult(sdRef)
except KeyboardInterrupt:
pass
finally:
sdRef.close()
python skrypt.py MojaUsluga _test._tcp 1234
Kod wykrywający usługi i pobierający o nich dane wygląda następująco:
import select
import sys
import pybonjour
regtype = sys.argv[1]
timeout = 5
resolved = []
def resolve_callback(sdRef, flags, interfaceIndex, errorCode, fullname,
hosttarget, port, txtRecord):
if errorCode == pybonjour.kDNSServiceErr_NoError:
print 'Resolved service:'
print ' fullname =', fullname
print ' hosttarget =', hosttarget
print ' port =', port
resolved.append(True)
def browse_callback(sdRef, flags, interfaceIndex, errorCode, serviceName,
regtype, replyDomain):
if errorCode != pybonjour.kDNSServiceErr_NoError:
return
if not (flags & pybonjour.kDNSServiceFlagsAdd):
print 'Service removed'
return
print 'Service added; resolving'
resolve_sdRef = pybonjour.DNSServiceResolve(0,
interfaceIndex,
serviceName,
regtype,
replyDomain,
resolve_callback)
try:
while not resolved:
ready = select.select([resolve_sdRef], [], [], timeout)
if resolve_sdRef not in ready[0]:
print 'Resolve timed out'
break
pybonjour.DNSServiceProcessResult(resolve_sdRef)
else:
resolved.pop()
finally:
resolve_sdRef.close()
browse_sdRef = pybonjour.DNSServiceBrowse(regtype = regtype,
callBack = browse_callback)
try:
try:
while True:
ready = select.select([browse_sdRef], [], [])
if browse_sdRef in ready[0]:
pybonjour.DNSServiceProcessResult(browse_sdRef)
except KeyboardInterrupt:
pass
finally:
browse_sdRef.close()
python skrypt2.py _test._tcp
Avahi także dostarcza swój moduł Pythona. Prosty przykład dostępny jest na wiki Avahi, lecz dokumentacji brak.
RkBlog
Comment article