diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..3d47f986c41db29ec6dc0d5036bf760b3a1cf366 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.idea/ +target/ +.settings/ +*.iml +.project +.classpath \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..2a9b0e42e47a255dc7b167f7ef716abf7af5d757 --- /dev/null +++ b/pom.xml @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <!-- Inherited Icy Parent POM --> + <parent> + <groupId>org.bioimageanalysis.icy</groupId> + <artifactId>parent-pom-plugin</artifactId> + <version>1.0.3</version> + </parent> + + <!-- Project Information --> + <artifactId>jython-execnet</artifactId> + <version>5.5.2</version> + + <packaging>jar</packaging> + + <name>Jython execnet for Icy</name> + <description> + Enables Python scripts that are run inside Icy to communicate with other Python instances outside Icy. This allows the access to + CPython-only libraries such as Numpy. + </description> + <url>http://icy.bioimageanalysis.org/plugin/jython-execnet-for-icy/</url> + <inceptionYear>2020</inceptionYear> + + <organization> + <name>Institut Pasteur</name> + <url>https://pasteur.fr</url> + </organization> + + <licenses> + <license> + <name>GNU GPLv3</name> + <url>https://www.gnu.org/licenses/gpl-3.0.en.html</url> + <distribution>repo</distribution> + </license> + </licenses> + + <developers> + <developer> + <id>sdallongeville</id> + <name>Stéphane Dallongeville</name> + <url>https://research.pasteur.fr/fr/member/stephane-dallongeville/</url> + <roles> + <role>founder</role> + <role>lead</role> + <role>architect</role> + <role>developer</role> + <role>debugger</role> + <role>tester</role> + <role>maintainer</role> + <role>support</role> + </roles> + </developer> + </developers> + + <!-- Project properties --> + <properties> + + </properties> + + <!-- Project build configuration --> + <build> + </build> + + <!-- List of project's dependencies --> + <dependencies> + <!-- The core of Icy --> + <dependency> + <groupId>org.bioimageanalysis.icy</groupId> + <artifactId>icy-kernel</artifactId> + </dependency> + + <dependency> + <groupId>org.bioimageanalysis.icy</groupId> + <artifactId>jython</artifactId> + <version>2.8.0</version> + </dependency> + </dependencies> + + <!-- Icy Maven repository (to find parent POM) --> + <repositories> + <repository> + <id>icy</id> + <name>Icy's Nexus</name> + <url>https://icy-nexus.pasteur.fr/repository/Icy/</url> + </repository> + </repositories> +</project> \ No newline at end of file diff --git a/src/main/java/plugins/tlecomte/jythonExecnetForIcy/JythonExecnetForIcy.java b/src/main/java/plugins/tlecomte/jythonExecnetForIcy/JythonExecnetForIcy.java new file mode 100644 index 0000000000000000000000000000000000000000..cc5b1555dc1b9ab9278956a73c65ad7b527ee105 --- /dev/null +++ b/src/main/java/plugins/tlecomte/jythonExecnetForIcy/JythonExecnetForIcy.java @@ -0,0 +1,11 @@ +package plugins.tlecomte.jythonExecnetForIcy; + +import icy.plugin.abstract_.Plugin; +import icy.plugin.interface_.PluginLibrary; +import plugins.tlecomte.jythonForIcy.PythonLibraries; + +@PythonLibraries +public class JythonExecnetForIcy extends Plugin implements PluginLibrary +{ + +} diff --git a/src/main/resources/PythonLibs/site-packages/Icy_inspect.py b/src/main/resources/PythonLibs/site-packages/Icy_inspect.py new file mode 100644 index 0000000000000000000000000000000000000000..2ed091bb909d9c194b30833c6cdf82e09b791a77 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/Icy_inspect.py @@ -0,0 +1,61 @@ +import linecache +import os.path +from java.io import File +import inspect + +class JavaInspect: + def __init__(self, classloader): + self.classloader = classloader + + # this function extends inspect.getsource(module) to handle the case where + # the module is loaded from Icy classloader instead of being loaded as + # usual by python + # Note: this is possible because Icy classloader has a method to retrieve + # the resource for a given object, that turns out to work great for Python + # files + def getsource(self, module): + try: + return inspect.getsource(module) + except IOError: + moduleName = module.__name__ + + #inspect.getsource could not find the file + #it surely means that the code is not inside a regular .py file loaded from disk by python + #it may be a .py file loaded by a Java classloader + #or even a .py inside a jar + + # go from Python module name to path and file name + moduleName = moduleName.split('.') + moduleName = os.path.join(*moduleName) + moduleName += ".py" + + resourceURL = self.classloader.getResource(moduleName) + if resourceURL <> None: + protocol = resourceURL.getProtocol() + + if protocol == "jar": + jarFile = resourceURL.openConnection().getJarFile() + jarEntry = resourceURL.openConnection().getJarEntry() + inputStream = jarFile.getInputStream(jarEntry) + + from org.python.core.util import FileUtil + f = FileUtil.wrap(inputStream) + return f.read(-1) # read until EOF + elif protocol == "file": + fileName = resourceURL.getFile() + return "".join(linecache.getlines(fileName)) + else: + raise IOError("Cannot load module %s whose resource %s is not a file nor a jar" %(moduleName, resourceURL)) + else: + raise IOError("Cannot load module %s: not found by inspect module, and not found in java classloader" %(moduleName)) + + +# this function extends inspect.getsource(module) to handle the case where +# the module is loaded from Icy classloader instead of being loaded as +# usual by python +# Note: this is possible because Icy classloader has a method to retrieve +# the resource for a given object, that turns out to work great for Python +# files +def getsource(module): + from icy.plugin import PluginLoader + return JavaInspect(PluginLoader.getLoader()).getsource(module) diff --git a/src/main/resources/PythonLibs/site-packages/execnet/__init__.py b/src/main/resources/PythonLibs/site-packages/execnet/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..f579b401dd6746241889f8aa628494f7649eb496 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/__init__.py @@ -0,0 +1,29 @@ +""" +execnet: pure python lib for connecting to local and remote Python Interpreters. + +(c) 2012, Holger Krekel and others +""" +__version__ = '1.2.0.dev1' + +import execnet.apipkg + +execnet.apipkg.initpkg(__name__, { + 'PopenGateway': '.deprecated:PopenGateway', + 'SocketGateway': '.deprecated:SocketGateway', + 'SshGateway': '.deprecated:SshGateway', + 'makegateway': '.multi:makegateway', + 'set_execmodel': '.multi:set_execmodel', + 'HostNotFound': '.gateway_bootstrap:HostNotFound', + 'RemoteError': '.gateway_base:RemoteError', + 'TimeoutError': '.gateway_base:TimeoutError', + 'XSpec': '.xspec:XSpec', + 'Group': '.multi:Group', + 'MultiChannel': '.multi:MultiChannel', + 'RSync': '.rsync:RSync', + 'default_group': '.multi:default_group', + 'dumps': '.gateway_base:dumps', + 'loads': '.gateway_base:loads', + 'load': '.gateway_base:load', + 'dump': '.gateway_base:dump', + 'DataFormatError': '.gateway_base:DataFormatError', +}) diff --git a/src/main/resources/PythonLibs/site-packages/execnet/apipkg.py b/src/main/resources/PythonLibs/site-packages/execnet/apipkg.py new file mode 100644 index 0000000000000000000000000000000000000000..a4576c0b295474b400c2dfa3b45bf8d482a4736e --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/apipkg.py @@ -0,0 +1,167 @@ +""" +apipkg: control the exported namespace of a python package. + +see http://pypi.python.org/pypi/apipkg + +(c) holger krekel, 2009 - MIT license +""" +import os +import sys +from types import ModuleType + +__version__ = '1.2' + +def initpkg(pkgname, exportdefs, attr=dict()): + """ initialize given package from the export definitions. """ + oldmod = sys.modules.get(pkgname) + d = {} + f = getattr(oldmod, '__file__', None) + if f: + f = os.path.abspath(f) + d['__file__'] = f + if hasattr(oldmod, '__version__'): + d['__version__'] = oldmod.__version__ + if hasattr(oldmod, '__loader__'): + d['__loader__'] = oldmod.__loader__ + if hasattr(oldmod, '__path__'): + d['__path__'] = [os.path.abspath(p) for p in oldmod.__path__] + if '__doc__' not in exportdefs and getattr(oldmod, '__doc__', None): + d['__doc__'] = oldmod.__doc__ + d.update(attr) + if hasattr(oldmod, "__dict__"): + oldmod.__dict__.update(d) + mod = ApiModule(pkgname, exportdefs, implprefix=pkgname, attr=d) + sys.modules[pkgname] = mod + +def importobj(modpath, attrname): + module = __import__(modpath, None, None, ['__doc__']) + if not attrname: + return module + + retval = module + names = attrname.split(".") + for x in names: + retval = getattr(retval, x) + return retval + +class ApiModule(ModuleType): + def __docget(self): + try: + return self.__doc + except AttributeError: + if '__doc__' in self.__map__: + return self.__makeattr('__doc__') + def __docset(self, value): + self.__doc = value + __doc__ = property(__docget, __docset) + + def __init__(self, name, importspec, implprefix=None, attr=None): + self.__name__ = name + self.__all__ = [x for x in importspec if x != '__onfirstaccess__'] + self.__map__ = {} + self.__implprefix__ = implprefix or name + if attr: + for name, val in attr.items(): + #print "setting", self.__name__, name, val + setattr(self, name, val) + for name, importspec in importspec.items(): + if isinstance(importspec, dict): + subname = '%s.%s'%(self.__name__, name) + apimod = ApiModule(subname, importspec, implprefix) + sys.modules[subname] = apimod + setattr(self, name, apimod) + else: + parts = importspec.split(':') + modpath = parts.pop(0) + attrname = parts and parts[0] or "" + if modpath[0] == '.': + modpath = implprefix + modpath + + if not attrname: + subname = '%s.%s'%(self.__name__, name) + apimod = AliasModule(subname, modpath) + sys.modules[subname] = apimod + if '.' not in name: + setattr(self, name, apimod) + else: + self.__map__[name] = (modpath, attrname) + + def __repr__(self): + l = [] + if hasattr(self, '__version__'): + l.append("version=" + repr(self.__version__)) + if hasattr(self, '__file__'): + l.append('from ' + repr(self.__file__)) + if l: + return '<ApiModule %r %s>' % (self.__name__, " ".join(l)) + return '<ApiModule %r>' % (self.__name__,) + + def __makeattr(self, name): + """lazily compute value for name or raise AttributeError if unknown.""" + #print "makeattr", self.__name__, name + target = None + if '__onfirstaccess__' in self.__map__: + target = self.__map__.pop('__onfirstaccess__') + importobj(*target)() + try: + modpath, attrname = self.__map__[name] + except KeyError: + if target is not None and name != '__onfirstaccess__': + # retry, onfirstaccess might have set attrs + return getattr(self, name) + raise AttributeError(name) + else: + result = importobj(modpath, attrname) + setattr(self, name, result) + try: + del self.__map__[name] + except KeyError: + pass # in a recursive-import situation a double-del can happen + return result + + __getattr__ = __makeattr + + def __dict__(self): + # force all the content of the module to be loaded when __dict__ is read + dictdescr = ModuleType.__dict__['__dict__'] + dict = dictdescr.__get__(self) + if dict is not None: + hasattr(self, 'some') + for name in self.__all__: + try: + self.__makeattr(name) + except AttributeError: + pass + return dict + __dict__ = property(__dict__) + + +def AliasModule(modname, modpath, attrname=None): + mod = [] + + def getmod(): + if not mod: + x = importobj(modpath, None) + if attrname is not None: + x = getattr(x, attrname) + mod.append(x) + return mod[0] + + class AliasModule(ModuleType): + + def __repr__(self): + x = modpath + if attrname: + x += "." + attrname + return '<AliasModule %r for %r>' % (modname, x) + + def __getattribute__(self, name): + return getattr(getmod(), name) + + def __setattr__(self, name, value): + setattr(getmod(), name, value) + + def __delattr__(self, name): + delattr(getmod(), name) + + return AliasModule(modname) diff --git a/src/main/resources/PythonLibs/site-packages/execnet/deprecated.py b/src/main/resources/PythonLibs/site-packages/execnet/deprecated.py new file mode 100644 index 0000000000000000000000000000000000000000..aef4626b338d4410c363f03c718be64f7b4b41a1 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/deprecated.py @@ -0,0 +1,43 @@ +""" +some deprecated calls + +(c) 2008-2009, Holger Krekel and others +""" +import execnet + +def PopenGateway(python=None): + """ instantiate a gateway to a subprocess + started with the given 'python' executable. + """ + APIWARN("1.0.0b4", "use makegateway('popen')") + spec = execnet.XSpec("popen") + spec.python = python + return execnet.default_group.makegateway(spec) + +def SocketGateway(host, port): + """ This Gateway provides interaction with a remote process + by connecting to a specified socket. On the remote + side you need to manually start a small script + (py/execnet/script/socketserver.py) that accepts + SocketGateway connections or use the experimental + new_remote() method on existing gateways. + """ + APIWARN("1.0.0b4", "use makegateway('socket=host:port')") + spec = execnet.XSpec("socket=%s:%s" %(host, port)) + return execnet.default_group.makegateway(spec) + +def SshGateway(sshaddress, remotepython=None, ssh_config=None): + """ instantiate a remote ssh process with the + given 'sshaddress' and remotepython version. + you may specify an ssh_config file. + """ + APIWARN("1.0.0b4", "use makegateway('ssh=host')") + spec = execnet.XSpec("ssh=%s" % sshaddress) + spec.python = remotepython + spec.ssh_config = ssh_config + return execnet.default_group.makegateway(spec) + +def APIWARN(version, msg, stacklevel=3): + import warnings + Warn = DeprecationWarning("(since version %s) %s" %(version, msg)) + warnings.warn(Warn, stacklevel=stacklevel) diff --git a/src/main/resources/PythonLibs/site-packages/execnet/gateway.py b/src/main/resources/PythonLibs/site-packages/execnet/gateway.py new file mode 100644 index 0000000000000000000000000000000000000000..7ecf07741159e79148f1d38459958053d01fbc48 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/gateway.py @@ -0,0 +1,202 @@ +""" +gateway code for initiating popen, socket and ssh connections. +(c) 2004-2013, Holger Krekel and others +""" + +import sys, os, inspect, types, linecache +import textwrap +import execnet +from execnet.gateway_base import Message +from execnet import gateway_base +importdir = os.path.dirname(os.path.dirname(execnet.__file__)) + +class Gateway(gateway_base.BaseGateway): + """ Gateway to a local or remote Python Intepreter. """ + + def __init__(self, io, spec): + super(Gateway, self).__init__(io=io, id=spec.id, _startcount=1) + self.spec = spec + self._initreceive() + + @property + def remoteaddress(self): + return self._io.remoteaddress + + def __repr__(self): + """ return string representing gateway type and status. """ + try: + r = (self.hasreceiver() and 'receive-live' or 'not-receiving') + i = len(self._channelfactory.channels()) + except AttributeError: + r = "uninitialized" + i = "no" + return "<%s id=%r %s, %s model, %s active channels>" %( + self.__class__.__name__, self.id, r, self.execmodel.backend, i) + + def exit(self): + """ trigger gateway exit. Defer waiting for finishing + of receiver-thread and subprocess activity to when + group.terminate() is called. + """ + self._trace("gateway.exit() called") + if self not in self._group: + self._trace("gateway already unregistered with group") + return + self._group._unregister(self) + try: + self._trace("--> sending GATEWAY_TERMINATE") + self._send(Message.GATEWAY_TERMINATE) + self._trace("--> io.close_write") + self._io.close_write() + except IOError: + v = sys.exc_info()[1] + self._trace("io-error: could not send termination sequence") + self._trace(" exception: %r" % v) + + def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False): + """ + set the string coercion for this gateway + the default is to try to convert py2 str as py3 str, + but not to try and convert py3 str to py2 str + """ + self._strconfig = (py2str_as_py3str, py3str_as_py2str) + data = gateway_base.dumps_internal(self._strconfig) + self._send(Message.RECONFIGURE, data=data) + + + def _rinfo(self, update=False): + """ return some sys/env information from remote. """ + if update or not hasattr(self, '_cache_rinfo'): + ch = self.remote_exec(rinfo_source) + self._cache_rinfo = RInfo(ch.receive()) + return self._cache_rinfo + + def hasreceiver(self): + """ return True if gateway is able to receive data. """ + return self._receiverthread.running # approxmimation + + def remote_status(self): + """ return information object about remote execution status. """ + channel = self.newchannel() + self._send(Message.STATUS, channel.id) + statusdict = channel.receive() + # the other side didn't actually instantiate a channel + # so we just delete the internal id/channel mapping + self._channelfactory._local_close(channel.id) + return RemoteStatus(statusdict) + + def remote_exec(self, source, **kwargs): + """ return channel object and connect it to a remote + execution thread where the given ``source`` executes. + + * ``source`` is a string: execute source string remotely + with a ``channel`` put into the global namespace. + * ``source`` is a pure function: serialize source and + call function with ``**kwargs``, adding a + ``channel`` object to the keyword arguments. + * ``source`` is a pure module: execute source of module + with a ``channel`` in its global namespace + + In all cases the binding ``__name__='__channelexec__'`` + will be available in the global namespace of the remotely + executing code. + """ + call_name = None + if isinstance(source, types.ModuleType): + linecache.updatecache(inspect.getsourcefile(source)) + source = inspect.getsource(source) + elif isinstance(source, types.FunctionType): + call_name = source.__name__ + source = _source_of_function(source) + else: + source = textwrap.dedent(str(source)) + + if call_name is None and kwargs: + raise TypeError("can't pass kwargs to non-function remote_exec") + + channel = self.newchannel() + self._send(Message.CHANNEL_EXEC, + channel.id, + gateway_base.dumps_internal((source, call_name, kwargs))) + return channel + + def remote_init_threads(self, num=None): + """ DEPRECATED. Is currently a NO-OPERATION already.""" + print ("WARNING: remote_init_threads() is a no-operation in execnet-1.2") + +class RInfo: + def __init__(self, kwargs): + self.__dict__.update(kwargs) + + def __repr__(self): + info = ", ".join(["%s=%s" % item + for item in self.__dict__.items()]) + return "<RInfo %r>" % info + +RemoteStatus = RInfo + +def rinfo_source(channel): + import sys, os + channel.send(dict( + executable = sys.executable, + version_info = sys.version_info[:5], + platform = sys.platform, + cwd = os.getcwd(), + pid = os.getpid(), + )) + + +def _find_non_builtin_globals(source, codeobj): + try: + import ast + except ImportError: + return None + try: + import __builtin__ + except ImportError: + import builtins as __builtin__ + + vars = dict.fromkeys(codeobj.co_varnames) + all = [] + for node in ast.walk(ast.parse(source)): + if (isinstance(node, ast.Name) and node.id not in vars and + node.id not in __builtin__.__dict__): + all.append(node.id) + return all + + +def _source_of_function(function): + if function.__name__ == '<lambda>': + raise ValueError("can't evaluate lambda functions'") + #XXX: we dont check before remote instanciation + # if arguments are used propperly + args, varargs, keywords, defaults = inspect.getargspec(function) + if args[0] != 'channel': + raise ValueError('expected first function argument to be `channel`') + + if sys.version_info < (3,0): + closure = function.func_closure + codeobj = function.func_code + else: + closure = function.__closure__ + codeobj = function.__code__ + + if closure is not None: + raise ValueError("functions with closures can't be passed") + + try: + source = inspect.getsource(function) + except IOError: + raise ValueError("can't find source file for %s" % function) + + source = textwrap.dedent(source) # just for inner functions + + used_globals = _find_non_builtin_globals(source, codeobj) + if used_globals: + raise ValueError( + "the use of non-builtin globals isn't supported", + used_globals, + ) + + return source + diff --git a/src/main/resources/PythonLibs/site-packages/execnet/gateway_base.py b/src/main/resources/PythonLibs/site-packages/execnet/gateway_base.py new file mode 100644 index 0000000000000000000000000000000000000000..6f4229241d93ef102ee3e9babe17b8fbae5856b3 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/gateway_base.py @@ -0,0 +1,1473 @@ +""" +base execnet gateway code send to the other side for bootstrapping. + +NOTE: aims to be compatible to Python 2.5-3.X, Jython and IronPython + +(C) 2004-2013 Holger Krekel, Armin Rigo, Benjamin Peterson, Ronny Pfannschmidt and others +""" +from __future__ import with_statement +import sys, os, weakref +import traceback, struct + +# NOTE that we want to avoid try/except style importing +# to avoid setting sys.exc_info() during import +# + +ISPY3 = sys.version_info >= (3, 0) +if ISPY3: + from io import BytesIO + exec("def do_exec(co, loc): exec(co, loc)\n" + "def reraise(cls, val, tb): raise val\n") + unicode = str + _long_type = int + from _thread import interrupt_main +else: + from StringIO import StringIO as BytesIO + exec("def do_exec(co, loc): exec co in loc\n" + "def reraise(cls, val, tb): raise cls, val, tb\n") + bytes = str + _long_type = long + try: + from thread import interrupt_main + except ImportError: + interrupt_main = None + +class EmptySemaphore: + acquire = release = lambda self: None + +def get_execmodel(backend): + if hasattr(backend, "backend"): + return backend + if backend == "thread": + importdef = { + 'get_ident': ['thread::get_ident', '_thread::get_ident'], + '_start_new_thread': ['thread::start_new_thread', + '_thread::start_new_thread'], + 'threading': ["threading",], + 'queue': ["queue", "Queue"], + 'sleep': ['time::sleep'], + 'subprocess': ['subprocess'], + 'socket': ['socket'], + '_fdopen': ['os::fdopen'], + '_lock': ['threading'], + '_event': ['threading'], + } + def exec_start(self, func, args=()): + self._start_new_thread(func, args) + + elif backend == "eventlet": + importdef = { + 'get_ident': ['eventlet.green.thread::get_ident'], + '_spawn_n': ['eventlet::spawn_n'], + 'threading': ['eventlet.green.threading'], + 'queue': ["eventlet.queue"], + 'sleep': ['eventlet::sleep'], + 'subprocess': ['eventlet.green.subprocess'], + 'socket': ['eventlet.green.socket'], + '_fdopen': ['eventlet.green.os::fdopen'], + '_lock': ['eventlet.green.threading'], + '_event': ['eventlet.green.threading'], + } + def exec_start(self, func, args=()): + self._spawn_n(func, *args) + elif backend == "gevent": + importdef = { + 'get_ident': ['gevent.thread::get_ident'], + '_spawn_n': ['gevent::spawn'], + 'threading': ['threading'], + 'queue': ["gevent.queue"], + 'sleep': ['gevent::sleep'], + 'subprocess': ['gevent.subprocess'], + 'socket': ['gevent.socket'], + # XXX + '_fdopen': ['gevent.fileobject::FileObjectThread'], + '_lock': ['gevent.lock'], + '_event': ['gevent.event'], + } + def exec_start(self, func, args=()): + self._spawn_n(func, *args) + else: + raise ValueError("unknown execmodel %r" %(backend,)) + + class ExecModel: + def __init__(self, name): + self._importdef = importdef + self.backend = name + self._count = 0 + + def __repr__(self): + return "<ExecModel %r>" % self.backend + + def __getattr__(self, name): + locs = self._importdef.get(name) + if locs is None: + raise AttributeError(name) + for loc in locs: + parts = loc.split("::") + loc = parts.pop(0) + try: + mod = __import__(loc, None, None, "__doc__") + except ImportError: + pass + else: + if parts: + mod = getattr(mod, parts[0]) + setattr(self, name, mod) + return mod + raise AttributeError(name) + + start = exec_start + + def fdopen(self, fd, mode, bufsize=1): + return self._fdopen(fd, mode, bufsize) + + def WorkerPool(self, size=None, hasprimary=False): + return WorkerPool(self, size, hasprimary=hasprimary) + + def Semaphore(self, size=None): + if size is None: + return EmptySemaphore() + return self._lock.Semaphore(size) + + def Lock(self): + return self._lock.RLock() + + def RLock(self): + return self._lock.RLock() + + def Event(self): + event = self._event.Event() + if sys.version_info < (2,7): + # patch wait function to return event state instead of None + real_wait = event.wait + def wait(timeout=None): + real_wait(timeout=timeout) + return event.isSet() + event.wait = wait + return event + + def PopenPiped(self, args): + PIPE = self.subprocess.PIPE + return self.subprocess.Popen(args, stdout=PIPE, stdin=PIPE) + + + return ExecModel(backend) + + +class Reply(object): + """ reply instances provide access to the result + of a function execution that got dispatched + through WorkerPool.spawn() + """ + def __init__(self, task, threadmodel): + self.task = task + self._result_ready = threadmodel.Event() + self.running = True + + def get(self, timeout=None): + """ get the result object from an asynchronous function execution. + if the function execution raised an exception, + then calling get() will reraise that exception + including its traceback. + """ + self.waitfinish(timeout) + try: + return self._result + except AttributeError: + reraise(*(self._excinfo[:3])) # noqa + + def waitfinish(self, timeout=None): + if not self._result_ready.wait(timeout): + raise IOError("timeout waiting for %r" %(self.task, )) + + def run(self): + func, args, kwargs = self.task + try: + try: + self._result = func(*args, **kwargs) + except: + self._excinfo = sys.exc_info() + finally: + self._result_ready.set() + self.running = False + + +class WorkerPool(object): + """ A WorkerPool allows to spawn function executions + to threads, returning a reply object on which you + can ask for the result (and get exceptions reraised) + """ + def __init__(self, execmodel, size=None, hasprimary=False): + """ by default allow unlimited number of spawns. """ + self.execmodel = execmodel + self._size = size + self._running_lock = self.execmodel.Lock() + self._sem = self.execmodel.Semaphore(size) + self._running = set() + self._shutdown_event = self.execmodel.Event() + if hasprimary: + if self.execmodel.backend != "thread": + raise ValueError("hasprimary=True requires thread model") + self._primary_thread_event = self.execmodel.Event() + + def integrate_as_primary_thread(self): + """ integrate the thread with which we are called as a primary + thread to dispatch to when spawn is called. + """ + assert self.execmodel.backend == "thread", self.execmodel + # XXX insert check if we really are in the main thread + primary_thread_event = self._primary_thread_event + # interacts with code at REF1 + while not self._shutdown_event.isSet(): + primary_thread_event.wait() + func, args, kwargs = self._primary_thread_task + if func is None: # waitall() woke us up to finish the loop + break + func(*args, **kwargs) + primary_thread_event.clear() + + def shutdown(self): + self._shutdown_event.set() + + def wait_for_shutdown(self, timeout=None): + return self._shutdown_event.wait(timeout=timeout) + + def active_count(self): + return len(self._running) + + def spawn(self, func, *args, **kwargs): + """ return Reply object for the asynchronous dispatch + of the given func(*args, **kwargs). + """ + reply = Reply((func, args, kwargs), self.execmodel) + def run_and_release(): + reply.run() + with self._running_lock: + self._running.remove(reply) + self._sem.release() + if not self._running: + try: + self._waitall_event.set() + except AttributeError: + pass + self._sem.acquire() + with self._running_lock: + self._running.add(reply) + # REF1 in 'thread' model we give priority to running in main thread + primary_thread_event = getattr(self, "_primary_thread_event", None) + if primary_thread_event is not None: + if not primary_thread_event.isSet(): + self._primary_thread_task = run_and_release, (), {} + primary_thread_event.set() + return reply + self.execmodel.start(run_and_release, ()) + return reply + + def waitall(self, timeout=None): + """ wait until all previosuly spawns have terminated. """ + # if it exists signal primary_thread to finish its loop + self._primary_thread_task = None, None, None + try: + self._primary_thread_event.set() + except AttributeError: + pass + with self._running_lock: + if not self._running: + return True + # if a Reply still runs, we let run_and_release + # signal us -- note that we are still holding the + # _running_lock to avoid race conditions + self._waitall_event = self.execmodel.Event() + return self._waitall_event.wait(timeout=timeout) + + +sysex = (KeyboardInterrupt, SystemExit) + + +DEBUG = os.environ.get('EXECNET_DEBUG') +pid = os.getpid() +if DEBUG == '2': + def trace(*msg): + try: + line = " ".join(map(str, msg)) + sys.stderr.write("[%s] %s\n" % (pid, line)) + sys.stderr.flush() + except Exception: + pass # nothing we can do, likely interpreter-shutdown +elif DEBUG: + import tempfile, os.path + fn = os.path.join(tempfile.gettempdir(), 'execnet-debug-%d' % pid) + #sys.stderr.write("execnet-debug at %r" %(fn,)) + debugfile = open(fn, 'w') + def trace(*msg): + try: + line = " ".join(map(str, msg)) + debugfile.write(line + "\n") + debugfile.flush() + except Exception: + try: + v = sys.exc_info()[1] + sys.stderr.write( + "[%s] exception during tracing: %r\n" % (pid, v)) + except Exception: + pass # nothing we can do, likely interpreter-shutdown +else: + notrace = trace = lambda *msg: None + +class Popen2IO: + error = (IOError, OSError, EOFError) + + def __init__(self, outfile, infile, execmodel): + # we need raw byte streams + self.outfile, self.infile = outfile, infile + if sys.platform == "win32": + import msvcrt + try: + msvcrt.setmode(infile.fileno(), os.O_BINARY) + msvcrt.setmode(outfile.fileno(), os.O_BINARY) + except (AttributeError, IOError): + pass + self._read = getattr(infile, "buffer", infile).read + self._write = getattr(outfile, "buffer", outfile).write + self.execmodel = execmodel + + def read(self, numbytes): + """Read exactly 'numbytes' bytes from the pipe. """ + # a file in non-blocking mode may return less bytes, so we loop + buf = bytes() + while numbytes > len(buf): + data = self._read(numbytes-len(buf)) + if not data: + raise EOFError("expected %d bytes, got %d" %(numbytes, len(buf))) + buf += data + return buf + + def write(self, data): + """write out all data bytes. """ + assert isinstance(data, bytes) + self._write(data) + self.outfile.flush() + + def close_read(self): + self.infile.close() + + def close_write(self): + self.outfile.close() + +class Message: + """ encapsulates Messages and their wire protocol. """ + _types = [] + + def __init__(self, msgcode, channelid=0, data=''): + self.msgcode = msgcode + self.channelid = channelid + self.data = data + + @staticmethod + def from_io(io): + try: + header = io.read(9) # type 1, channel 4, payload 4 + if not header: + raise EOFError("empty read") + except EOFError: + e = sys.exc_info()[1] + raise EOFError('couldnt load message header, ' + e.args[0]) + msgtype, channel, payload = struct.unpack('!bii', header) + return Message(msgtype, channel, io.read(payload)) + + def to_io(self, io): + header = struct.pack('!bii', self.msgcode, self.channelid, + len(self.data)) + io.write(header+self.data) + + def received(self, gateway): + self._types[self.msgcode](self, gateway) + + def __repr__(self): + class FakeChannel(object): + _strconfig = False, False # never transform, never fail + def __init__(self, id): + self.id = id + def __repr__(self): + return '<Channel %s>' % self.id + FakeChannel.new = FakeChannel + FakeChannel.gateway = FakeChannel + name = self._types[self.msgcode].__name__.upper() + try: + data = loads_internal(self.data, FakeChannel) + except LoadError: + data = self.data + r = repr(data) + if len(r) > 90: + return "<Message.%s channelid=%d len=%d>" %(name, + self.channelid, len(r)) + else: + return "<Message.%s channelid=%d %s>" %(name, + self.channelid, r) + +def _setupmessages(): + def status(message, gateway): + # we use the channelid to send back information + # but don't instantiate a channel object + d = {'numchannels': len(gateway._channelfactory._channels), + 'numexecuting': gateway._execpool.active_count(), + 'execmodel': gateway.execmodel.backend, + } + gateway._send(Message.CHANNEL_DATA, message.channelid, dumps_internal(d)) + gateway._send(Message.CHANNEL_CLOSE, message.channelid) + + def channel_exec(message, gateway): + channel = gateway._channelfactory.new(message.channelid) + gateway._local_schedulexec(channel=channel,sourcetask=message.data) + + def channel_data(message, gateway): + gateway._channelfactory._local_receive(message.channelid, message.data) + + def channel_close(message, gateway): + gateway._channelfactory._local_close(message.channelid) + + def channel_close_error(message, gateway): + remote_error = RemoteError(loads_internal(message.data)) + gateway._channelfactory._local_close(message.channelid, remote_error) + + def channel_last_message(message, gateway): + gateway._channelfactory._local_close(message.channelid, sendonly=True) + + def gateway_terminate(message, gateway): + # wake up and terminate any execution waiting to receive something + gateway._channelfactory._finished_receiving() + # then try harder to terminate execution + gateway._terminate_execution() + + def reconfigure(message, gateway): + if message.channelid == 0: + target = gateway + else: + target = gateway._channelfactory.new(message.channelid) + target._strconfig = loads_internal(message.data, gateway) + + types = [ + status, reconfigure, gateway_terminate, + channel_exec, channel_data, channel_close, + channel_close_error, channel_last_message, + ] + for i, handler in enumerate(types): + Message._types.append(handler) + setattr(Message, handler.__name__.upper(), i) + +_setupmessages() + +def geterrortext(excinfo, + format_exception=traceback.format_exception, sysex=sysex): + try: + l = format_exception(*excinfo) + errortext = "".join(l) + except sysex: + raise + except: + errortext = '%s: %s' % (excinfo[0].__name__, + excinfo[1]) + return errortext + +class RemoteError(Exception): + """ Exception containing a stringified error from the other side. """ + def __init__(self, formatted): + self.formatted = formatted + Exception.__init__(self) + + def __str__(self): + return self.formatted + + def __repr__(self): + return "%s: %s" %(self.__class__.__name__, self.formatted) + + def warn(self): + if self.formatted != INTERRUPT_TEXT: + # XXX do this better + sys.stderr.write("[%s] Warning: unhandled %r\n" + % (os.getpid(), self,)) + +class TimeoutError(IOError): + """ Exception indicating that a timeout was reached. """ + + +NO_ENDMARKER_WANTED = object() + +class Channel(object): + """Communication channel between two Python Interpreter execution points.""" + RemoteError = RemoteError + TimeoutError = TimeoutError + _INTERNALWAKEUP = 1000 + _executing = False + + def __init__(self, gateway, id): + assert isinstance(id, int) + self.gateway = gateway + #XXX: defaults copied from Unserializer + self._strconfig = getattr(gateway, '_strconfig', (True, False)) + self.id = id + self._items = self.gateway.execmodel.queue.Queue() + self._closed = False + self._receiveclosed = self.gateway.execmodel.Event() + self._remoteerrors = [] + + def _trace(self, *msg): + self.gateway._trace(self.id, *msg) + + def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED): + """ set a callback function for receiving items. + + All already queued items will immediately trigger the callback. + Afterwards the callback will execute in the receiver thread + for each received data item and calls to ``receive()`` will + raise an error. + If an endmarker is specified the callback will eventually + be called with the endmarker when the channel closes. + """ + _callbacks = self.gateway._channelfactory._callbacks + _receivelock = self.gateway._receivelock + _receivelock.acquire() + try: + if self._items is None: + raise IOError("%r has callback already registered" %(self,)) + items = self._items + self._items = None + while 1: + try: + olditem = items.get(block=False) + except self.gateway.execmodel.queue.Empty: + if not (self._closed or self._receiveclosed.isSet()): + _callbacks[self.id] = ( + callback, + endmarker, + self._strconfig, + ) + break + else: + if olditem is ENDMARKER: + items.put(olditem) # for other receivers + if endmarker is not NO_ENDMARKER_WANTED: + callback(endmarker) + break + else: + callback(olditem) + finally: + _receivelock.release() + + def __repr__(self): + flag = self.isclosed() and "closed" or "open" + return "<Channel id=%d %s>" % (self.id, flag) + + def __del__(self): + if self.gateway is None: # can be None in tests + return + self._trace("channel.__del__") + # no multithreading issues here, because we have the last ref to 'self' + if self._closed: + # state transition "closed" --> "deleted" + for error in self._remoteerrors: + error.warn() + elif self._receiveclosed.isSet(): + # state transition "sendonly" --> "deleted" + # the remote channel is already in "deleted" state, nothing to do + pass + else: + # state transition "opened" --> "deleted" + if self._items is None: # has_callback + msgcode = Message.CHANNEL_LAST_MESSAGE + else: + msgcode = Message.CHANNEL_CLOSE + try: + self.gateway._send(msgcode, self.id) + except (IOError, ValueError): # ignore problems with sending + pass + + def _getremoteerror(self): + try: + return self._remoteerrors.pop(0) + except IndexError: + try: + return self.gateway._error + except AttributeError: + pass + return None + + # + # public API for channel objects + # + def isclosed(self): + """ return True if the channel is closed. A closed + channel may still hold items. + """ + return self._closed + + def makefile(self, mode='w', proxyclose=False): + """ return a file-like object. + mode can be 'w' or 'r' for writeable/readable files. + if proxyclose is true file.close() will also close the channel. + """ + if mode == "w": + return ChannelFileWrite(channel=self, proxyclose=proxyclose) + elif mode == "r": + return ChannelFileRead(channel=self, proxyclose=proxyclose) + raise ValueError("mode %r not availabe" %(mode,)) + + def close(self, error=None): + """ close down this channel with an optional error message. + Note that closing of a channel tied to remote_exec happens + automatically at the end of execution and cannot + be done explicitely. + """ + if self._executing: + raise IOError("cannot explicitly close channel within remote_exec") + if self._closed: + self.gateway._trace(self, "ignoring redundant call to close()") + if not self._closed: + # state transition "opened/sendonly" --> "closed" + # threads warning: the channel might be closed under our feet, + # but it's never damaging to send too many CHANNEL_CLOSE messages + # however, if the other side triggered a close already, we + # do not send back a closed message. + if not self._receiveclosed.isSet(): + put = self.gateway._send + if error is not None: + put(Message.CHANNEL_CLOSE_ERROR, self.id, dumps_internal(error)) + else: + put(Message.CHANNEL_CLOSE, self.id) + self._trace("sent channel close message") + if isinstance(error, RemoteError): + self._remoteerrors.append(error) + self._closed = True # --> "closed" + self._receiveclosed.set() + queue = self._items + if queue is not None: + queue.put(ENDMARKER) + self.gateway._channelfactory._no_longer_opened(self.id) + + def waitclose(self, timeout=None): + """ wait until this channel is closed (or the remote side + otherwise signalled that no more data was being sent). + The channel may still hold receiveable items, but not receive + any more after waitclose() has returned. Exceptions from executing + code on the other side are reraised as local channel.RemoteErrors. + EOFError is raised if the reading-connection was prematurely closed, + which often indicates a dying process. + self.TimeoutError is raised after the specified number of seconds + (default is None, i.e. wait indefinitely). + """ + self._receiveclosed.wait(timeout=timeout) # wait for non-"opened" state + if not self._receiveclosed.isSet(): + raise self.TimeoutError("Timeout after %r seconds" % timeout) + error = self._getremoteerror() + if error: + raise error + + def send(self, item): + """sends the given item to the other side of the channel, + possibly blocking if the sender queue is full. + The item must be a simple python type and will be + copied to the other side by value. IOError is + raised if the write pipe was prematurely closed. + """ + if self.isclosed(): + raise IOError("cannot send to %r" %(self,)) + self.gateway._send(Message.CHANNEL_DATA, self.id, dumps_internal(item)) + + def receive(self, timeout=None): + """receive a data item that was sent from the other side. + timeout: None [default] blocked waiting. A positive number + indicates the number of seconds after which a channel.TimeoutError + exception will be raised if no item was received. + Note that exceptions from the remotely executing code will be + reraised as channel.RemoteError exceptions containing + a textual representation of the remote traceback. + """ + itemqueue = self._items + if itemqueue is None: + raise IOError("cannot receive(), channel has receiver callback") + try: + x = itemqueue.get(timeout=timeout) + except self.gateway.execmodel.queue.Empty: + raise self.TimeoutError("no item after %r seconds" %(timeout)) + if x is ENDMARKER: + itemqueue.put(x) # for other receivers + raise self._getremoteerror() or EOFError() + else: + return x + + def __iter__(self): + return self + + def next(self): + try: + return self.receive() + except EOFError: + raise StopIteration + __next__ = next + + + def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False): + """ + set the string coercion for this channel + the default is to try to convert py2 str as py3 str, + but not to try and convert py3 str to py2 str + """ + self._strconfig = (py2str_as_py3str, py3str_as_py2str) + data = dumps_internal(self._strconfig) + self.gateway._send(Message.RECONFIGURE, self.id, data=data) + +ENDMARKER = object() +INTERRUPT_TEXT = "keyboard-interrupted" + +class ChannelFactory(object): + def __init__(self, gateway, startcount=1): + self._channels = weakref.WeakValueDictionary() + self._callbacks = {} + self._writelock = gateway.execmodel.Lock() + self.gateway = gateway + self.count = startcount + self.finished = False + self._list = list # needed during interp-shutdown + + def new(self, id=None): + """ create a new Channel with 'id' (or create new id if None). """ + self._writelock.acquire() + try: + if self.finished: + raise IOError("connexion already closed: %s" % (self.gateway,)) + if id is None: + id = self.count + self.count += 2 + try: + channel = self._channels[id] + except KeyError: + channel = self._channels[id] = Channel(self.gateway, id) + return channel + finally: + self._writelock.release() + + def channels(self): + return self._list(self._channels.values()) + + # + # internal methods, called from the receiver thread + # + def _no_longer_opened(self, id): + try: + del self._channels[id] + except KeyError: + pass + try: + callback, endmarker, strconfig = self._callbacks.pop(id) + except KeyError: + pass + else: + if endmarker is not NO_ENDMARKER_WANTED: + callback(endmarker) + + def _local_close(self, id, remoteerror=None, sendonly=False): + channel = self._channels.get(id) + if channel is None: + # channel already in "deleted" state + if remoteerror: + remoteerror.warn() + self._no_longer_opened(id) + else: + # state transition to "closed" state + if remoteerror: + channel._remoteerrors.append(remoteerror) + queue = channel._items + if queue is not None: + queue.put(ENDMARKER) + self._no_longer_opened(id) + if not sendonly: # otherwise #--> "sendonly" + channel._closed = True # --> "closed" + channel._receiveclosed.set() + + def _local_receive(self, id, data): + # executes in receiver thread + try: + callback, endmarker, strconfig= self._callbacks[id] + channel = self._channels.get(id) + except KeyError: + channel = self._channels.get(id) + queue = channel and channel._items + if queue is None: + pass # drop data + else: + item = loads_internal(data, channel) + queue.put(item) + else: + try: + data = loads_internal(data, channel, strconfig) + callback(data) # even if channel may be already closed + except KeyboardInterrupt: + raise + except: + excinfo = sys.exc_info() + self.gateway._trace("exception during callback: %s" % excinfo[1]) + errortext = self.gateway._geterrortext(excinfo) + self.gateway._send(Message.CHANNEL_CLOSE_ERROR, id, dumps_internal(errortext)) + self._local_close(id, errortext) + + def _finished_receiving(self): + self.gateway._trace("finished receiving") + self._writelock.acquire() + try: + self.finished = True + finally: + self._writelock.release() + for id in self._list(self._channels): + self._local_close(id, sendonly=True) + for id in self._list(self._callbacks): + self._no_longer_opened(id) + +class ChannelFile(object): + def __init__(self, channel, proxyclose=True): + self.channel = channel + self._proxyclose = proxyclose + + def isatty(self): + return False + + def close(self): + if self._proxyclose: + self.channel.close() + + def __repr__(self): + state = self.channel.isclosed() and 'closed' or 'open' + return '<ChannelFile %d %s>' %(self.channel.id, state) + +class ChannelFileWrite(ChannelFile): + def write(self, out): + self.channel.send(out) + + def flush(self): + pass + +class ChannelFileRead(ChannelFile): + def __init__(self, channel, proxyclose=True): + super(ChannelFileRead, self).__init__(channel, proxyclose) + self._buffer = None + + def read(self, n): + try: + if self._buffer is None: + self._buffer = self.channel.receive() + while len(self._buffer) < n: + self._buffer += self.channel.receive() + except EOFError: + self.close() + if self._buffer is None: + ret = "" + else: + ret = self._buffer[:n] + self._buffer = self._buffer[n:] + return ret + + def readline(self): + if self._buffer is not None: + i = self._buffer.find("\n") + if i != -1: + return self.read(i+1) + line = self.read(len(self._buffer)+1) + else: + line = self.read(1) + while line and line[-1] != "\n": + c = self.read(1) + if not c: + break + line += c + return line + +class BaseGateway(object): + exc_info = sys.exc_info + _sysex = sysex + id = "<slave>" + + def __init__(self, io, id, _startcount=2): + self.execmodel = io.execmodel + self._io = io + self.id = id + self._strconfig = (Unserializer.py2str_as_py3str, + Unserializer.py3str_as_py2str) + self._channelfactory = ChannelFactory(self, _startcount) + self._receivelock = self.execmodel.RLock() + # globals may be NONE at process-termination + self.__trace = trace + self._geterrortext = geterrortext + self._workerpool = self.execmodel.WorkerPool(1) + + def _trace(self, *msg): + self.__trace(self.id, *msg) + + def _initreceive(self): + self._receiverthread = self._workerpool.spawn(self._thread_receiver) + + def _thread_receiver(self): + def log(*msg): + self._trace("[receiver-thread]", *msg) + log("RECEIVERTHREAD: starting to run") + eof = False + io = self._io + try: + try: + while 1: + msg = Message.from_io(io) + log("received", msg) + _receivelock = self._receivelock + _receivelock.acquire() + try: + msg.received(self) + del msg + finally: + _receivelock.release() + except self._sysex: + log("io.close_read()") + self._io.close_read() + except EOFError: + log("got EOF") + #log("traceback was: ", self._geterrortext(self.exc_info())) + self._error = self.exc_info()[1] + eof = True + except: + log(self._geterrortext(self.exc_info())) + finally: + try: + log('entering finalization') + # wake up any execution waiting to receive something + self._channelfactory._finished_receiving() + if eof: + self._terminate_execution() + log('leaving finalization') + except: + pass # XXX be silent at interp-shutdown + + def _terminate_execution(self): + pass + + def _send(self, msgcode, channelid=0, data=bytes()): + message = Message(msgcode, channelid, data) + try: + message.to_io(self._io) + self._trace('sent', message) + except (IOError, ValueError): + e = sys.exc_info()[1] + self._trace('failed to send', message, e) + raise + + + def _local_schedulexec(self, channel, sourcetask): + channel.close("execution disallowed") + + # _____________________________________________________________________ + # + # High Level Interface + # _____________________________________________________________________ + # + def newchannel(self): + """ return a new independent channel. """ + return self._channelfactory.new() + + def join(self, timeout=None): + """ Wait for receiverthread to terminate. """ + self._trace("waiting for receiver thread to finish") + self._receiverthread.waitfinish() + +class SlaveGateway(BaseGateway): + + def _local_schedulexec(self, channel, sourcetask): + sourcetask = loads_internal(sourcetask) + self._execpool.spawn(self.executetask, ((channel, sourcetask))) + + def _terminate_execution(self): + # called from receiverthread + self._trace("shutting down execution pool") + self._execpool.shutdown() + if not self._execpool.waitall(1.0): + self._trace("execution ongoing, trying interrupt_main") + # We try hard to terminate execution based on the assumption + # that there is only one gateway object running per-process. + if sys.platform != "win32": + self._trace("sending ourselves a SIGINT") + os.kill(os.getpid(), 2) # send ourselves a SIGINT + elif interrupt_main is not None: + self._trace("calling interrupt_main()") + interrupt_main() + if not self._execpool.waitall(10.0): + self._trace("execution did not finish in 10 secs, " + "calling os._exit()") + os._exit(1) + + def serve(self): + trace = lambda msg: self._trace("[serve] " + msg) + hasprimary = self.execmodel.backend == "thread" + self._execpool = self.execmodel.WorkerPool(hasprimary=hasprimary) + trace("spawning receiver thread") + self._initreceive() + try: + try: + if hasprimary: + trace("integrating as main primary exec thread") + self._execpool.integrate_as_primary_thread() + else: + trace("waiting for execution to finish") + self._execpool.wait_for_shutdown() + finally: + trace("execution finished, closing io write stream") + self._io.close_write() + trace("joining receiver thread") + self.join() + except KeyboardInterrupt: + # in the slave we can't really do anything sensible + trace("swallowing keyboardinterrupt, serve finished") + + def executetask(self, item): + try: + channel, (source, call_name, kwargs) = item + if not ISPY3 and kwargs: + # some python2 versions do not accept unicode keyword params + # note: Unserializer generally turns py2-str to py3-str objects + newkwargs = {} + for name, value in kwargs.items(): + if isinstance(name, unicode): + name = name.encode('ascii') + newkwargs[name] = value + kwargs = newkwargs + loc = {'channel' : channel, '__name__': '__channelexec__'} + self._trace("execution starts[%s]: %s" % + (channel.id, repr(source)[:50])) + channel._executing = True + try: + co = compile(source+'\n', '<remote exec>', 'exec') + do_exec(co, loc) # noqa + if call_name: + self._trace('calling %s(**%60r)' % (call_name, kwargs)) + function = loc[call_name] + function(channel, **kwargs) + finally: + channel._executing = False + self._trace("execution finished") + except KeyboardInterrupt: + channel.close(INTERRUPT_TEXT) + raise + except: + excinfo = self.exc_info() + if not isinstance(excinfo[1], EOFError): + if not channel.gateway._channelfactory.finished: + self._trace("got exception: %r" % (excinfo[1],)) + errortext = self._geterrortext(excinfo) + channel.close(errortext) + return + self._trace("ignoring EOFError because receiving finished") + channel.close() + +# +# Cross-Python pickling code, tested from test_serializer.py +# + +class DataFormatError(Exception): + pass + +class DumpError(DataFormatError): + """Error while serializing an object.""" + +class LoadError(DataFormatError): + """Error while unserializing an object.""" + +if ISPY3: + def bchr(n): + return bytes([n]) +else: + bchr = chr + +DUMPFORMAT_VERSION = bchr(1) + +FOUR_BYTE_INT_MAX = 2147483647 + +FLOAT_FORMAT = "!d" +FLOAT_FORMAT_SIZE = struct.calcsize(FLOAT_FORMAT) + +class _Stop(Exception): + pass + +class Unserializer(object): + num2func = {} # is filled after this class definition + py2str_as_py3str = True # True + py3str_as_py2str = False # false means py2 will get unicode + + def __init__(self, stream, channel_or_gateway=None, strconfig=None): + gateway = getattr(channel_or_gateway, 'gateway', channel_or_gateway) + strconfig = getattr(channel_or_gateway, '_strconfig', strconfig) + if strconfig: + self.py2str_as_py3str, self.py3str_as_py2str = strconfig + self.stream = stream + self.channelfactory = getattr(gateway, '_channelfactory', gateway) + + def load(self, versioned=False): + if versioned: + ver = self.stream.read(1) + if ver != DUMPFORMAT_VERSION: + raise LoadError("wrong dumpformat version") + self.stack = [] + try: + while True: + opcode = self.stream.read(1) + if not opcode: + raise EOFError + try: + loader = self.num2func[opcode] + except KeyError: + raise LoadError("unkown opcode %r - " + "wire protocol corruption?" % (opcode,)) + loader(self) + except _Stop: + if len(self.stack) != 1: + raise LoadError("internal unserialization error") + return self.stack.pop(0) + else: + raise LoadError("didn't get STOP") + + def load_none(self): + self.stack.append(None) + + def load_true(self): + self.stack.append(True) + + def load_false(self): + self.stack.append(False) + + def load_int(self): + i = self._read_int4() + self.stack.append(i) + + def load_longint(self): + s = self._read_byte_string() + self.stack.append(int(s)) + + if ISPY3: + load_long = load_int + load_longlong = load_longint + else: + def load_long(self): + i = self._read_int4() + self.stack.append(long(i)) + + def load_longlong(self): + l = self._read_byte_string() + self.stack.append(long(l)) + + def load_float(self): + binary = self.stream.read(FLOAT_FORMAT_SIZE) + self.stack.append(struct.unpack(FLOAT_FORMAT, binary)[0]) + + def _read_int4(self): + return struct.unpack("!i", self.stream.read(4))[0] + + def _read_byte_string(self): + length = self._read_int4() + as_bytes = self.stream.read(length) + return as_bytes + + def load_py3string(self): + as_bytes = self._read_byte_string() + if not ISPY3 and self.py3str_as_py2str: + # XXX Should we try to decode into latin-1? + self.stack.append(as_bytes) + else: + self.stack.append(as_bytes.decode("utf-8")) + + def load_py2string(self): + as_bytes = self._read_byte_string() + if ISPY3 and self.py2str_as_py3str: + s = as_bytes.decode("latin-1") + else: + s = as_bytes + self.stack.append(s) + + def load_bytes(self): + s = self._read_byte_string() + self.stack.append(s) + + def load_unicode(self): + self.stack.append(self._read_byte_string().decode("utf-8")) + + def load_newlist(self): + length = self._read_int4() + self.stack.append([None] * length) + + def load_setitem(self): + if len(self.stack) < 3: + raise LoadError("not enough items for setitem") + value = self.stack.pop() + key = self.stack.pop() + self.stack[-1][key] = value + + def load_newdict(self): + self.stack.append({}) + + def _load_collection(self, type_): + length = self._read_int4() + if length: + res = type_(self.stack[-length:]) + del self.stack[-length:] + self.stack.append(res) + else: + self.stack.append(type_()) + + def load_buildtuple(self): + self._load_collection(tuple) + + def load_set(self): + self._load_collection(set) + + def load_frozenset(self): + self._load_collection(frozenset) + + def load_stop(self): + raise _Stop + + def load_channel(self): + id = self._read_int4() + newchannel = self.channelfactory.new(id) + self.stack.append(newchannel) + +# automatically build opcodes and byte-encoding + +class opcode: + """ container for name -> num mappings. """ + +def _buildopcodes(): + l = [] + for name, func in Unserializer.__dict__.items(): + if name.startswith("load_"): + opname = name[5:].upper() + l.append((opname, func)) + l.sort() + for i,(opname, func) in enumerate(l): + assert i < 26, "xxx" + i = bchr(64+i) + Unserializer.num2func[i] = func + setattr(opcode, opname, i) + +_buildopcodes() + +def dumps(obj): + """ return a serialized bytestring of the given obj. + + The obj and all contained objects must be of a builtin + python type (so nested dicts, sets, etc. are all ok but + not user-level instances). + """ + return _Serializer().save(obj, versioned=True) + +def dump(byteio, obj): + """ write a serialized bytestring of the given obj to the given stream. """ + _Serializer(write=byteio.write).save(obj, versioned=True) + +def loads(bytestring, py2str_as_py3str=False, py3str_as_py2str=False): + """ return the object as deserialized from the given bytestring. + + py2str_as_py3str: if true then string (str) objects previously + dumped on Python2 will be loaded as Python3 + strings which really are text objects. + py3str_as_py2str: if true then string (str) objects previously + dumped on Python3 will be loaded as Python2 + strings instead of unicode objects. + + if the bytestring was dumped with an incompatible protocol + version or if the bytestring is corrupted, the + ``execnet.DataFormatError`` will be raised. + """ + io = BytesIO(bytestring) + return load(io, py2str_as_py3str=py2str_as_py3str, + py3str_as_py2str=py3str_as_py2str) + +def load(io, py2str_as_py3str=False, py3str_as_py2str=False): + """ derserialize an object form the specified stream. + + Behaviour and parameters are otherwise the same as with ``loads`` + """ + strconfig=(py2str_as_py3str, py3str_as_py2str) + return Unserializer(io, strconfig=strconfig).load(versioned=True) + +def loads_internal(bytestring, channelfactory=None, strconfig=None): + io = BytesIO(bytestring) + return Unserializer(io, channelfactory, strconfig).load() + +def dumps_internal(obj): + return _Serializer().save(obj) + + +class _Serializer(object): + _dispatch = {} + + def __init__(self, write=None): + if write is None: + self._streamlist = [] + write = self._streamlist.append + self._write = write + + def save(self, obj, versioned=False): + # calling here is not re-entrant but multiple instances + # may write to the same stream because of the common platform + # atomic-write guaruantee (concurrent writes each happen atomicly) + if versioned: + self._write(DUMPFORMAT_VERSION) + self._save(obj) + self._write(opcode.STOP) + try: + streamlist = self._streamlist + except AttributeError: + return None + return type(streamlist[0])().join(streamlist) + + def _save(self, obj): + tp = type(obj) + try: + dispatch = self._dispatch[tp] + except KeyError: + methodname = 'save_' + tp.__name__ + meth = getattr(self.__class__, methodname, None) + if meth is None: + raise DumpError("can't serialize %s" % (tp,)) + dispatch = self._dispatch[tp] = meth + dispatch(self, obj) + + def save_NoneType(self, non): + self._write(opcode.NONE) + + def save_bool(self, boolean): + if boolean: + self._write(opcode.TRUE) + else: + self._write(opcode.FALSE) + + def save_bytes(self, bytes_): + self._write(opcode.BYTES) + self._write_byte_sequence(bytes_) + + if ISPY3: + def save_str(self, s): + self._write(opcode.PY3STRING) + self._write_unicode_string(s) + else: + def save_str(self, s): + self._write(opcode.PY2STRING) + self._write_byte_sequence(s) + + def save_unicode(self, s): + self._write(opcode.UNICODE) + self._write_unicode_string(s) + + def _write_unicode_string(self, s): + try: + as_bytes = s.encode("utf-8") + except UnicodeEncodeError: + raise DumpError("strings must be utf-8 encodable") + self._write_byte_sequence(as_bytes) + + def _write_byte_sequence(self, bytes_): + self._write_int4(len(bytes_), "string is too long") + self._write(bytes_) + + def _save_integral(self, i, short_op, long_op): + if i <= FOUR_BYTE_INT_MAX: + self._write(short_op) + self._write_int4(i) + else: + self._write(long_op) + self._write_byte_sequence(str(i).rstrip("L").encode("ascii")) + + def save_int(self, i): + self._save_integral(i, opcode.INT, opcode.LONGINT) + + def save_long(self, l): + self._save_integral(l, opcode.LONG, opcode.LONGLONG) + + def save_float(self, flt): + self._write(opcode.FLOAT) + self._write(struct.pack(FLOAT_FORMAT, flt)) + + def _write_int4(self, i, error="int must be less than %i" % + (FOUR_BYTE_INT_MAX,)): + if i > FOUR_BYTE_INT_MAX: + raise DumpError(error) + self._write(struct.pack("!i", i)) + + def save_list(self, L): + self._write(opcode.NEWLIST) + self._write_int4(len(L), "list is too long") + for i, item in enumerate(L): + self._write_setitem(i, item) + + def _write_setitem(self, key, value): + self._save(key) + self._save(value) + self._write(opcode.SETITEM) + + def save_dict(self, d): + self._write(opcode.NEWDICT) + for key, value in d.items(): + self._write_setitem(key, value) + + def save_tuple(self, tup): + for item in tup: + self._save(item) + self._write(opcode.BUILDTUPLE) + self._write_int4(len(tup), "tuple is too long") + + def _write_set(self, s, op): + for item in s: + self._save(item) + self._write(op) + self._write_int4(len(s), "set is too long") + + def save_set(self, s): + self._write_set(s, opcode.SET) + + def save_frozenset(self, s): + self._write_set(s, opcode.FROZENSET) + + def save_Channel(self, channel): + self._write(opcode.CHANNEL) + self._write_int4(channel.id) + +def init_popen_io(execmodel): + if not hasattr(os, 'dup'): # jython + io = Popen2IO(sys.stdout, sys.stdin, execmodel) + import tempfile + sys.stdin = tempfile.TemporaryFile('r') + sys.stdout = tempfile.TemporaryFile('w') + else: + try: + devnull = os.devnull + except AttributeError: + if os.name == 'nt': + devnull = 'NUL' + else: + devnull = '/dev/null' + # stdin + stdin = execmodel.fdopen(os.dup(0), 'r', 1) + fd = os.open(devnull, os.O_RDONLY) + os.dup2(fd, 0) + os.close(fd) + + # stdout + stdout = execmodel.fdopen(os.dup(1), 'w', 1) + fd = os.open(devnull, os.O_WRONLY) + os.dup2(fd, 1) + + # stderr for win32 + if os.name == 'nt': + sys.stderr = execmodel.fdopen(os.dup(2), 'w', 1) + os.dup2(fd, 2) + os.close(fd) + io = Popen2IO(stdout, stdin, execmodel) + sys.stdin = execmodel.fdopen(0, 'r', 1) + sys.stdout = execmodel.fdopen(1, 'w', 1) + return io + +def serve(io, id): + trace("creating slavegateway on %r" %(io,)) + SlaveGateway(io=io, id=id, _startcount=2).serve() diff --git a/src/main/resources/PythonLibs/site-packages/execnet/gateway_bootstrap.py b/src/main/resources/PythonLibs/site-packages/execnet/gateway_bootstrap.py new file mode 100644 index 0000000000000000000000000000000000000000..abd084a45e4325fe391fdecb60e950609fea6ab7 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/gateway_bootstrap.py @@ -0,0 +1,97 @@ +""" +code to initialize the remote side of a gateway once the io is created +""" +import os +import inspect +import execnet +from execnet import gateway_base +from execnet.gateway import Gateway +importdir = os.path.dirname(os.path.dirname(execnet.__file__)) + + +class HostNotFound(Exception): + pass + + +def bootstrap_popen(io, spec): + sendexec(io, + "import sys", + "sys.path.insert(0, %r)" % importdir, + "from execnet.gateway_base import serve, init_popen_io, get_execmodel", + "sys.stdout.write('1')", + "sys.stdout.flush()", + "execmodel = get_execmodel(%r)" % spec.execmodel, + "serve(init_popen_io(execmodel), id='%s-slave')" % spec.id, + ) + s = io.read(1) + assert s == "1".encode('ascii'), repr(s) + + +def bootstrap_ssh(io, spec): + try: + sendexec(io, + inspect.getsource(gateway_base), + "execmodel = get_execmodel(%r)" % spec.execmodel, + 'io = init_popen_io(execmodel)', + "io.write('1'.encode('ascii'))", + "serve(io, id='%s-slave')" % spec.id, + ) + s = io.read(1) + assert s == "1".encode('ascii') + except EOFError: + ret = io.wait() + if ret == 255: + raise HostNotFound(io.remoteaddress) + + +def bootstrap_socket(io, id): + #XXX: switch to spec + from execnet.gateway_socket import SocketIO + + sendexec(io, + inspect.getsource(gateway_base), + 'import socket', + inspect.getsource(SocketIO), + "try: execmodel", + "except NameError:", + " execmodel = get_execmodel('thread')", + "io = SocketIO(clientsock, execmodel)", + "io.write('1'.encode('ascii'))", + "serve(io, id='%s-slave')" % id, + ) + s = io.read(1) + assert s == "1".encode('ascii') + + +def sendexec(io, *sources): + source = "\n".join(sources) + io.write((repr(source)+ "\n").encode('ascii')) + + +def fix_pid_for_jython_popen(gw): + """ + fix for jython 2.5.1 + """ + spec, io = gw.spec, gw._io + if spec.popen and not spec.via: + #XXX: handle the case of remote being jython + # and not having the popen pid + if io.popen.pid is None: + io.popen.pid = gw.remote_exec( + "import os; channel.send(os.getpid())").receive() + + +def bootstrap(io, spec): + if spec.popen: + bootstrap_popen(io, spec) + elif spec.ssh: + bootstrap_ssh(io, spec) + elif spec.socket: + bootstrap_socket(io, spec) + else: + raise ValueError('unknown gateway type, cant bootstrap') + gw = Gateway(io, spec) + fix_pid_for_jython_popen(gw) + return gw + + diff --git a/src/main/resources/PythonLibs/site-packages/execnet/gateway_io.py b/src/main/resources/PythonLibs/site-packages/execnet/gateway_io.py new file mode 100644 index 0000000000000000000000000000000000000000..cce1fae1a6a7b4260ccc2fa4fd00b0e5f59bc0d3 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/gateway_io.py @@ -0,0 +1,209 @@ +""" +execnet io initialization code + +creates io instances used for gateway io +""" +import os +import sys + +try: + from execnet.gateway_base import Popen2IO, Message +except ImportError: + from __main__ import Popen2IO, Message + +class Popen2IOMaster(Popen2IO): + def __init__(self, args, execmodel): + self.popen = p = execmodel.PopenPiped(args) + Popen2IO.__init__(self, p.stdin, p.stdout, execmodel=execmodel) + + def wait(self): + try: + return self.popen.wait() + except OSError: + pass # subprocess probably dead already + + def kill(self): + killpopen(self.popen) + +def killpopen(popen): + try: + if hasattr(popen, 'kill'): + popen.kill() + else: + killpid(popen.pid) + except EnvironmentError: + sys.stderr.write("ERROR killing: %s\n" %(sys.exc_info()[1])) + sys.stderr.flush() + +def killpid(pid): + if hasattr(os, 'kill'): + os.kill(pid, 15) + elif sys.platform == "win32" or getattr(os, '_name', None) == 'nt': + try: + import ctypes + except ImportError: + import subprocess + # T: treekill, F: Force + cmd = ("taskkill /T /F /PID %d" %(pid)).split() + ret = subprocess.call(cmd) + if ret != 0: + raise EnvironmentError("taskkill returned %r" %(ret,)) + else: + PROCESS_TERMINATE = 1 + handle = ctypes.windll.kernel32.OpenProcess( + PROCESS_TERMINATE, False, pid) + ctypes.windll.kernel32.TerminateProcess(handle, -1) + ctypes.windll.kernel32.CloseHandle(handle) + else: + raise EnvironmentError("no method to kill %s" %(pid,)) + + + +popen_bootstrapline = "import sys;exec(eval(sys.stdin.readline()))" + + +def popen_args(spec): + python = spec.python or sys.executable + args = [str(python), '-u'] + if spec is not None and spec.dont_write_bytecode: + args.append("-B") + # Slight gymnastics in ordering these arguments because CPython (as of + # 2.7.1) ignores -B if you provide `python -c "something" -B` + args.extend(['-c', popen_bootstrapline]) + return args + + +def ssh_args(spec): + remotepython = spec.python or "python" + args = ["ssh", "-C" ] + if spec.ssh_config is not None: + args.extend(['-F', str(spec.ssh_config)]) + + args.extend(spec.ssh.split()) + remotecmd = '%s -c "%s"' % (remotepython, popen_bootstrapline) + args.append(remotecmd) + return args + +def create_io(spec, execmodel): + if spec.popen: + args = popen_args(spec) + return Popen2IOMaster(args, execmodel) + if spec.ssh: + args = ssh_args(spec) + io = Popen2IOMaster(args, execmodel) + io.remoteaddress = spec.ssh + return io + +# +# Proxy Gateway handling code +# +# master: proxy initiator +# forwarder: forwards between master and sub +# sub: sub process that is proxied to the initiator + +RIO_KILL = 1 +RIO_WAIT = 2 +RIO_REMOTEADDRESS = 3 +RIO_CLOSE_WRITE = 4 + +class ProxyIO(object): + """ A Proxy IO object allows to instantiate a Gateway + through another "via" gateway. A master:ProxyIO object + provides an IO object effectively connected to the sub + via the forwarder. To achieve this, master:ProxyIO interacts + with forwarder:serve_proxy_io() which itself + instantiates and interacts with the sub. + """ + def __init__(self, proxy_channel, execmodel): + # after exchanging the control channel we use proxy_channel + # for messaging IO + self.controlchan = proxy_channel.gateway.newchannel() + proxy_channel.send(self.controlchan) + self.iochan = proxy_channel + self.iochan_file = self.iochan.makefile('r') + self.execmodel = execmodel + + def read(self, nbytes): + return self.iochan_file.read(nbytes) + + def write(self, data): + return self.iochan.send(data) + + def _controll(self, event): + self.controlchan.send(event) + return self.controlchan.receive() + + def close_write(self): + self._controll(RIO_CLOSE_WRITE) + + def kill(self): + self._controll(RIO_KILL) + + def wait(self): + return self._controll(RIO_WAIT) + + @property + def remoteaddress(self): + return self._controll(RIO_REMOTEADDRESS) + + def __repr__(self): + return '<RemoteIO via %s>' % (self.iochan.gateway.id, ) + +class PseudoSpec: + def __init__(self, vars): + self.__dict__.update(vars) + def __getattr__(self, name): + return None + +def serve_proxy_io(proxy_channelX): + execmodel = proxy_channelX.gateway.execmodel + _trace = proxy_channelX.gateway._trace + tag = "serve_proxy_io:%s " % proxy_channelX.id + def log(*msg): + _trace(tag + msg[0], *msg[1:]) + spec = PseudoSpec(proxy_channelX.receive()) + # create sub IO object which we will proxy back to our proxy initiator + sub_io = create_io(spec, execmodel) + control_chan = proxy_channelX.receive() + log("got control chan", control_chan) + + # read data from master, forward it to the sub + # XXX writing might block, thus blocking the receiver thread + def forward_to_sub(data): + log("forward data to sub, size %s" % len(data)) + sub_io.write(data) + proxy_channelX.setcallback(forward_to_sub) + + def controll(data): + if data==RIO_WAIT: + control_chan.send(sub_io.wait()) + elif data==RIO_KILL: + control_chan.send(sub_io.kill()) + elif data==RIO_REMOTEADDRESS: + control_chan.send(sub_io.remoteaddress) + elif data==RIO_CLOSE_WRITE: + control_chan.send(sub_io.close_write()) + control_chan.setcallback(controll) + + # write data to the master coming from the sub + forward_to_master_file = proxy_channelX.makefile("w") + + # read bootstrap byte from sub, send it on to master + log('reading bootstrap byte from sub', spec.id) + initial = sub_io.read(1) + assert initial == '1'.encode('ascii'), initial + log('forwarding bootstrap byte from sub', spec.id) + forward_to_master_file.write(initial) + + # enter message forwarding loop + while True: + try: + message = Message.from_io(sub_io) + except EOFError: + log('EOF from sub, terminating proxying loop', spec.id) + break + message.to_io(forward_to_master_file) + # proxy_channelX will be closed from remote_exec's finalization code + +if __name__ == "__channelexec__": + serve_proxy_io(channel) # noqa diff --git a/src/main/resources/PythonLibs/site-packages/execnet/gateway_socket.py b/src/main/resources/PythonLibs/site-packages/execnet/gateway_socket.py new file mode 100644 index 0000000000000000000000000000000000000000..3bb0589cb5af7553e90000fe25371c64dc226068 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/gateway_socket.py @@ -0,0 +1,89 @@ +from execnet.gateway_bootstrap import HostNotFound +import sys + +try: bytes +except NameError: bytes = str + +class SocketIO: + def __init__(self, sock, execmodel): + self.sock = sock + self.execmodel = execmodel + socket = execmodel.socket + try: + sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)# IPTOS_LOWDELAY + sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + except (AttributeError, socket.error): + sys.stderr.write("WARNING: cannot set socketoption") + + def read(self, numbytes): + "Read exactly 'bytes' bytes from the socket." + buf = bytes() + while len(buf) < numbytes: + t = self.sock.recv(numbytes - len(buf)) + if not t: + raise EOFError + buf += t + return buf + + def write(self, data): + self.sock.sendall(data) + + def close_read(self): + try: + self.sock.shutdown(0) + except self.execmodel.socket.error: + pass + def close_write(self): + try: + self.sock.shutdown(1) + except self.execmodel.socket.error: + pass + + def wait(self): + pass + + def kill(self): + pass + + +def start_via(gateway, hostport=None): + """ return a host, port tuple, + after instanciating a socketserver on the given gateway + """ + if hostport is None: + host, port = ('localhost', 0) + else: + host, port = hostport + + from execnet.script import socketserver + + # execute the above socketserverbootstrap on the other side + channel = gateway.remote_exec(socketserver) + channel.send((host, port)) + (realhost, realport) = channel.receive() + #self._trace("new_remote received" + # "port=%r, hostname = %r" %(realport, hostname)) + if not realhost or realhost=="0.0.0.0": + realhost = "localhost" + return realhost, realport + + +def create_io(spec, group, execmodel): + assert not spec.python, ( + "socket: specifying python executables not yet supported") + gateway_id = spec.installvia + if gateway_id: + host, port = start_via(group[gateway_id]) + else: + host, port = spec.socket.split(":") + port = int(port) + + socket = execmodel.socket + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + io = SocketIO(sock, execmodel) + io.remoteaddress = '%s:%d' % (host, port) + try: + sock.connect((host, port)) + except execmodel.socket.gaierror: + raise HostNotFound(str(sys.exc_info()[1])) + return io diff --git a/src/main/resources/PythonLibs/site-packages/execnet/multi.py b/src/main/resources/PythonLibs/site-packages/execnet/multi.py new file mode 100644 index 0000000000000000000000000000000000000000..ebe451d541396418439a3da2c10debbff6154443 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/multi.py @@ -0,0 +1,300 @@ +""" +Managing Gateway Groups and interactions with multiple channels. + +(c) 2008-2014, Holger Krekel and others +""" + +import sys, atexit + +from execnet import XSpec +from execnet import gateway_io, gateway_bootstrap +from execnet.gateway_base import reraise, trace, get_execmodel + +from threading import Lock + +NO_ENDMARKER_WANTED = object() + +class Group(object): + """ Gateway Groups. """ + defaultspec = "popen" + def __init__(self, xspecs=(), execmodel="thread"): + """ initialize group and make gateways as specified. + execmodel can be 'thread' or 'eventlet'. + """ + self._gateways = [] + self._autoidcounter = 0 + self._gateways_to_join = [] + self.lock = Lock() + # we use the same execmodel for all of the Gateway objects + # we spawn on our side. Probably we should not allow different + # execmodels between different groups but not clear. + # Note that "other side" execmodels may differ and is typically + # specified by the spec passed to makegateway. + self.set_execmodel(execmodel) + for xspec in xspecs: + self.makegateway(xspec) + atexit.register(self._cleanup_atexit) + + @property + def execmodel(self): + return self._execmodel + + @property + def remote_execmodel(self): + return self._remote_execmodel + + def set_execmodel(self, execmodel, remote_execmodel=None): + """ Set the execution model for local and remote site. + + execmodel can be one of "thread" or "eventlet" (XXX gevent). + It determines the execution model for any newly created gateway. + If remote_execmodel is not specified it takes on the value + of execmodel. + + NOTE: Execution models can only be set before any gateway is created. + + """ + if self._gateways: + raise ValueError("can not set execution models if " + "gateways have been created already") + if remote_execmodel is None: + remote_execmodel = execmodel + self._execmodel = get_execmodel(execmodel) + self._remote_execmodel = get_execmodel(remote_execmodel) + + def __repr__(self): + idgateways = [gw.id for gw in self] + return "<Group %r>" %(idgateways) + + def __getitem__(self, key): + if isinstance(key, int): + return self._gateways[key] + for gw in self._gateways: + if gw == key or gw.id == key: + return gw + raise KeyError(key) + + def __contains__(self, key): + try: + self[key] + return True + except KeyError: + return False + + def __len__(self): + return len(self._gateways) + + def __iter__(self): + return iter(list(self._gateways)) + + def makegateway(self, spec=None): + """create and configure a gateway to a Python interpreter. + The ``spec`` string encodes the target gateway type + and configuration information. The general format is:: + + key1=value1//key2=value2//... + + If you leave out the ``=value`` part a True value is assumed. + Valid types: ``popen``, ``ssh=hostname``, ``socket=host:port``. + Valid configuration:: + + id=<string> specifies the gateway id + python=<path> specifies which python interpreter to execute + execmodel=model 'thread', 'eventlet', 'gevent' model for execution + chdir=<path> specifies to which directory to change + nice=<path> specifies process priority of new process + env:NAME=value specifies a remote environment variable setting. + + If no spec is given, self.defaultspec is used. + """ + if not spec: + spec = self.defaultspec + if not isinstance(spec, XSpec): + spec = XSpec(spec) + self.allocate_id(spec) + if spec.execmodel is None: + spec.execmodel = self.remote_execmodel.backend + if spec.via: + assert not spec.socket + master = self[spec.via] + proxy_channel = master.remote_exec(gateway_io) + proxy_channel.send(vars(spec)) + proxy_io_master = gateway_io.ProxyIO(proxy_channel, self.execmodel) + gw = gateway_bootstrap.bootstrap(proxy_io_master, spec) + elif spec.popen or spec.ssh: + io = gateway_io.create_io(spec, execmodel=self.execmodel) + gw = gateway_bootstrap.bootstrap(io, spec) + elif spec.socket: + from execnet import gateway_socket + io = gateway_socket.create_io(spec, self, execmodel=self.execmodel) + gw = gateway_bootstrap.bootstrap(io, spec) + else: + raise ValueError("no gateway type found for %r" % (spec._spec,)) + gw.spec = spec + self._register(gw) + if spec.chdir or spec.nice or spec.env: + channel = gw.remote_exec(""" + import os + path, nice, env = channel.receive() + if path: + if not os.path.exists(path): + os.mkdir(path) + os.chdir(path) + if nice and hasattr(os, 'nice'): + os.nice(nice) + if env: + for name, value in env.items(): + os.environ[name] = value + """) + nice = spec.nice and int(spec.nice) or 0 + channel.send((spec.chdir, nice, spec.env)) + channel.waitclose() + return gw + + def allocate_id(self, spec): + """ allocate id for the given xspec object. """ + if spec.id is None: + id = "gw" + str(self._autoidcounter) + with self.lock: + self._autoidcounter += 1 + if id in self: + raise ValueError("already have gateway with id %r" %(id,)) + spec.id = id + + def _register(self, gateway): + assert not hasattr(gateway, '_group') + assert gateway.id + assert id not in self + self._gateways.append(gateway) + gateway._group = self + + def _unregister(self, gateway): + self._gateways.remove(gateway) + self._gateways_to_join.append(gateway) + + def _cleanup_atexit(self): + trace("=== atexit cleanup %r ===" %(self,)) + self.terminate(timeout=1.0) + + def terminate(self, timeout=None): + """ trigger exit of member gateways and wait for termination + of member gateways and associated subprocesses. After waiting + timeout seconds try to to kill local sub processes of popen- + and ssh-gateways. Timeout defaults to None meaning + open-ended waiting and no kill attempts. + """ + + while self: + vias = {} + for gw in self: + if gw.spec.via: + vias[gw.spec.via] = True + for gw in self: + if gw.id not in vias: + gw.exit() + + def join_wait(gw): + gw.join() + gw._io.wait() + def kill(gw): + trace("Gateways did not come down after timeout: %r" % gw) + gw._io.kill() + + safe_terminate(self.execmodel, timeout, [ + (lambda: join_wait(gw), lambda: kill(gw)) + for gw in self._gateways_to_join]) + self._gateways_to_join[:] = [] + + def remote_exec(self, source, **kwargs): + """ remote_exec source on all member gateways and return + MultiChannel connecting to all sub processes. + """ + channels = [] + for gw in self: + channels.append(gw.remote_exec(source, **kwargs)) + return MultiChannel(channels) + +class MultiChannel: + def __init__(self, channels): + self._channels = channels + + def __len__(self): + return len(self._channels) + + def __iter__(self): + return iter(self._channels) + + def __getitem__(self, key): + return self._channels[key] + + def __contains__(self, chan): + return chan in self._channels + + def send_each(self, item): + for ch in self._channels: + ch.send(item) + + def receive_each(self, withchannel=False): + assert not hasattr(self, '_queue') + l = [] + for ch in self._channels: + obj = ch.receive() + if withchannel: + l.append((ch, obj)) + else: + l.append(obj) + return l + + def make_receive_queue(self, endmarker=NO_ENDMARKER_WANTED): + try: + return self._queue + except AttributeError: + self._queue = None + for ch in self._channels: + if self._queue is None: + self._queue = ch.gateway.execmodel.queue.Queue() + def putreceived(obj, channel=ch): + self._queue.put((channel, obj)) + if endmarker is NO_ENDMARKER_WANTED: + ch.setcallback(putreceived) + else: + ch.setcallback(putreceived, endmarker=endmarker) + return self._queue + + + def waitclose(self): + first = None + for ch in self._channels: + try: + ch.waitclose() + except ch.RemoteError: + if first is None: + first = sys.exc_info() + if first: + reraise(*first) + + + +def safe_terminate(execmodel, timeout, list_of_paired_functions): + workerpool = execmodel.WorkerPool() + + def termkill(termfunc, killfunc): + termreply = workerpool.spawn(termfunc) + try: + termreply.get(timeout=timeout) + except IOError: + killfunc() + + replylist = [] + for termfunc, killfunc in list_of_paired_functions: + reply = workerpool.spawn(termkill, termfunc, killfunc) + replylist.append(reply) + for reply in replylist: + reply.get() + workerpool.waitall() + + +default_group = Group() +makegateway = default_group.makegateway +set_execmodel = default_group.set_execmodel + diff --git a/src/main/resources/PythonLibs/site-packages/execnet/rsync.py b/src/main/resources/PythonLibs/site-packages/execnet/rsync.py new file mode 100644 index 0000000000000000000000000000000000000000..ccfad91e075eb990919a311db759daedf89ad173 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/rsync.py @@ -0,0 +1,207 @@ +""" +1:N rsync implemenation on top of execnet. + +(c) 2006-2009, Armin Rigo, Holger Krekel, Maciej Fijalkowski +""" +import os, stat + +try: + from hashlib import md5 +except ImportError: + from md5 import md5 + +try: + from queue import Queue +except ImportError: + from Queue import Queue + +import execnet.rsync_remote + +class RSync(object): + """ This class allows to send a directory structure (recursively) + to one or multiple remote filesystems. + + There is limited support for symlinks, which means that symlinks + pointing to the sourcetree will be send "as is" while external + symlinks will be just copied (regardless of existance of such + a path on remote side). + """ + def __init__(self, sourcedir, callback=None, verbose=True): + self._sourcedir = str(sourcedir) + self._verbose = verbose + assert callback is None or hasattr(callback, '__call__') + self._callback = callback + self._channels = {} + self._receivequeue = Queue() + self._links = [] + + def filter(self, path): + return True + + def _end_of_channel(self, channel): + if channel in self._channels: + # too early! we must have got an error + channel.waitclose() + # or else we raise one + raise IOError('connection unexpectedly closed: %s ' % ( + channel.gateway,)) + + def _process_link(self, channel): + for link in self._links: + channel.send(link) + # completion marker, this host is done + channel.send(42) + + def _done(self, channel): + """ Call all callbacks + """ + finishedcallback = self._channels.pop(channel) + if finishedcallback: + finishedcallback() + channel.waitclose() + + def _list_done(self, channel): + # sum up all to send + if self._callback: + s = sum([self._paths[i] for i in self._to_send[channel]]) + self._callback("list", s, channel) + + def _send_item(self, channel, data): + """ Send one item + """ + modified_rel_path, checksum = data + modifiedpath = os.path.join(self._sourcedir, *modified_rel_path) + try: + f = open(modifiedpath, 'rb') + data = f.read() + except IOError: + data = None + + # provide info to progress callback function + modified_rel_path = "/".join(modified_rel_path) + if data is not None: + self._paths[modified_rel_path] = len(data) + else: + self._paths[modified_rel_path] = 0 + if channel not in self._to_send: + self._to_send[channel] = [] + self._to_send[channel].append(modified_rel_path) + #print "sending", modified_rel_path, data and len(data) or 0, checksum + + if data is not None: + f.close() + if checksum is not None and checksum == md5(data).digest(): + data = None # not really modified + else: + self._report_send_file(channel.gateway, modified_rel_path) + channel.send(data) + + def _report_send_file(self, gateway, modified_rel_path): + if self._verbose: + print("%s <= %s" %(gateway, modified_rel_path)) + + def send(self, raises=True): + """ Sends a sourcedir to all added targets. Flag indicates + whether to raise an error or return in case of lack of + targets + """ + if not self._channels: + if raises: + raise IOError("no targets available, maybe you " + "are trying call send() twice?") + return + # normalize a trailing '/' away + self._sourcedir = os.path.dirname(os.path.join(self._sourcedir, 'x')) + # send directory structure and file timestamps/sizes + self._send_directory_structure(self._sourcedir) + + # paths and to_send are only used for doing + # progress-related callbacks + self._paths = {} + self._to_send = {} + + # send modified file to clients + while self._channels: + channel, req = self._receivequeue.get() + if req is None: + self._end_of_channel(channel) + else: + command, data = req + if command == "links": + self._process_link(channel) + elif command == "done": + self._done(channel) + elif command == "ack": + if self._callback: + self._callback("ack", self._paths[data], channel) + elif command == "list_done": + self._list_done(channel) + elif command == "send": + self._send_item(channel, data) + del data + else: + assert "Unknown command %s" % command + + def add_target(self, gateway, destdir, + finishedcallback=None, **options): + """ Adds a remote target specified via a gateway + and a remote destination directory. + """ + for name in options: + assert name in ('delete',) + def itemcallback(req): + self._receivequeue.put((channel, req)) + channel = gateway.remote_exec(execnet.rsync_remote) + channel.reconfigure(py2str_as_py3str=False, py3str_as_py2str=False) + channel.setcallback(itemcallback, endmarker = None) + channel.send((str(destdir), options)) + self._channels[channel] = finishedcallback + + def _broadcast(self, msg): + for channel in self._channels: + channel.send(msg) + + def _send_link(self, linktype, basename, linkpoint): + self._links.append((linktype, basename, linkpoint)) + + def _send_directory(self, path): + # dir: send a list of entries + names = [] + subpaths = [] + for name in os.listdir(path): + p = os.path.join(path, name) + if self.filter(p): + names.append(name) + subpaths.append(p) + mode = os.lstat(path).st_mode + self._broadcast([mode] + names) + for p in subpaths: + self._send_directory_structure(p) + + def _send_link_structure(self, path): + linkpoint = os.readlink(path) + basename = path[len(self._sourcedir) + 1:] + if linkpoint.startswith(self._sourcedir): + self._send_link("linkbase", basename, + linkpoint[len(self._sourcedir) + 1:]) + else: + # relative or absolute link, just send it + self._send_link("link", basename, linkpoint) + self._broadcast(None) + + def _send_directory_structure(self, path): + try: + st = os.lstat(path) + except OSError: + self._broadcast((None, 0, 0)) + return + if stat.S_ISREG(st.st_mode): + # regular file: send a mode/timestamp/size pair + self._broadcast((st.st_mode, st.st_mtime, st.st_size)) + elif stat.S_ISDIR(st.st_mode): + self._send_directory(path) + elif stat.S_ISLNK(st.st_mode): + self._send_link_structure(path) + else: + raise ValueError("cannot sync %r" % (path,)) + diff --git a/src/main/resources/PythonLibs/site-packages/execnet/rsync_remote.py b/src/main/resources/PythonLibs/site-packages/execnet/rsync_remote.py new file mode 100644 index 0000000000000000000000000000000000000000..a36b8ce4f12ad53f1004b8318745dafc8b2a2580 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/rsync_remote.py @@ -0,0 +1,109 @@ +""" +(c) 2006-2013, Armin Rigo, Holger Krekel, Maciej Fijalkowski +""" +def serve_rsync(channel): + import os, stat, shutil + try: + from hashlib import md5 + except ImportError: + from md5 import md5 + destdir, options = channel.receive() + modifiedfiles = [] + + def remove(path): + assert path.startswith(destdir) + try: + os.unlink(path) + except OSError: + # assume it's a dir + shutil.rmtree(path) + + def receive_directory_structure(path, relcomponents): + try: + st = os.lstat(path) + except OSError: + st = None + msg = channel.receive() + if isinstance(msg, list): + if st and not stat.S_ISDIR(st.st_mode): + os.unlink(path) + st = None + if not st: + os.makedirs(path) + mode = msg.pop(0) + if mode: + os.chmod(path, mode) + entrynames = {} + for entryname in msg: + destpath = os.path.join(path, entryname) + receive_directory_structure(destpath, relcomponents + [entryname]) + entrynames[entryname] = True + if options.get('delete'): + for othername in os.listdir(path): + if othername not in entrynames: + otherpath = os.path.join(path, othername) + remove(otherpath) + elif msg is not None: + assert isinstance(msg, tuple) + checksum = None + if st: + if stat.S_ISREG(st.st_mode): + msg_mode, msg_mtime, msg_size = msg + if msg_size != st.st_size: + pass + elif msg_mtime != st.st_mtime: + f = open(path, 'rb') + checksum = md5(f.read()).digest() + f.close() + elif msg_mode and msg_mode != st.st_mode: + os.chmod(path, msg_mode) + return + else: + return # already fine + else: + remove(path) + channel.send(("send", (relcomponents, checksum))) + modifiedfiles.append((path, msg)) + receive_directory_structure(destdir, []) + + STRICT_CHECK = False # seems most useful this way for py.test + channel.send(("list_done", None)) + + for path, (mode, time, size) in modifiedfiles: + data = channel.receive() + channel.send(("ack", path[len(destdir) + 1:])) + if data is not None: + if STRICT_CHECK and len(data) != size: + raise IOError('file modified during rsync: %r' % (path,)) + f = open(path, 'wb') + f.write(data) + f.close() + try: + if mode: + os.chmod(path, mode) + os.utime(path, (time, time)) + except OSError: + pass + del data + channel.send(("links", None)) + + msg = channel.receive() + while msg != 42: + # we get symlink + _type, relpath, linkpoint = msg + path = os.path.join(destdir, relpath) + try: + remove(path) + except OSError: + pass + if _type == "linkbase": + src = os.path.join(destdir, linkpoint) + else: + assert _type == "link", _type + src = linkpoint + os.symlink(src, path) + msg = channel.receive() + channel.send(("done", None)) + +if __name__ == '__channelexec__': + serve_rsync(channel) # noqa diff --git a/src/main/resources/PythonLibs/site-packages/execnet/script/__init__.py b/src/main/resources/PythonLibs/site-packages/execnet/script/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..792d6005489ebee62cde02066f19c5521e620451 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/script/__init__.py @@ -0,0 +1 @@ +# diff --git a/src/main/resources/PythonLibs/site-packages/execnet/script/loop_socketserver.py b/src/main/resources/PythonLibs/site-packages/execnet/script/loop_socketserver.py new file mode 100644 index 0000000000000000000000000000000000000000..44896b67b2530d13e25eed5b934ef656e0b375c9 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/script/loop_socketserver.py @@ -0,0 +1,14 @@ + +import os, sys +import subprocess + +if __name__ == '__main__': + directory = os.path.dirname(os.path.abspath(sys.argv[0])) + script = os.path.join(directory, 'socketserver.py') + while 1: + cmdlist = ["python", script] + cmdlist.extend(sys.argv[1:]) + text = "starting subcommand: " + " ".join(cmdlist) + print(text) + process = subprocess.Popen(cmdlist) + process.wait() diff --git a/src/main/resources/PythonLibs/site-packages/execnet/script/quitserver.py b/src/main/resources/PythonLibs/site-packages/execnet/script/quitserver.py new file mode 100644 index 0000000000000000000000000000000000000000..5b7ebdb9d58aefc3b47ec9c00180b42be5e4e16d --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/script/quitserver.py @@ -0,0 +1,16 @@ +""" + + send a "quit" signal to a remote server + +""" + +import sys +import socket + +hostport = sys.argv[1] +host, port = hostport.split(':') +hostport = (host, int(port)) + +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sock.connect(hostport) +sock.sendall('"raise KeyboardInterrupt"\n') diff --git a/src/main/resources/PythonLibs/site-packages/execnet/script/shell.py b/src/main/resources/PythonLibs/site-packages/execnet/script/shell.py new file mode 100644 index 0000000000000000000000000000000000000000..ecea1674f9a9e6ebf89295df805063e67d106948 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/script/shell.py @@ -0,0 +1,84 @@ +#! /usr/bin/env python +""" +a remote python shell + +for injection into startserver.py +""" +import sys, os, socket, select + +try: + clientsock +except NameError: + print("client side starting") + host, port = sys.argv[1].split(':') + port = int(port) + myself = open(os.path.abspath(sys.argv[0]), 'rU').read() + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + sock.sendall(repr(myself)+'\n') + print("send boot string") + inputlist = [ sock, sys.stdin ] + try: + while 1: + r,w,e = select.select(inputlist, [], []) + if sys.stdin in r: + line = raw_input() + sock.sendall(line + '\n') + if sock in r: + line = sock.recv(4096) + sys.stdout.write(line) + sys.stdout.flush() + except: + import traceback + print(traceback.print_exc()) + + sys.exit(1) + +print("server side starting") +# server side +# +from traceback import print_exc +from threading import Thread + +class promptagent(Thread): + def __init__(self, clientsock): + Thread.__init__(self) + self.clientsock = clientsock + + def run(self): + print("Entering thread prompt loop") + clientfile = self.clientsock.makefile('w') + + filein = self.clientsock.makefile('r') + loc = self.clientsock.getsockname() + + while 1: + try: + clientfile.write('%s %s >>> ' % loc) + clientfile.flush() + line = filein.readline() + if len(line)==0: raise EOFError("nothing") + #print >>sys.stderr,"got line: " + line + if line.strip(): + oldout, olderr = sys.stdout, sys.stderr + sys.stdout, sys.stderr = clientfile, clientfile + try: + try: + exec(compile(line + '\n','<remote pyin>', 'single')) + except: + print_exc() + finally: + sys.stdout=oldout + sys.stderr=olderr + clientfile.flush() + except EOFError: + #e = sys.exc_info()[1] + sys.stderr.write("connection close, prompt thread returns") + break + #print >>sys.stdout, "".join(apply(format_exception,sys.exc_info())) + + self.clientsock.close() + +prompter = promptagent(clientsock) # noqa +prompter.start() +print("promptagent - thread started") diff --git a/src/main/resources/PythonLibs/site-packages/execnet/script/socketserver.py b/src/main/resources/PythonLibs/site-packages/execnet/script/socketserver.py new file mode 100644 index 0000000000000000000000000000000000000000..7b0b92ab4d2876953aed0c15229fc533a656067f --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/script/socketserver.py @@ -0,0 +1,122 @@ +#! /usr/bin/env python + +""" + start socket based minimal readline exec server + + it can exeuted in 2 modes of operation + + 1. as normal script, that listens for new connections + + 2. via existing_gateway.remote_exec (as imported module) + +""" +# this part of the program only executes on the server side +# + +progname = 'socket_readline_exec_server-1.2' + +import sys, os + +def get_fcntl(): + try: + import fcntl + except ImportError: + fcntl = None + return fcntl + +fcntl = get_fcntl() + +debug = 0 + +if debug: # and not os.isatty(sys.stdin.fileno()): + f = open('/tmp/execnet-socket-pyout.log', 'w') + old = sys.stdout, sys.stderr + sys.stdout = sys.stderr = f + +def print_(*args): + print(" ".join(str(arg) for arg in args)) + +if sys.version_info > (3, 0): + exec("""def exec_(source, locs): + exec(source, locs)""") +else: + exec("""def exec_(source, locs): + exec source in locs""") + +def exec_from_one_connection(serversock): + print_(progname, 'Entering Accept loop', serversock.getsockname()) + clientsock,address = serversock.accept() + print_(progname, 'got new connection from %s %s' % address) + clientfile = clientsock.makefile('rb') + print_("reading line") + # rstrip so that we can use \r\n for telnet testing + source = clientfile.readline().rstrip() + clientfile.close() + g = {'clientsock' : clientsock, 'address' : address, 'execmodel': execmodel} + source = eval(source) + if source: + co = compile(source+'\n', source, 'exec') + print_(progname, 'compiled source, executing') + try: + exec_(co, g) # noqa + finally: + print_(progname, 'finished executing code') + # background thread might hold a reference to this (!?) + #clientsock.close() + +def bind_and_listen(hostport, execmodel): + socket = execmodel.socket + if isinstance(hostport, str): + host, port = hostport.split(':') + hostport = (host, int(port)) + serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # set close-on-exec + if hasattr(fcntl, 'FD_CLOEXEC'): + old = fcntl.fcntl(serversock.fileno(), fcntl.F_GETFD) + fcntl.fcntl(serversock.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC) + # allow the address to be re-used in a reasonable amount of time + if os.name == 'posix' and sys.platform != 'cygwin': + serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + serversock.bind(hostport) + serversock.listen(5) + return serversock + +def startserver(serversock, loop=False): + try: + while 1: + try: + exec_from_one_connection(serversock) + except (KeyboardInterrupt, SystemExit): + raise + except: + if debug: + import traceback + traceback.print_exc() + else: + excinfo = sys.exc_info() + print_("got exception", excinfo[1]) + if not loop: + break + finally: + print_("leaving socketserver execloop") + serversock.shutdown(2) + +if __name__ == '__main__': + import sys + if len(sys.argv)>1: + hostport = sys.argv[1] + else: + hostport = ':8888' + from execnet.gateway_base import get_execmodel + execmodel = get_execmodel("thread") + serversock = bind_and_listen(hostport, execmodel) + startserver(serversock, loop=False) + +elif __name__=='__channelexec__': + execmodel = channel.gateway.execmodel # noqa + bindname = channel.receive() # noqa + sock = bind_and_listen(bindname, execmodel) + port = sock.getsockname() + channel.send(port) # noqa + startserver(sock) diff --git a/src/main/resources/PythonLibs/site-packages/execnet/script/socketserverservice.py b/src/main/resources/PythonLibs/site-packages/execnet/script/socketserverservice.py new file mode 100644 index 0000000000000000000000000000000000000000..562083c27e246e6b35c515fd7de005bafa37f392 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/script/socketserverservice.py @@ -0,0 +1,89 @@ +""" +A windows service wrapper for the py.execnet socketserver. + +To use, run: + python socketserverservice.py register + net start ExecNetSocketServer +""" + +import sys +import win32serviceutil +import win32service +import win32event +import win32evtlogutil +import servicemanager +import threading +import socketserver + + +appname = 'ExecNetSocketServer' + + +class SocketServerService(win32serviceutil.ServiceFramework): + _svc_name_ = appname + _svc_display_name_ = "%s" % appname + _svc_deps_ = ["EventLog"] + def __init__(self, args): + # The exe-file has messages for the Event Log Viewer. + # Register the exe-file as event source. + # + # Probably it would be better if this is done at installation time, + # so that it also could be removed if the service is uninstalled. + # Unfortunately it cannot be done in the 'if __name__ == "__main__"' + # block below, because the 'frozen' exe-file does not run this code. + # + win32evtlogutil.AddSourceToRegistry(self._svc_display_name_, + servicemanager.__file__, + "Application") + win32serviceutil.ServiceFramework.__init__(self, args) + self.hWaitStop = win32event.CreateEvent(None, 0, 0, None) + self.WAIT_TIME = 1000 # in milliseconds + + + def SvcStop(self): + self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING) + win32event.SetEvent(self.hWaitStop) + + + def SvcDoRun(self): + # Redirect stdout and stderr to prevent "IOError: [Errno 9] + # Bad file descriptor". Windows services don't have functional + # output streams. + sys.stdout = sys.stderr = open('nul', 'w') + + # Write a 'started' event to the event log... + win32evtlogutil.ReportEvent(self._svc_display_name_, + servicemanager.PYS_SERVICE_STARTED, + 0, # category + servicemanager.EVENTLOG_INFORMATION_TYPE, + (self._svc_name_, '')) + print("Begin: %s" % (self._svc_display_name_)) + + hostport = ':8888' + print('Starting py.execnet SocketServer on %s' % hostport) + serversock = socketserver.bind_and_listen(hostport) + thread = threading.Thread(target=socketserver.startserver, + args=(serversock,), + kwargs={'loop':True}) + thread.setDaemon(True) + thread.start() + + # wait to be stopped or self.WAIT_TIME to pass + while True: + result = win32event.WaitForSingleObject(self.hWaitStop, + self.WAIT_TIME) + if result == win32event.WAIT_OBJECT_0: + break + + # write a 'stopped' event to the event log. + win32evtlogutil.ReportEvent(self._svc_display_name_, + servicemanager.PYS_SERVICE_STOPPED, + 0, # category + servicemanager.EVENTLOG_INFORMATION_TYPE, + (self._svc_name_, '')) + print("End: %s" % appname) + + +if __name__ == '__main__': + # Note that this code will not be run in the 'frozen' exe-file!!! + win32serviceutil.HandleCommandLine(SocketServerService) diff --git a/src/main/resources/PythonLibs/site-packages/execnet/script/xx.py b/src/main/resources/PythonLibs/site-packages/execnet/script/xx.py new file mode 100644 index 0000000000000000000000000000000000000000..931e4b7fad136a633a06b291267ef3161d0b9641 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/script/xx.py @@ -0,0 +1,9 @@ +import rlcompleter2 +rlcompleter2.setup() + +import register, sys +try: + hostport = sys.argv[1] +except: + hostport = ':8888' +gw = register.ServerGateway(hostport) diff --git a/src/main/resources/PythonLibs/site-packages/execnet/xspec.py b/src/main/resources/PythonLibs/site-packages/execnet/xspec.py new file mode 100644 index 0000000000000000000000000000000000000000..c72f5b650f81e3e3e8e9059cd49ac6c51f3cf20a --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/execnet/xspec.py @@ -0,0 +1,53 @@ +""" +(c) 2008-2013, holger krekel +""" +class XSpec: + """ Execution Specification: key1=value1//key2=value2 ... + * keys need to be unique within the specification scope + * neither key nor value are allowed to contain "//" + * keys are not allowed to contain "=" + * keys are not allowed to start with underscore + * if no "=value" is given, assume a boolean True value + """ + # XXX allow customization, for only allow specific key names + popen = ssh = socket = python = chdir = nice = \ + dont_write_bytecode = execmodel = None + + def __init__(self, string): + self._spec = string + self.env = {} + for keyvalue in string.split("//"): + i = keyvalue.find("=") + if i == -1: + key, value = keyvalue, True + else: + key, value = keyvalue[:i], keyvalue[i+1:] + if key[0] == "_": + raise AttributeError("%r not a valid XSpec key" % key) + if key in self.__dict__: + raise ValueError("duplicate key: %r in %r" %(key, string)) + if key.startswith("env:"): + self.env[key[4:]] = value + else: + setattr(self, key, value) + + def __getattr__(self, name): + if name[0] == "_": + raise AttributeError(name) + return None + + def __repr__(self): + return "<XSpec %r>" %(self._spec,) + def __str__(self): + return self._spec + + def __hash__(self): + return hash(self._spec) + def __eq__(self, other): + return self._spec == getattr(other, '_spec', None) + def __ne__(self, other): + return self._spec != getattr(other, '_spec', None) + + def _samefilesystem(self): + return bool(self.popen and not self.chdir) + diff --git a/src/main/resources/PythonLibs/site-packages/icyexecnetgateway.py b/src/main/resources/PythonLibs/site-packages/icyexecnetgateway.py new file mode 100644 index 0000000000000000000000000000000000000000..6b9eba2dd167d7c8c63de2f5aecf056a81e79343 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/icyexecnetgateway.py @@ -0,0 +1,194 @@ +import sys # for standard streams manipulation, byteorder +import Icy_inspect # used for module transmission + +import execnet # where all the hard work is done + +import numpyexecnet # will be sent to the remote interpreter + +from icy.type import DataType + +# register remote stdout and stderr channels, +# so that prints and errors are printed in the Icy console +def setup_stdstreams(gateway): + # send stdout and stderr channels to Icy, + # so that prints and errors are printed in the Icy console + channel = gateway.remote_exec(""" + import sys + + outchan = channel.gateway.newchannel() + sys.stdout = outchan.makefile("w") + channel.send(outchan) + + errchan = channel.gateway.newchannel() + sys.stderr = errchan.makefile("w") + channel.send(errchan) + """) + + # receive remote stdout and stderr as channels + outchan = channel.receive() + errchan = channel.receive() + + # associate the channels with printing callbacks + # note: callbacks execute in receiver thread + outchan.setcallback(lambda data: sys.stdout.write(str(data))) + errchan.setcallback(lambda data: sys.stderr.write(str(data))) + + # wait for the remote script to be finished + channel.waitclose() + + +# returns a Python object containing all the data needed to reconstruct a 2D image +# on the remote as a Numpy array +def pack_image(image): + imageData = image.getDataXY(0) + typecode = imageData.typecode + + # handle the fact that unsigned bytes do not exist natively in java + if typecode == 'b' and image.getDataType_() == DataType.UBYTE: + typecode = 'B' + + return [image.getSizeY(), + image.getSizeX(), + typecode, + sys.byteorder, + imageData.tostring()] + + +# returns an IcyBufferedImage built from a 2D Numpy array packaged on the remote side +def unpack_image(packed_image): + from array import array + from icy.image import IcyBufferedImage + + [rows, cols, typecode, byteorder, data] = packed_image + + a = array(typecode, data) + + # handle byteorder conversion + if sys.byteorder <> byteorder: + a.byteswap() + + # handle signedness + if typecode in ['B', 'H', 'I', 'L']: + signed = False + else: + signed = True + + return IcyBufferedImage(cols, rows, a, signed) + + +class IcyExecnetGateway(): + def __init__(self, python_path=None, gateway_spec=None, debug=False): + self.debug = debug + + # use gateway_spec if it is provided + if gateway_spec == None: + # if gateway_spec is not provided, use popen + # if python_path is provided, try to launch that interpreter + # or launch the python that is the system path if python_path is not provided + if python_path <> None: + gateway_spec = "popen//python=" + python_path + else: + gateway_spec = "popen//python=python" + + self.gateway = execnet.makegateway(gateway_spec) + + self.channel = None + + self.setup_stdstreams() + + # send the numpyexecnet module so that it can be imported + # on the remote + self.send_module(numpyexecnet) + + # define __enter__ and __exit__ methods so that the class can be + # used with the 'with' statement + def __enter__(self): + return self + + # Note: this waits for the previous remote execution to be finished + # make sure the remote script properly terminates at some point + def __exit__(self, type, value, traceback): + if type==None and value==None and traceback==None: + # This is the normal end of the 'with'-block, without any exception in-between. + # Wait for the remote to properly end. + self.cleanup() + else: + # An exception occurred on the local side in the 'with'-block. + # Waiting for the remote to end will likely deadlock, + # since the remote itself is properly waiting for data from the local side. + # So do a forced exit of the remote. + self.cleanup(force=True) + + # Note: this waits for the previous remote execution to be finished + # make sure the remote script properly terminates at some point + def cleanup(self, force=False): + if self.channel <> None and not force: + self.channel.waitclose() # make sure the remote process has finished + self.gateway.exit() # close the remote interpreter + execnet.default_group.terminate(timeout=1.) # robust termination + + if self.debug: + print "cleaned up" + + # register remote stdout and stderr channels, + # so that prints and errors are printed in the Icy console + def setup_stdstreams(self): + setup_stdstreams(self.gateway) + + # Note: this waits for the previous remote execution to be finished + # make sure the remote script properly terminates at some point + def remote_exec(self, args): + # wait for the previous channel to be closed + # Note: this prevents simultaneous remote_exec() run + if self.channel <> None: + self.channel.waitclose() + self.channel = self.gateway.remote_exec(args) + + # send some data over the channel + def send(self, args): + self.channel.send(args) + + # receive some data over the channel + def receive(self): + return self.channel.receive() + + # send a whole Python module over the channel + def send_module(self, module): + source = Icy_inspect.getsource(module) + + self.remote_exec(""" + import imp + import sys + + name = channel.receive() + source = channel.receive() + module = imp.new_module(name) + exec source in module.__dict__ + sys.modules[name] = module + """) + + self.send(module.__name__) + self.send(source) + + +def test(): + print "Testing the IcyExecnetGateway class" + + print "Create a gateway, print some text, verify if numpyexecnet is available" + with IcyExecnetGateway() as gateway: + #with IcyExecnetGateway(python_path = "/usr/local/bin/python") as gateway: + #with IcyExecnetGateway(python_path = "/Users/tlecomte/VirtualEnvs/ENV/bin/python") as gateway: + #with IcyExecnetGateway(gateway_spec = "popen//python=/usr/local/bin/python") as gateway: + gateway.remote_exec("""print " Hello from child cpython" """) + + gateway.remote_exec(""" + import numpyexecnet + print numpyexecnet.unpack_image + print numpyexecnet.pack_image + import numpy + print "Numpy version on the remote interpreter is", numpy.__version__ + """) + + print "done" + + diff --git a/src/main/resources/PythonLibs/site-packages/numpyexecnet.py b/src/main/resources/PythonLibs/site-packages/numpyexecnet.py new file mode 100644 index 0000000000000000000000000000000000000000..5b99db7fccea8c098dd23bdd5457825e66b6f728 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/numpyexecnet.py @@ -0,0 +1,64 @@ +from array import array +import sys + + +# pack a 2D Numpy array in a list that can be sent to a remote +def pack_image(a): + # we do not import numpy globally so that this module can be loaded + # and sent from Icy without errors, where numpy is not available + import numpy as np + + # Not all the Numpy datatypes will be understood. + # Not even all the basic numeric ones, since Icy color model is based on + # java.awt.image.BandedSampleModel, which only supports + # TYPE_BYTE, TYPE_USHORT, TYPE_SHORT, TYPE_INT, TYPE_FLOAT, and TYPE_DOUBLE. + + if a.ndim <> 2: + raise ValueError("The array must be 2D.") + + if a.dtype == np.uint8: + typecode = 'B' + elif a.dtype == np.int8: + typecode = 'b' + elif a.dtype == np.uint16: + typecode = 'H' + elif a.dtype == np.int16: + typecode = 'h' + elif a.dtype == np.int32: + typecode = 'i' + elif a.dtype == np.float32: + typecode = 'f' + elif a.dtype == np.float64: + typecode = 'd' + else: + raise TypeError("Array dtype (%s) is not supported. It must be one of uint8, int8,\n"\ + " uint16, int16, int32, float32, float64." %(a.dtype)) + + return [a.shape[0], + a.shape[1], + typecode, + sys.byteorder, + a.tostring()] + + +# unpack image data from a remote to a 2D Numpy array +def unpack_image(packed_image): + # we do not import numpy globally so that this module can be loaded + # and sent from Icy without errors, where numpy is not available + import numpy as np + + [rows, cols, typecode, byteorder, dataString] = packed_image + + # construct the Numpy data type out of the endianness and array typecode + if byteorder == 'big': + dtype = '>' + else: + dtype = '<' + dtype += typecode + + # create a 2d Numpy array out of the data + # Note that we make a copy because frombuffer returns a read-only array + a = np.frombuffer(dataString, dtype=dtype).copy() + a.shape = (rows, cols) + + return a diff --git a/src/main/resources/PythonLibs/site-packages/test_icyexecnetgateway.py b/src/main/resources/PythonLibs/site-packages/test_icyexecnetgateway.py new file mode 100644 index 0000000000000000000000000000000000000000..1243df7f221e2963dae2fa96077b9862b97a41f2 --- /dev/null +++ b/src/main/resources/PythonLibs/site-packages/test_icyexecnetgateway.py @@ -0,0 +1,3 @@ +import icyexecnetgateway as icygw + +icygw.test()