Generated on Wed, 07 Feb 2007 04:01:28 CET
There are currently 720 XXX/TODO comments.
# TODO: The Zope 3 checkin mailing list uses a non-standard format.
# This test checks the compatibility hacks planted in the code.
# I hope that they will be removed in the future.
svn_msg4 = open_test_data("svn_msg4.txt")
if data.has_key('data'): # TODO should we bark if no data is given?
msg_raw = data['data']
parser = zope.component.getUtility(IMessageParser)
if msg_raw.startswith("From "):
# TODO: should handle RFC-2047
dc.title = unicode(message.subject)
dc.created = message.date
except DuplicationError:
# TODO This method is rather bloated and hard to understand.
text = self.context.body.replace('\r', '') \
.replace('&', '&amp;') \
.replace('<', '&lt;') \
# TODO: find out the actual encoding instead of assuming UTF-8
return unicode(text, 'UTF-8', 'replace')
def mark_whitespace(self, line, tab=('>', '-'), trail='.'):
# XXX why aren't modules pickleable? # import os # from xml import sax
# XXX somehow objects are getting registered here, but not
# modified. need to figure out what is going wrong, but for
# now just abort the transaction.
##assert not cn._registered
# TODO
# test classes with getstate and setstate
# make sure class invalidation works correctly
# XXX Hack: Use _p_newstate for persistent classes, because
# a persistent class's persistent state is a fairly limited
# subset of the dict and we really want to replace everything.
if hasattr(self._replace, "_p_newstate"):
# XXX Is this safe in all cases?
oid = getattr(obj, "_p_oid", marker)
if oid is marker:
return None
# XXX Will this object always have an __module__?
if obj.__module__ == self._module_name:
# Save an external type registered through registerWrapper
objtype = type(obj)
# XXX should we try to handle descriptors? If it looks like a
# descriptor, try calling it and passing the class object?
classTypes = {
# XXX There is a lot of magic here to give classes and instances # separate sets of attributes. This code should be documented, as it # is quite delicate, and it should be moved to a separate module.
# XXX _p_invalidate
# Create a descriptor that calls _p_activate() when _p_jar is set.
inst_jar_descr = findattr(cls, "_p_jar", None)
# XXX I'm not sure I understand this code any more.
super_meth = super(PersistentClassMetaClass, cls).__getattribute__
# If we are initializing the class, don't try to check variables
# XXX There needs to be an _p_changed flag so that classes get
# registered with the txn when they are modified.
def __setattr__(cls, attr, val):
# XXX what should happen with these?
return
super(PersistentClassMetaClass, cls).__delattr__(attr)
# XXX The following test isn't right because overriding
# must be allowed, but I haven't figured that out yet.
# __getstate__ and __setstate__ might be overridden
# __implements__ might be overridden
# XXX Should the object get marked as a ghost when it is, in fact,
# not a ghost? The most obvious answer is no. But if we don't
# then we need some other attribute that can be used to handle
# invalidations of classes and make _p_activate() work as expected.
# XXX Make sure the object is in the cache before
# calling setstate().
dm._cache[cls._p_oid] = cls
dm.setstate(cls)
# XXX Should really put in special inconsistent state
cls._p_state = UPTODATE
else:
print id(cls), "dm", dm, "oid", cls._p_oid
# XXX This should really be _p_getnewargs() or something like that.
# If the class is later loaded and unghostified, the arguments
# passed to __new__() won't have an __module__. It seems that
# XXX This doesn't handle __methods__ correctly.
# ExtClassDataDescrs. As a hack, I'm deleting _p_oid
# and _p_jar from the keys dict, because I know they
# will be descrs and they won't change as a result of
# XXX What if the function module is deactivated while the
# function is executing? It seems like we need to expose
# refcounts at the Python level to guarantee that this will
# work.
# XXX Is this sufficient?
if self._v_side_effect:
self._pf_module._p_changed = True
# XXX new.function doesn't accept a closure
func = self._pf_func
func_state = func.func_defaults, func.func_dict or None
# XXX Is this comment still relevant? # # There seems to be something seriously wrong with a module pickle # that contains objects pickled via save_global(). These objects are
# XXX need getattr &c. hooks to update _p_changed?
def __getstate__(self):
d = self.__dict__.copy()
# XXX Is it okay that these packages don't have __path__?
# A PersistentPackage can exist in a registry without a manager.
# It only gets a manager if someone creates an __init__ module for
# XXX need to be able to replace sys.std{in,out,err} at this point
exec source in moddict
del moddict[__persistent_module_registry__]
new_names = NameFinder(module)
replacements = new_names.replacements(old_names)
# XXX should raise a better error
raise ValueError(name)
self._p_changed = True
self._modules[name] = module
# TODO: Find timezones using locale information
pass
# Create a date/time object from the data
# TODO: Need to make sure unicode is everywhere
return unicode(text)
# TODO: works for gregorian only right now
for entry in _findFormattingCharacterInPattern('G', pattern):
info[entry] = r'(%s|%s)' %(calendar.eras[1][1], calendar.eras[2][1])
# TODO: works for gregorian only right now
for entry in _findFormattingCharacterInPattern('G', pattern):
info[entry] = calendar.eras[2][1]
# TODO: Implement this method
return False
def getFirstWeekDayName(self):
# TODO: What about keys??? Where do I get this info from?
pieces = [key+'='+type for key, type in ()]
if pieces:
id_string += '@' + ','.join(pieces)
# TODO: The parser does not support timezones yet.
self.assertEqual(self.format.parse(
'2. Januar 2003 21:48:01 +100',
'd. MMMM yyyy HH:mm:ss z'),
# TODO: The parser does not support timezones yet.
self.assertEqual(self.format.parse(
'Donnerstag, 2. Januar 2003 21:48 Uhr +100',
"EEEE, d. MMMM yyyy H:mm' Uhr 'z"),
# TODO: I'd like for there to be a symmetric interface method, one in
# which an adapter is gotten for both the first arg and the second
# arg. I.e. getLanguage(obj, env)
# But this isn't a good match for the ITranslationService.translate()
# TODO: get global statements working # The compiler module, which we now rely on, doesn't handle global # statements correctly. ## def test_global_variable_assignment(self):
# TODO: There are no tests for this branch
code.exec_(globals,
{}, # we don't want to get local assignments saved.
)
# TODO: The 'user' in the transactions is a concatenation
# of 'path' and 'user' (id). 'path' used to be the path
# of the user folder in Zope 2. ZopePublication currently
# does not set a path, so ZODB lets the path default to
# TODO: This is because of ZODB3/Zope2 cruft regarding
# the user path (see comment above). This 'if' block
# should go away.
split = user_name.split()
# TODO: Does not seem to be tested
def renderer(_context, sourceType, for_, factory):
def dottify(obj):
try:
#TODO the way this message id is defined, it won't be picked up by
# i18nextract and never show up in message catalogs
return _(byte_size + ' ${width}x${height}', mapping=mapping)
# TODO This dependency is bad. It reflects use of a utility function # that should be copied elsewhere: zope.server
# TODO: Currently, the path is the ApplicationURL. This is reasonable,
# and will be adequate for most purposes.
# A better path to use would be that of the folder that contains
# the site manager this service is registered within. However,
# TODO: When scheduler exists, sweeping should be done by
# a scheduled job since we are currently busy handling a
# request and may end up doing simultaneous sweeps
# TODO: Ahh, the code from DT_SQLVar is duplicated here!!!
if t == 'int':
try:
if isinstance(v, StringTypes):
# TODO: Shrug, should these types really be hard coded? What about
# Dates and other types a DB supports; I think we should make this
# a plugin.
if t == 'int':
# TODO base64 decoding should be done here, not in request
lpw = request._authUserPW()
if lpw is None:
login, password = None, None
# TODO This is only needed when self.evaluateInlineCode is true,
# so should only be needed for zope.app.pythonpage.
from zope.app.interpreter.interfaces import IInterpreter
interpreter = component.queryUtility(IInterpreter, lang)
# TODO:
# This is temporary till we find a better name to use
# objects that are stored in annotations
# Steve suggested a ++annotations++ Namespace for that.
# TODO: # - Specify all attributes as schema fields where possible # - Define necessary methods for interfaces
# TODO:
# Process instances need to have a place, so they can look things
# up. It's not clear to me (Jim) what place they should have.
# TODO: Tests missing for: # State Class/Interface # Transition Class/Interface
# TODO: Temporary ...
class StateAddFormHelper(object):
# Hack to prevent from displaying an empty addform
def __call__(self, template_usage=u'', *args, **kw):
# TODO: This could and should be handled by a Vocabulary Field/Widget
def getStateNames(self):
pd = self.context.getProcessDefinition()
states = removeAllProxies(pd.getStateNames())
# TODO: This method assumes you have a publishing workflow
def listPublishedItems(self):
return self.filterByState(self.listContentInfo(), 'published')
# TODO: Implementation needs more work !!
# check if xml-data can be imported and represents a StatefulPD
checker = XMLFormatChecker()
parseString(data, checker)
# TODO: Should at least have a title, if not a value as well
class IStatefulStatesContainer(IProcessDefinitionElementContainer):
"""Container that stores States."""
# TODO This needs to be discussed:
# how can we know if this ProcessInstance is annotated
# to a Content-Object and provide secure ***READONLY***
# Access to it for evaluating Transition Conditions ???
# TODO: How can i make sure that nobody modifies content
# while the condition scripts/conditions are evaluated ????
# this hack only prevents from directly setting an attribute
# using a setter-method directly is not protected :((
# TODO: Argh. Why don't we have a unioned Interface for that?!?
dir_write = IWriteDirectory(container)
dir_read = IReadDirectory(container)
# TODO: Need to add support for large files
data = body.read()
newfile = factory(name, request.getHeader('content-type', ''), data)
# TODO: This could be cleaned up by providing special target
# interfaces for HTTP methods. This way we can even list verbs that
# are not in the lists above.
for m in _allowed_methods:
# TODO: Most of the time, this is a lie. We do not fully support
# DAV 2 on all objects, so probably an interface check is needed.
self.request.response.setHeader('DAV', '1,2', literal=True)
# UGLY! Some clients rely on this. eg: MacOS X
# TODO: Once we have lock, add the lock token here
r.append('')
# TODO: fix it, functional test doesn't like zapi.getPath? ri
pass
return False
# TODO: we need some unittests for this !!!
def __getitem__(self, name):
return self.index.macros[name]
# TODO: It is hard-coded that the connection name variable is called
# 'sql_conn'. We should find a better solution.
conn_name = econtext.vars['sql_conn']
adapter = queryUtility(IZopeDatabaseAdapter, conn_name)
# TODO: See issue http://www.zope.org/Collectors/Zope3-dev/504, ri
elif factory.__module__ is not None and \
factory.__module__.startswith(JSONRPC_DIRECTIVES_MODULE):
# TODO: Once we support passive display of instances, this insanity can go
# away.
if not isinstance(component, (types.MethodType, types.FunctionType,
types.ClassType, types.TypeType,
# TODO: List hard-coded for now. IGNORE_MODULES = ['twisted'] import sys import zope.app.zcmlfiles # Ignore these files, since they are not necessary or cannot be imported
# TODO: List hard-coded for now. IGNORE_MODULES = ['twisted'] import sys
# TODO: Once we support passive display of instances, this insanity can go
# away.
if not isinstance(component, (types.MethodType, types.FunctionType,
types.ClassType, types.TypeType,
# TODO: See issue http://www.zope.org/Collectors/Zope3-dev/504, ri
elif factory.__module__ is not None and \
factory.__module__.startswith(JSONRPC_DIRECTIVES_MODULE):
# TODO: test stateful tree view class CookieTreeViewTest(StatefulTreeViewTest):
# TODO: some methods from the bootstrap module are not tested # class TestBootstrapSubscriber(PlacefulSetup, unittest.TestCase):
# TODO: check EventSub
root_folder = self.root_folder
for i in range(2):
cx = self.db.open()
# TODO: We need to resolve code duplication in zpasswd.py and mkzopeinstance.py
def main(argv=None):
"""Top-level script function to create a new principals."""
# TODO we should be able to compute the script
script = os.path.abspath(sys.argv[0])
zope_home = os.path.dirname(os.path.dirname(script))
zope_init = os.path.dirname(os.path.abspath(zope.app.__file__))
# TODO: Figure out what these should be. ThreadedAsync ZConfig zdaemon
# TODO: Graceful shutdown does not work yet.
# This will work for servers started directly and by zdaemon. Passing
# an exit status of 0 causes zdaemon to not restart the process.
# TODO: Graceful restart does not work yet.
# Passing an exit status of 1 causes zdaemon to restart the process.
LoopCallback.exit_status = 1
# TODO: We need to resolve code duplication in zpasswd.py and mkzopeinstance.py
def main(argv=None):
"""Top-level script function to create a new principals."""
# XXX: the renderer *expects* unicode as input encoding (ajung)
renderer = ReStructuredTextToHTMLRenderer(
unicode(info), self.request)
info = renderer.render()
# TODO: There needs to be a better api for this
return self.request.publication.db
def evolve(self):
# XXX: the renderer *expects* unicode as input encoding (ajung)
renderer = ReStructuredTextToHTMLRenderer(
unicode(info), self.request)
info = renderer.render()
# TODO: If the factory wrapped by LocationProxy is already a Proxy,
# then ProxyFactory does not do the right thing and the
# original's checker info gets lost. No factory that was
# registered via ZCML and was used via addMenuItem worked
# TODO Can't we add a 'copy_of' or something as a prefix
# instead of raising an exception ?
raise UserError(
_("The given name(s) %s is / are already being used" %(
# TODO: It appears that BTreeContainer uses SampleContainer only to
# get the implementation of __setitem__(). All the other methods
# provided by that base class are just slower replacements for
# operations on the BTree itself. It would probably be clearer to
# TODO: needs tests
def getDefaultViewName(object, request, context=None):
name = queryDefaultViewName(object, request, context=context)
if name is not None:
# TODO: new __name__ attribute must be tested
if class_:
if attribute != '__call__':
if not hasattr(class_, attribute):
# TODO: look up a real tzinfo object using 'tzname'
#
# Option 1: Use 'timezones' module as global registry::
#
# TODO: for some reason changing this code to use implements() does not # work. This is due to a bug that is supposed to be fixed after X3.0. code = """\ from zope.interface import Interface
# XXX fix ZODB
class FileStorage(ZODB.FileStorage.FileStorage):
def new_oid(self):
# TODO: tests for other directives needed
atre = re.compile(' at [0-9a-fA-Fx]+')
# XXX fix ZODB
class FileStorage(ZODB.FileStorage.FileStorage):
def new_oid(self):
# XXX we need to figure out how to constrain this or, alternatively,
# just use regular folders, which is probably the better choice.
# zope.app.container.constraints.containers(ILocalSiteManager)
# TODO: for now until we figure out a way to specify the factory directly
factoryName = schema.TextLine(
title=_(u"Factory Name"),
readonly=False,
# TODO: applyChanges isn't reporting "change" correctly (we're
# re-generating the sequence with every edit, and need to be smarter)
def applyChanges(self, content):
field = self.context
# TODO: ObjectCreatedEvent here would be nice
value = self.factory()
# apply sub changes, see if there *are* any changes
changes = applyWidgetsChanges(self, field.schema, target=value,
names=self.names)
# TODO: If value implements ILocation, set name to field name and
# parent to content
return changes
kwargs['maxlength'] = self.displayMaxWidth # TODO This is untested.
return renderElement(self.tag, **kwargs)
# TODO: Currently datetimes return in local (server)
# time zone if no time zone information was given.
# Maybe offset-naive datetimes should be returned in
# this case? (DV)
#XXX: _getFormValue() should return a string value that can be
# used in a HTML form, but it doesn't. When
# http://www.zope.org/Collectors/Zope3-dev/584 gets fixed
# this hack should be reverted.
# TODO This code path is untested.
return zapi.getMultiAdapter((self._error, self.request),
IWidgetInputErrorView).snippet()
return ""
# TODO This code path is untested.
raise zope.app.form.interfaces.MissingInputError(
field.__name__, self.label,
)
# TODO This code path is untested.
err = zope.schema.interfaces.ValidationError(
"Invalid value id", token)
raise WidgetInputError(field.__name__, self.label, err)
# TODO This code path is untested.
self._error = WidgetInputError(field.__name__, self.label, err)
raise self._error
# TODO This code path is untested.
self._error = WidgetInputError(field.__name__, self.label, err)
raise self._error
# TODO this duck typing is to get the code working. See
# collector issue 372
if isinstance(self.errors, basestring):
return self.errors
# TODO Extra and Annotations is not tested directly
# TODO need to make sure the top-level name doesn't already
# exist, or existing site data can get screwed
self.call_committer()
return ""
# TODO: making a SOAP request without configuring a SOAP request
# currently generates an XMLRPC request. Not sure what the right thing
# is, but that doesn't seem to be the right thing.
soaprequestfactory = DummyRequestFactory()
# TODO: making a SOAP request without configuring a SOAP request
# currently generates an XMLRPC request. Not sure what the right thing
# is, but that doesn't seem to be the right thing.
soaprequestfactory = DummyRequestFactory()
# TODO: should SOAP, XMLRPC, and Browser extend this?
"""generic HTTP request factory"""
class IXMLRPCRequestFactory(IRequestFactory):
# TODO: register utility as adapter for IAnnotations on utility activation. from persistent import Persistent from persistent.dict import PersistentDict
""" # TODO move this out to a doctest file when we have one...
utility = component.getUtility(IPrincipalAnnotationUtility)
return utility.getAnnotations(principal)
component.adapter(zope.security.interfaces.IPrincipal)(annotations)
# TODO this assumes a checkout directory structure
site_zcml = os.path.join(dirname(dirname(dirname(zope.__file__))),
"site.zcml")
context = config(site_zcml, features=("devmode",), execute=False)
# TODO There ought to be some way to do this more cleanly.
# This fills the __dict__ of the old object with new state.
# The other way to achieve the desired effect is to replace
# the object in its container, but this method preserves the
# TODO obviously no test for this
if (zope.location.ILocation.providedBy(ob)
and not zope.location.inside(ob, object)):
return '1' # go away
# TODO: This *should* be a datetime, but we don't yet know how it's used.
timestamp = zope.schema.Float(
description=_("time value indicating the"
" when the bookkeeping information was created"),
# TODO: This *should* be an ASCIILine, but there isn't one (yet).
history_id = zope.schema.ASCII(
description=_("""
Id of the version history related to the version controlled resource.
# TODO: This *should* be an ASCIILine, but there isn't one (yet).
version_id = zope.schema.ASCII(
description=_(
"version id that the version controlled resource is based upon"))
# TODO: figure out if we can get rid of this IVersionedContainer = INonVersionedData
# TODO: should be ASCIILine
label = zope.schema.ASCII(
title=_("Label"),
description=_("Label applied to the version."),
# TODO: should be ASCIILine
branch_id = zope.schema.ASCII(
title=_("Branch Id"),
description=_("Identifier for the new branch."),
# TODO: I think this should be moved to the functional test.
self.context.register(zapi.traverse(self.context, "/"))
self.context.register(zapi.traverse(self.context, "/++etc++site"))
self.request.response.redirect('index.html')
# TODO: Temporary fix, which Steve should undo. URL is
# just too HTTPRequest-specific.
if hasattr(request, 'URL'):
url = request.URL
# TODO: Temporary fix, which Steve should undo. URL is
# just too HTTPRequest-specific.
if hasattr(request, 'URL'):
url = request.URL
# TODO: The password manager name may be changed only
# if the password changed
readonly=True
)
_cookies = None
def __init__(self, cookies=None):
# TODO: expand the IHTTPResponse interface to give access to all cookies
_cookies = None
def __init__(self, cookies=None):
# TODO: extend the IHTTPRequest interface to allow access to all
# cookies
for k,v in response._cookies.items():
k = k.encode('utf8')
self.cookies[k] = v['value'].encode('utf8')
#TODO variable insertions must not be expanded
# until after the translation... preferably use
# mapping here
self.error = _('Invalid regex: %s') % pattern
# TODO: what if we have multiple participations?
principal = getInteraction().participations[0].principal
ann = utility.getAnnotations(principal)
# TODO: This TerminateProcess call doesn't make much sense: it's
# doing a hard kill _first_, never giving the process a chance to
# shut down cleanly. Compare to current Zope2 service code, which
# uses Windows events to give the process a chance to shut down
>>> browser.mech_browser.close() # TODO: Browser needs close.
>>> inst1.stop()
>>> inst2.stop()
>>> ZEO.tests.forker.shutdown_zeo_server(('localhost', zeo_port))
# TODO: Make sure this is only called if we are running via zdaemon.
# Setting the module global variable in the main module signals zdaemon to restart
main.should_restart = True
reactor.callLater(time, reactor.stop)
# TODO might be outsourced to some utility
for registration in registrations:
for required_iface in registration.required:
if (required_iface is IXMLRPCRequest and
# TODO might be outsourced to some utility
results = []
for interface in interfaces:
registrations = list(getViews(interface, IXMLRPCRequest))
# TODO if defaults are given, render their type
return [['undef'] * (self._getFunctionArgumentSize(func) + 1)]
def _getFunctionHelp(self, func):
# TODO might be outsourced to some utility
for registration in registrations:
for required_iface in registration.required:
if (required_iface is IXMLRPCRequest and
# TODO might be outsourced to some utility
results = []
for interface in interfaces:
registrations = list(getViews(interface, IXMLRPCRequest))
# TODO if defaults are given, render their type
return [['undef'] * (self._getFunctionArgumentSize(func) + 1)]
def _getFunctionHelp(self, func):
# TODO: The problem with this is that this is not valid XML and
# can't be parsed back!
"compact", "nowrap", "ismap", "declare", "noshade", "checked",
"disabled", "readonly", "multiple", "selected", "noresize",
# TODO: Code duplication is BAD, we need to fix it later
text = self.engine.evaluateText(stuff[0])
if text is not None:
if text is self.Default:
# TODO: Seems like this branch is not used anymore; we
# need to remove it
# Evaluate the value to be associated with the variable in the
# TODO: I can't decide whether we want to cgi escape the translated
# string or not. OTOH not doing this could introduce a cross-site
# scripting vector by allowing translators to sneak JavaScript into
# translations. OTOH, for implicit interpolation values, we don't
# TODO: Code duplication is BAD, we need to fix it later
structure = self.engine.evaluateStructure(expr)
if structure is not None:
if structure is self.Default:
# TODO: We need to pass in one of context or target_language
return self.engine.translate(msgid, self.i18nContext.domain,
i18ndict, default=default)
# TODO: this should not catch ZODB.POSException.ConflictError.
# The ITALExpressionEngine interface should provide a way of
# getting the set of exception types that should not be
# handled.
# TODO: Why does this want to find ZODB ??? import os import sys
# TODO: Failing tests don't cause test.py to report an
# error; not sure why. ;-(
sys.exit(rc)
elif rc < 0:
# TODO: Deactivated test, since we have not found a solution for this and
# it is a deep and undocumented HTML parser issue.
# Fred is looking into this.
#suite.addTest(unittest.makeSuite(MacroFunkyErrorTest))
# TODO this code isn't picked up by the Zope 3 test framework..
def test_suite():
suite = unittest.TestSuite()
suite.addTest(test_htmltalparser.test_suite())
# TODO Should return None or a DOM tree
return self.evaluate(expr)
# implementation; can be overridden
# TODO: Should return a sequence
return self.evaluate(expr)
def evaluateMacro(self, macroName):
# TODO: We need to pass in one of context or target_language
return self.engine.translate(msgid, self.i18nContext.domain, i18ndict,
default=default, position=self.position)
# TODO: Does this always follow Python escape semantics?
l = eval(l)
if section == ID:
msgid += l
# TODO: You should not sort by msgid, but by filename and position. (SR)
msgids.sort()
for msgid in msgids:
positions = engine.catalog[msgid]
# TODO: Code duplication is BAD, we need to fix it later
key, expr = parseSubstitution(arg)
cexpr = self.compileExpression(expr)
program = self.popProgram()
# TODO: Ugly hack to work around tal:replace and i18n:translate issue.
# I (DV) need to cleanup the code later.
replaced = False
if "replace" in taldict:
# TODO: at some point we should insist on a non-trivial position
self.emit("setPosition", position)
if self.inMacroUse:
if fillSlot:
# TODO: should test all inplace operators...
P = self.new_proxy
pa = P(1)
# TODO: This doesn't work properly for protocol 2 pickles yet; that # still needs to be dealt with. Particular issues that need to be # addressed involve supporting the changes for new-style objects.
# TODO: use of a regular expression here seems to be brittle # due to fuzzy semantics of unicode "surrogates". # Eventually, we should probably rewrite this as a C # function.
# TODO: This doesn't do what I want anymore because we can't strip v
def _output_nonscalar(self, write, name, id, v, str_indent, indent):
write('%s<%s%s> ' % (str_indent, name, id))
v.output(write, indent+2)
# TODO: Waaa need more tests class TestBrowserResponse(TestCase):
# TODO: Eventually this will be a different method
def _authUserPW():
"""Return (login, password) if there are basic credentials;
return None if there aren't."""
# XXX We should pass the ``size`` argument to self.stream.readline
# but twisted.web2.wsgi.InputStream.readline() doesn't accept it.
# See http://www.zope.org/Collectors/Zope3-dev/535 and
# http://twistedmatrix.com/trac/ticket/1451
# TODO: HTTP redirects must provide an absolute location, see
# http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.30
# So, what if location is relative and base is unknown? Uncomment
# the following and you'll see that it actually happens.
# XXX using readlines() instead of lines()
# as twisted's BufferedStream sends back
# an empty stream here for read() (bug)
lines = ''.join(self._body_instream.readlines())
# TODO: This still needs to support comments in structured fields as # specified in RFC 2822.
# TODO: We should not allow for a None value type.
Set(value_type=None)
# do not allow arbitrary value types
# TODO: We should not allow for a None value type.
FrozenSet(value_type=None)
# do not allow arbitrary value types
# TODO: still need to test the default implementation
class SampleTerm(object):
pass
# TODO: We should not allow for a None value type.
List(value_type=None)
# do not allow arbitrary value types
# TODO: we should probably use a more general definition of newlines
return '\n' not in value
class ASCIILine(ASCII):
# TODO: we should probably use a more general definition of newlines
return '\n' not in value
class Float(Orderable, Field):
# TODO: There should be a more specialized field type for this.
token = Attribute(
"token",
"""Token which can be used to represent the value on a stream.
raise # TODO verify sanity of a pass on EINTR :-/
lines = process.stdout.readlines()
process.stdout.close()
for l in reversed(lines):
# TODO: We should be able to deal with this case, too
self.reporter("TODO: %s" % local)
# thing to do anyway
return
flag = self.metadata.getentry(local).get("flag")
# TODO: This should have a larger set of default patterns to
# ignore, and honor .cvsignore
fn = basename(path)
return (fn.endswith("~")
# TODO: More to add...
def test_suite():
s = unittest.TestSuite()
# TODO: This test will fail if a file /tmp/@@Zope/Root exists :-(
target = self.tempdir()
self.assertEqual(self.network.findrooturl(target), None)
# TODO: need test cases for anomalies, e.g. files missing or present
# in spite of metadata, or directories instead of files, etc.
def test_suite():
# TODO: This is a bug; we shouldn't depend on these packages at all. # Need to restructure. from zope.proxy import removeAllProxies #from zope.app.fssync import fspickle
# TODO: for now, remove all proxies.
ObjectEntryAdapter.__init__(self, removeAllProxies(context))
def getBody(self):
# TODO: There's no test case for this code!
nbase = normcase(name)
matches = [b for b in self.entries if normcase(b) == nbase]
if matches:
# TODO: This is platform dependent
if exists(original):
origfile = original
else:
# TODO: Don't change the case of the header names; httplib might
# not treat them in a properly case-insensitive manner.
assert self.rooturl
if not path.endswith("/"):
#XXX Chunking works only with the zserver. Twisted responds with
# HTTP error 400 (Bad Request); error document:
# Excess 4 bytes sent in chunk transfer mode
#We use a buffer as workaround and compute the Content-Length in
#XXX If chunking works again, replace the following lines with
# datasource(PretendStream(conn))
# conn.send("0\r\n\r\n")
tmp.seek(0)
# TODO: how to recurse?
self.dirrevert(target)
self.metadata.flush()
if os.path.isdir(target):
# TODO: This should be a configuration value as it takes a long time to
# compute on Mac OSX
return 100
sl = []
# TODO: This should be a configuration value as it takes a long time to
# compute on Mac OSX
return 100
# make a server socket
# TODO: This should be a configuration value as it takes a long time to
# compute on Mac OSX
return 100
sl = []
# TODO: please add unit tests for other methods as well:
# compute_timezone_for_log
# log_date_string
# log
# TODO tests should be more careful to clear the socket map.
poll(0.1)
self.orig_map_size = len(socket_map)
self.hook_asyncore_error()
# TODO: please add unit tests for other methods as well:
# compute_timezone_for_log
# log_date_string
# log
# TODO: support named-pipe syslog. # [see ftp://sunsite.unc.edu/pub/Linux/system/Daemons/syslog-fifo.tar.z] # from:
# TODO tests should be more careful to clear the socket map.
asyncore.poll(0.1)
self.orig_map_size = len(asyncore.socket_map)
self.hook_asyncore_error()
# TODO This test doesn't work. I think it is because execute()
# doesn't read the whole reply. The execute() helper
# function should be fixed, but that's for another day.
result = status_messages['HELP_START'] + '\r\n'
# TODO: we should (optionally) verify that the
# ip number belongs to the client. [wu-ftpd does this?]
self.port_addr = (ip, port)
self.reply('SUCCESS_200', 'PORT')
# TODO: This should be a configuration value as it takes a long time to
# compute on Mac OSX
return 100
sl = []
# TODO: This should be a configuration value as it takes a long time to
# compute on Mac OSX
return 100
# make a server socket
# TODO: This should be a configuration value as it takes a long time to
# compute on Mac OSX
return 100
sl = []
# TODO: Some logging should go on here.
def handle_error_no_close(self):
# TODO: There's a circular import problem with the proxy package. # The proxy framework needs some refactoring, but not today. import zope.proxy
# TODO: There's a circular import problem with the proxy package. # The proxy framework needs some refactoring, but not today. import zope.proxy
# TODO: It's a bit scary above that we can pickle a proxy if access is # granted to __reduce__. We might want to bother to prevent this in # general and only allow it in this specific case.
# TODO: we really need formal proxy introspection
#if type(object) is Proxy:
# # Is this already a security proxy?
# TODO: should test all inplace operators...
P = self.c.proxy
pa = P(1)
# TODO: should test all inplace operators...
P = self.c.proxy
pa = P(1)
# TODO: write a test to see that
# Checker.check/check_setattr handle permission
# values that evaluate to False
# TODO: dir() segfaults due to a bad tuple
# check in merge_class_dict in object.c. The assert macro
# seems to be doing the wrong thing. Basically, if an object
# has bases, then bases is assumed to be a tuple.
# XXX:
# I hope that most international ZCML will use UTF-8 as its encoding;
# otherwise this code must be made more clever
src = u''.join([u" "+l.decode('utf-8') for l in lines])
# TODO: make a splitter more like the HTMLSplitter for TextIndex # signature is # Splitter(string, stop_words, encoding, # singlechar, indexnumbers, casefolding)
# TODO: make a splitter more like the HTMLSplitter for TextIndex # signature is # Splitter(string, stop_words, encoding, # singlechar, indexnumbers, casefolding)
# TODO: Using a vocabulary might be the better choice to store
# keywords since it would allow us to use integers instead of strings
self._rev_index = IOBTree()
self._num_docs = Length(0)
# TODO: Using a vocabulary might be the better choice to store
# keywords since it would allow us to use integers instead of strings
self._rev_index = IOBTree()
self._num_docs = Length(0)
# TODO: Shouldn't / stripping
# be in PageTemplate.__call__()?
text = meta_pattern.sub("", text)
else:
# TODO: This reflects a case that simply isn't handled by the
# sniffer; there are many, but it gets it right more often than
# before.
def donttest_sniffer_xml_simple(self):
# TODO: Should return None or a macro definition
return self.evaluate(expr)
evaluateMacro = evaluate
# TODO: Once we have n-ary adapters, use them.
if ITALESFunctionNamespace.providedBy(ob):
ob.setEngine(econtext)
else:
# TODO: Issue: 459, check this
# It doesn't make sense to exit here, check why we run into this
# and make sure this won't happen in the future. I guess we need
# more 'elif' cases that will handle other extensions too
# TODO: Issue: 459, why do we exit here? I think the dependency tool
# should report dependencies and not exit at all. It doesn't make sense
# to abort on any error because we should report as much as possible
# Perhaps we should add another script like 'findmissing' or
# TODO: Issue: 459, check changed constructor
def getAllCleanedDependencies(path, zcml=False, deps=[], paths=[],
packages=False):
"""Return a list of all cleaned dependencies in a path."""
# to do here
# zope and zope/app are too general to be considered.
# Because otherwise it would just pick up zope as a dependency, but
# nothing else. We need a way to detect packages.
# there's at least me using windows ;-)
if path.endswith('src/zope/') or path.endswith('src/zope/app/') or \
path.endswith('src\\zope\\') or path.endswith('src\\zope\\app\\'):
# TODO This needs to be part of the contract.
try:
compiled = self._v_compiled
except AttributeError:
But the form machinery can still find the correct button. # TODO: demo Dividing display of widget errors and invariant errors ------------------------------------------------------
# TODO Omitting the form prefix
# TODO need test
def get(self, name):
return self.__Widgets_widgets_dict__.get(name)
# TODO need test
def __add__(self, other):
widgets = self.__class__([], 0)
widgets.__Widgets_widgets_items__ = (
# TODO need test
def __add__(self, other):
return self.__class__(*(self.actions + other.actions))
# TODO need test for this
def availableActions(form, actions):
result = []
for action in actions:
# TODO also need to be able to show disabled actions
def availableActions(self):
return availableActions(self, self.actions)
# TODO need test
class NamedTemplatePathAdapter(object):
interface.implements(zope.traversing.interfaces.IPathAdapter)
# TODO: currently only implemented for select by ClientForm
title=u"Options",
description=u"""\
A list of possible display values for the control.""",
# TODO: currently only implemented for select by ClientForm
title=u"Value",
description=u"The value of the control, as rendered by the display",
default=None,
reason = real_response._reason # XXX add a getReason method
headers = real_response.getHeaders()
headers.sort()
# TODO: currently only implemented for select by ClientForm
title=u"Options",
description=u"""\
A list of possible display values for the control.""",
# TODO: currently only implemented for select by ClientForm
title=u"Value",
description=u"The value of the control, as rendered by the display",
default=None,
# TODO: We are going to need more magic to make classProvides work with odd # classes. This will work in the next iteration. For now, we'll use # a different mechanism.
# TODO: need old style __implements__ compatibility?
# Hm, there's an __implemented__, but it's not a spec. Must be
# an old-style declaration. Just compute a spec for it
return Declaration(*_normalizeargs((spec, )))
# TODO: need old style __implements__ compatibility?
if spec is not None:
# old-style __implemented__ = foo declaration
spec = (spec, ) # tuplefy, as it might be just an int
# XXX hack to fake out twisted's use of a private api. We need to get them
# to use the new registered method.
def get(self, _):
class XXXTwistedFakeOut:
# TODO: add invalidation when a provided interface changes, in case
# the interface's __iro__ has changed. This is unlikely enough that
# we'll take our chances for now.
self.__parent__ = object # TODO: see if we can automate this
def moveTo(self, target, new_name=None):
"""Move this object to the `target` given.
self.__parent__ = object # TODO: see if we can automate this
def copyTo(self, target, new_name=None):
"""Copy this object to the `target` given.
# TODO test that ++names++ and @@names work too
def testTraverseNameBadValue(self):
from zope.traversing.api import traverseName
# TODO:
# This is here now to allow us to get site managers from a
# separate namespace from the content. We add an etc
# namespace to allow us to handle misc objects. We'll apply
# TODO: lift dependency on zope.app
if (name in ('process', 'ApplicationController')
and IContainmentRoot.providedBy(ob)):
# import the application controller here to avoid circular
# TODO: I am not sure this is the best solution. What
# if we want to enable tracebacks when also trying to
# debug a different skin?
skin = zope.component.getUtility(IBrowserSkinType, 'Debug')
# TODO: minimize zope.app zope.app zope.component zope.i18n
#TODO this does not seem to be used anywhere. Remove it? --philiKON
class INamespaceHandler(Interface):
def __call__(name, object, request):
# TODO still need tests for errors, and createMapping() def test_decode_empty():
# TODO: need convert element to metadata element name
dcelem = validator = None
if name in dcterms.element_to_name:
dcelem = dcterms.element_to_name[name]
self.assertEquals(writer._fd._mode, 'w') # TODO or 'wb'?
print >> writer, 'fee',
writer.write(' fie')
writer.writelines([' foe', ' foo'])
# TODO: maybe log the Message-Id of the message sent
self.log.info("Mail from %s to %s sent.",
fromaddr, ", ".join(toaddrs))
# Blanket except because we don't want
# TODO: We will add ILargeReadFile and ILargeWriteFile to efficiently # handle large data. class IReadDirectory(IReadContainer):
# TODO: we will add additional interfaces for WebDAV and File-system
# synchronization.
# (State, Token) to a item to reduce
ToDo = {}
Error = None
# XXX is this code (through "continue") needed?
# throw away all the prose leading up to the last
# footnote definition in the prose, this is so we
# XXX this test fails and I didn't do it, so just commenting it out (JBY). # >>> import doctest # >>> doctest._unittest_reportflags == (REPORT_NDIFF | # ... REPORT_ONLY_FIRST_FAILURE)
# TODO: # - make tracebacks show where the footnote was referenced # - teach script_from_examples and testsource about INTERPRET_FOOTNOTES # - update comments (including docstring for testfile)
# XXX There are no tests for this logging behavior.
# It's not at all clear that the test runner should be doing this.
configure_logging()
# TODO: Perhaps eat the garbage here, so that the garbage isn't
# printed for every subsequent test.
# Did the test leave any new threads behind?
# TODO -- really should have test of this that uses symlinks
# this is hard on a number of levels ...
for dirpath, dirs, files in os.walk(dir):
dirs.sort()
# XXX solutions? #def simple_slice_before(): # x = y[:4]
# XXX ## def set_timeout(self, timeout): ## self._timeout = timeout ## def set_http_connection_cache(self, conn_cache):
## # XXX ATM, FTP has cache as part of handler; should it be separate?
## self._ftp_conn_cache = conn_cache
def set_handled_schemes(self, schemes):
# XXX could use Greg Stein's httpx for some of this instead?
# or httplib2??
def set_proxies(self, proxies):
"""Set a dictionary mapping URL scheme to proxy specification, or None.
response = _response # XXX move Browser._response into this class?
while n > 0 or response is None:
try:
request, response = self._history.pop()
## # XXX I don't seem to have an example of exactly socket.error being ## # raised, only socket.gaierror... ## # I don't want to start fixing these here, though, since this is a ## # subclass of OpenerDirector, and it would break old code. Even in
# XXX should this call self.error instead?
#self.error("unknown declaration: " + `data`)
self._tokenstack.append(Token("decl", data))
def handle_pi(self, data):
# XXX there can actually be multiple auth-schemes in a
# www-authenticate header. should probably be a lot more careful
# in parsing them to extract multiple alternatives
# XXX could be multiple headers
authreq = headers.get(authreq, None)
if authreq:
mo = AbstractBasicAuthHandler.rx.search(authreq)
# XXX The client does not inspect the Authentication-Info header
# in a successful response.
# a mock server that just generates a static set of challenges.
def __init__(self, passwd=None):
if passwd is None:
# XXX not implemented yet
if req.has_data():
entdig = self.get_entity_digest(req.get_data(), chal)
else:
# XXX selector: what about proxies and full urls
req.get_selector())
if qop == 'auth':
self.nonce_count += 1
# XXX handle auth-int.
pass
base = 'username="%s", realm="%s", nonce="%s", uri="%s", ' \
'response="%s"' % (user, realm, nonce, req.get_selector(),
# XXX MD5-sess
KD = lambda s, d: H("%s:%s" % (s, d))
return H, KD
# XXX not implemented yet
return None
## # XXXX miserable hack
## def urljoin(base, url):
## if url.startswith("?"):
## return base+url
# XXX encoding
return _is_html(ct_hdrs, url, allow_xhtml)
return is_html
# XXX use attr_encoding for ref'd doc if that doc does not provide
# one by other means
#attr_encoding = attrs.get("charset")
if not url:
# Probably an <A NAME="blah"> link or <AREA NOHREF...>.
# For our purposes a link is something with a URL, so ignore
# XXX Probably want to forget about the state of the current
# request, although that might interact poorly with other
# handlers that also use handler-specific request attributes
new = self.redirect_request(newurl, req, fp, code, msg, headers)
# XXX would self.reset() work, instead of raising this exception?
class EndOfHeadError(Exception): pass
class AbstractHeadParser:
# only these elements are allowed in or before HEAD of document
except socket.error, err: # XXX what error?
raise URLError(err)
# Pick apart the HTTPResponse object to get the addinfourl
# XXX It might be better to extract the read buffering code
# out of socket._fileobject() and into a base class.
r.recv = r.read
# XXX
# This may well be wrong. Which RFC is HDN defined in, if any (for
# the purposes of RFC 2965)?
# For the current implementation, what about IPv6? Remember to look
# XXX
# What should it be if multiple matching Set-Cookie headers have
# different versions themselves?
# Answer: there is no answer; was supposed to be settled by
# XXX Strictly you're supposed to follow RFC 2616
# age-calculation rules. Remember that zero Max-Age is a
# request to discard (old and new) cookie, though.
k = "expires"
# XXX there's an extra bit of the timezone I'm ignoring here: is
# this the right thing to do?
yr, mon, day, hr, min, sec, tz, _ = m.groups()
else:
# XXX Andrew Dalke kindly sent me a similar class in response to my request on # comp.lang.python, which I then proceeded to lose. I wrote this class # instead, but I think he's released his code publicly since, could pinch the # tests from it, at least...
# XXX oops, wrapped file-like-object isn't valid, ignore it
return ''
self.__cache.seek(0, 2)
# XXXX um, 4. refuse to pickle unless .close()d. This is better,
# actually ("errors should never pass silently"). Pickling doesn't
# work anyway ATM, because of http://python.org/sf/1144636 so fix
# this later
# XXX names and comments are not great here import os, re, string, time, struct, logging if os.name == "nt":
# XXX is there other stuff in here? -- eg. comment, commentURL?
c = Cookie(0,
cookie["KEY"], cookie["VALUE"],
None, False,
# XXX why does self.handlers need to be sorted?
bisect.insort(self.handlers, handler)
handler.add_parent(self)
self._handler_index_valid = False
# XXX could be cleaned up
for lookup in [process_request, process_response]:
for scheme, handlers in lookup.iteritems():
lookup[scheme] = handlers
# XXX should we allow a Processor to change the URL scheme
# of the request?
request_processors = set(self.process_request.get(req_scheme, []))
request_processors.update(self._any_request)
# XXX http[s] protocols are special-cased
dict = self.handle_error['http'] # https is not different than http
proto = args[2] # YUCK!
meth_name = 'http_error_%s' % proto
'no': 'norsk', #XXX added by hand ( forget about nynorsk?)
'sl': 'slovenian',
'af': 'afrikaans',
'bg': 'bulgarian',
'de': 'ngerman', #XXX rather than german
# ngerman, naustrian, german, germanb, austrian
'el': 'greek',
'en': 'english',
# TODO merge clines
while 1:
try:
c_start = rowspans.pop()
# TODO: use mixins for different implementations.
# list environment for option-list. else tabularx
use_optionlist_for_option_list = 1
# list environment for docinfo. else tabularx
# TODO maybe use cite bibitems
if self._use_latex_citations:
self.context.append(len(self.body))
else:
# TODO insertion point of bibliography should none automatic.
if self._use_latex_citations and len(self._bibitems)>0:
widest_label = ""
for bi in self._bibitems:
pass # XXX complain here?
if not inline:
pre.append('\n')
post.append('\n')
# TODO longtable supports firsthead and lastfoot too.
self.body.extend(self.active_table.visit_thead())
def depart_thead(self, node):
# TODO: for admonition titles before the first section
# either specify every possible node or ... ?
elif isinstance(node.parent, nodes.sidebar) \
or isinstance(node.parent, nodes.admonition):
# XXX needs cleanup.
if (len(node) and (isinstance(node[-1], nodes.TextElement) or
isinstance(node[-1], nodes.Text)) and
node.parent.index(node) == len(node.parent) - 1):
# Todo: Add empty cells below rowspanning cell and issue
# warning instead of severe.
if node.hasattr('morecols'):
# The author got a headache trying to implement
# Todo: Same as above.
# The number of columns this entry spans (as a string).
colspan = int(node['morecols']) + 1
del node['morecols']
# XXX Not sure if this is the best approach for these. These
# tests make sure that the error reported by ZConfig for missing
# resources is handled in a consistent way. Since ZConfig uses
# urllib2.urlopen() for opening all resources, what we do is
# XXX This tries to save and restore the state of logging around
# the test. Somewhat surgical; there may be a better way.
name = None
# XXX The "url" attribute of the handler is misnamed; it
# really means just the selector portion of the URL.
self.assertEqual(handler.url, "/")
self.assertEqual(handler.level, logging.ERROR)
# XXX The "url" attribute of the handler is misnamed; it
# really means just the selector portion of the URL.
self.assertEqual(handler.url, "/log/")
self.assertEqual(handler.level, logging.NOTSET)
# XXX This should be replaced to use a local cache for remote
# resources. The policy needs to support both re-retrieve on
# change and provide the cached resource when the remote
# resource is not accessible.
# XXX This assumes that one-character scheme identifiers
# are always Windows drive letters; I don't know of any
# one-character scheme identifiers.
scheme, rest = urllib.splittype(s)
# TODO: Need to decide how __setattr__ and __delattr__ should work,
# then write tests.
# TODO: It's expensive trying to load dead objects from the database.
# It would be helpful if the data manager/connection cached these.
def __init__(self, adict=None, **kwargs):
# TODO: May need more methods, and tests.
# TODO: document conflict resolution.
class IPersistentDataManager(Interface):
"""Provide services for managing persistent state.
# TODO: deprecate this for 3.6.
def register(object):
"""Register the given object for transaction control."""
# TODO: Should it be possible to join a committing transaction?
# I think some users want it.
raise ValueError("expected txn status %r or %r, but it's %r" % (
Status.ACTIVE, Status.DOOMED, self.status))
# be better to use interfaces. If this is a ZODB4-style
# resource manager, it needs to be adapted, too.
if myhasattr(resource, "prepare"):
resource = DataManagerAdapter(resource)
self._resources.append(resource)
# TODO: comment out this expensive assert later
# Use id() to guard against proxies.
assert id(obj) not in map(id, adapter.objects)
adapter.objects.append(obj)
# TODO deprecate subtransactions
self._subtransaction_savepoint = self.savepoint(optimistic=True)
return
# XXX should we take further actions here ?
self.log.error("Error in abort() on manager %s",
rm, exc_info=sys.exc_info())
self._after_commit = []
# TODO: do we need to make this warning stronger?
# to stop committing transactions at this point.
self.log.critical("A storage error occurred during the second "
"phase of the two-phase commit. Resources "
# TODO deprecate subtransactions.
if not self._subtransaction_savepoint:
raise interfaces.InvalidSavepointRollbackError
if self._subtransaction_savepoint.valid:
# TODO: We need a better name for the adapters.
class MultiObjectResourceAdapter(object):
"""Adapt the old-style register() call to the new-style join().
# TODO: deprecate for 3.6.
class DataManagerAdapter(object):
"""Adapt zodb 4-style data managers to zodb3 style
# TODO: I'm not sure why commit() doesn't do anything
def commit(self, transaction):
# We don't do anything here because ZODB4-style data managers
# TODO: This code is currently broken. import ZODB, ZODB.DB, ZODB.FileStorage, ZODB.POSException import persistent
# TODO: Should check that the results are consistent!
print "Total time:", t2 - t0
print "Server start time", t1 - t0
# TODO: Need to make sure eviction of non-current data
# and of version data are handled correctly.
def testSerialization(self):
# TODO: Temporarily disabled. I know it fails, and there's no point
# getting an endless number of reports about that.
def xxxcheckConcurrentUpdatesInVersions(self):
self._storage = storage1 = self.openClientStorage()
# TODO: Compare checkReconnectXXX() here to checkReconnection()
# further down. Is the code here hopelessly naive, or is
# checkReconnection() overwrought?
# TODO: with the current ZEO code, this occasionally fails.
# That's the point of this test. :-)
def NOcheckMultiStorageTransaction(self):
# TODO: This code is currently broken. import transaction import ZODB
# TODO: should put this in setUp.
self.storage = self.openClientStorage()
s = self.get_monitor_output()
self.storage.close()
# TODO: make timeout configurable?
attempt_timeout = 5
while not self.stopped:
success = self.try_connecting(attempt_timeout)
# TODO: should check deadline class ConnectWrapper:
# TODO: This is hardly "secure".
if name.startswith('_'):
return None
return hasattr(self.obj, name)
# TODO: If we are not in async mode, this will cause dead
# Connections to be leaked.
def set_async(self, map):
# TODO: avoid expensive getattr calls? Can't remember exactly what
# this comment was supposed to mean, but it has something to do
# with the way asyncore uses getattr and uses if sock:
def __nonzero__(self):
# TODO: is there a better way to do this?
if type(r) == types.ClassType and issubclass(r, Exception):
return r
# TODO: This used to say "ZSS", which is now implied in the logger name. # Can this be either set to str(os.getpid()) (if that makes sense) or removed? _label = "" # default label used for logging.
# XXX check that underlying storage supports blobs
key = (oid, id)
if key not in self.blob_transfer:
tempname = mktemp()
self.blob_transfer[key] = (tempname, tempfile) # XXX Force close and remove them when Storage closes
else:
tempname, tempfile = self.blob_transfer[key]
# TODO: Want to fetch object without marking it as accessed.
o = self.fc.access((oid, cur_tid))
assert o is not None
assert o.end_tid is None # i.e., o was current
# TODO: Since we asserted o is not None above, this block
# should be removed; waiting on time to prove it can't happen.
return
o.end_tid = tid
# TODO: If serial number matches transaction id, then there is
# no need to have all this extra infrastructure for handling
# serial numbers. The vote call can just return the tid.
# If there is a conflict error, we can't have a special method
# XXX need to check for POSIX-ness here
if blob_dir is not None:
self.fshelper = FilesystemHelper(blob_dir)
self.fshelper.create()
# TODO: maybe there's a better time to open the cache? Unclear.
self._cache.open()
self._rpc_mgr = self.ConnectionManagerClass(addr, self,
# TODO: Should we check the protocol version here?
self._conn_is_read_only = 0
stub = self.StorageServerStubClass(conn)
# TODO: report whether we get a read-only connection.
if self._connection is not None:
reconnect = 1
else:
# TODO: should batch these operations for efficiency; would need
# to acquire lock ...
for oid, tid, version in self._cache.contents():
server.verify(oid, version, tid)
# TODO: Is this method used?
return self._info['length']
def getName(self):
# TODO: Is it okay that read-only connections allow pack()?
# rf argument ignored; server will provide its own implementation
if t is None:
t = time.time()
# XXX will fail on Windows if file is open
os.rename(tempfilename, blob_filename)
return blob_filename
# TODO: this used to reinitialize zLOG. How do I achieve
# the same effect with Python's logging package?
# Should we restart as with SIGHUP?
log("received SIGUSR2, but it was not handled!", level=logging.WARNING)
# TODO: The forker interface isn't clearly defined. It's different on # different branches of ZEO. This will break someday. # something to do. class PackerTests(StorageTestBase):
# TODO: should do retries w/ exponential backoff.
cs = ClientStorage(addr, storage=storage, wait=0,
read_only=(not write))
else:
# TODO: how to check undo?
# TODO: Is _cache supposed to have a clear() method, or not?
# cn._cache.clear()
# The last undo set the value to 3 and pack should
# XXX need to update files to get newer testing package
class FakeModule:
def __init__(self, name, dict):
self.__dict__ = dict
# TODO: not really sure how to do a black box test of the cache.
# Should the full sweep and minimize calls always remove things?
def checkFullSweep(self):
# TODO: don't have an explicit test for incrgc, because the
# connection and database call it internally.
# Same for the get and invalidate methods.
# TODO: should test for bogus inputs to TimeStamp constructor
def checkTimeStamp(self):
# Alternate test suite
# TODO: Need to implement a real loadBefore for DemoStorage?
pass
def checkLoadBeforeVersion(self):
pass
# the next three pack tests depend on undo
# TODO: This test fails in ZODB 3.3a1. It's making some assumption(s)
# about pickles that aren't true. Hard to say when it stopped working,
# because this entire test suite hasn't been run for a long time, due to
# a mysterious "return None" at the start of the test_suite() function
# TODO: Try this test with logging enabled. If you see something
# like
#
# ZODB FS FS21 warn: FileStorageTests.fs truncated, possibly due to
# TODO: There are other edge cases to handle, including pack.
# TODO: Should revise this class to use FileStorageFormatter.
def __init__(self, path, dest=None):
self.file = open(path, "rb")
# TODO: Probably better to junk this and redefine _index as mapping
# oid to (offset, tid) pair, via a new memory-efficient BTree type.
self._oid2tid = oid2tid
# oid->tid map to transactionally add to _oid2tid.
# TODO: Not sure the following is always true:
# The previous record is not for this version, yet we
# have a backpointer to it. The current record must
# be an undo of an abort or commit, so the backpointer
# TODO: This seek shouldn't be necessary, but some other
# bit of code is messing with the file pointer.
assert self._tfile.tell() == here - base, (here, base,
self._tfile.tell())
# TODO: put exceptions in a separate module.
from ZODB.FileStorage.FileStorage import RedundantPackWarning
raise RedundantPackWarning(
"The database has already been packed to a later time"
# TODO: Should add sanity checking to pack.
self.gc.findReachable()
# TODO: rename from _tfile to something clearer.
self._tfile = open(self._name + ".pack", "w+b")
self._file.seek(0)
self._tfile.write(self._file.read(self._metadata_size))
# TODO: The filter argument to history() only appears to be
# supported by FileStorage. Perhaps it shouldn't be used.
L = self.history(oid, "", n, lambda d: not d["version"])
if not L:
# TODO: we should test what happens when cacheGC is called mid-transaction.
def cacheGC(self):
"""Reduce cache size to target size."""
self._cache.incrgc()
# TODO: There is a potential problem lurking for persistent
# classes. Suppose we have an invalidation of a persistent
# class and of an instance. If the instance is
# invalidated first and if the invalidation logic uses
# TODO: Deprecate, then remove, this.
if hasattr(obj, 'aq_base'):
self._cache[oid] = obj.aq_base
else:
# TODO: Why do we go to all the trouble of setting _db and
# other attributes on open and clearing them on close?
# A Connection is only ever associated with a single DB
# and Storage.
# XXX what to do about IBlobStorages?
tmpstore = TmpStore(self._version, self._normal_storage)
self._savepoint_storage = tmpstore
self._storage = self._savepoint_storage
assert isinstance(serial, str) # XXX in theory serials could be
# something else
targetpath = self._getBlobPath(oid)
# TODO: This packing algorithm is flawed. It ignores
# references from non-current records after the pack
# time.
# TODO: deal with ZEO disconnected errors?
def g(*args, **kwargs):
n = retries
# TODO: should this accept all the arguments one may pass to DB.open()?
def get_connection(database_name):
"""Return a Connection for the named database.
# TODO: Perhaps it would be better to break the reference
# cycles between `c` and `c._cache`, so that refcounting reclaims
# both right now. But if user code _does_ have a strong
# reference to `c` now, breaking the cycle would not reclaim `c`
# TODO: Figure out exactly which objects are involved in the
# cycle.
connection.__dict__.clear()
return
# TODO: should be done by connection
obj._p_oid = oid
obj._p_jar = self._conn
# When an object is created, it is put in the UPTODATE
# TODO, make connection _cache attr public
reader = ObjectReader(conn, conn._cache, self._factory)
return reader.load_persistent(oid, klass)
# TODO, make connection _cache attr public
reader = ObjectReader(conn, conn._cache, self._factory)
return reader.load_oid(oid)
# different branches of ZEO. This will break someday.
# something to do.
class PackerTests(StorageTestBase):
# XXX For now, don't evict pages if the new version of the object
# is big enough to require eviction.
node = self.cache.get(oid)
if node is None or node.kind is a1out:
# XXX Should an invalidation to a1out count?
if node.kind is a1out:
return
node.linkbefore(self.a1out)
# XXX multiply computed adjustments to p by walk_factor
self.walk_factor = 500
# statistics
# XXX hack, apparently we can't get rid of anything else
break
prev = need
# XXX need a better min than 1?
## print "adapt+", max(1, self.lruB_size // self.fifoB_size)
delta = max(1, self.lruB_size / max(1, self.fifoB_size))
self.p += delta * self.walk_factor
# XXX need a better min than 1?
## print "adapt-", max(1, self.fifoB_size // self.lruB_size)
delta = max(1, self.fifoB_size / max(1, self.lruB_size))
self.p -= delta * self.walk_factor
# XXX these files should be created in the same partition as
# the storage later puts them to avoid copying them ...
def __init__(self, name, mode, blob):
# XXX we need to ensure that the file is closed at object
# expiration or our blob's refcount won't be decremented.
# This probably needs some work; I don't know if the names
# 'BlobFile' or 'super' will be available at program exit, but
# XXX Log warning if storage is ClientStorage
SpecificationDecoratorBase.__init__(self, storage)
self.fshelper = FilesystemHelper(base_directory)
self.fshelper.create()
assert isinstance(serial, str) # XXX in theory serials could be
# something else
# the user may not have called "open" on the blob object,
# XXX if oid already in there, something is really hosed.
# The underlying storage should have complained anyway
self.dirty_oids.append((oid, serial))
finally:
# XXX we should be tolerant of "garbage" directories/files in
# the base_directory here.
base_dir = self.fshelper.base_dir
# XXX need a method to initialize the blob from the storage
# this means a) setting the _p_blob_data filename and b) putting
# the current data in that file
# XXX TODO: 'name' attribute of returned files is not mutable or
# settable via fdopen, so this file is slightly less functional than the
# one returned from 'open' by default. send a patch to Python...
# XXX TODO: *thorough* audit and documentation of the exact desired
# semantics of this code. Right now the behavior of existent
# destination symlinks is convenient, and quite possibly correct, but
# its security properties need to be explained.
# XXX TODO: optionally use os.open, os.read and O_DIRECT and
# use os.fstatvfs to determine chunk sizes and make
# *****sure**** copy is page-atomic; the following is good
# enough for 99.9% of everybody and won't take a week to audit
# XXX Some people claim to have found non-strings in sys.path (an empty
# list, in particular). Instead of tracking down the cause for their
# presence, they decided it was better to discard them unconditionally
# without further investigation. At some point, someone should track
# XXX: This won't raise a TypeError if it's called
# with a value when it shouldn't be.
fn = lambda name, value=None, m=method: m()
# XXX: sanity check to make sure we have a sane combination of keys.
maxOptLen = 0
for opt in optList:
# XXX - impose some sane minimum limit.
# Then if we don't have enough room for the option and the doc
# to share one line, they can take turns on alternating lines.
# XXX # This does not belong here # But where does it belong?
#XXX Maybe get rid of this, and rather use hasattr()s
if not isinstance(other, self.__class__):
return False
for attr in self.compareAttributes:
# XXX: This is a band-aid. I can't figure out where these
# (failure.stack is None) instances are coming from.
c['stack'] = [
[
# TODO: indentation for chained failures?
file.write(" (chained Failure)\n")
self.value.printTraceback(file, elideFrameworkCode, detail)
if detail == 'verbose':
# XXX Implement an atomic thingamajig for win32
import shutil
def symlink(value, filename):
newlinkname = filename+"."+unique()+'.newlink'
# XXX fix this to use python's builtin _winreg?
def getProgramsMenuPath():
"""Get the path to the Programs menu.
# TODO: this should be a little choosier about which path entry
# it selects first, and it should do all the .so checking and
# crud
potentialBasename = potentialTopLevel.basename()[:-len(ext)]
# XXX NOTE: This API isn't a very good idea on filepath, but it's even
# less meaningful here.
return self.parent().path
# TODO: # * send script # * replace method # * save readline history
# TODO: # Make wrap-mode a run-time option. # Explorer. # Code doesn't cleanly handle opening a second connection. Fix that.
# TODO: Factor Python console stuff back out to pywidgets.
class ConsoleOutput:
_willScroll = None
# TODO: Make this a singleton tag table.
for name, props in tagdefs.iteritems():
tag = self.buffer.create_tag(name)
# This can be done in the constructor in newer pygtk (post 1.99.14)
# XXX: It seems weird to have to do this thing with always applying
# a 'default' tag. Can't we change the fundamental look instead?
tags = ["default"]
if kind is not None:
# TODO: Componentize!
self.toplevel.output.append(str(e), "exception")
except (OverflowError, ValueError), e:
self.toplevel.output.append(str(e), "exception")
# TODO: Componentize better!
try:
return self.toplevel.getComponent(IManholeClient).do(text)
except OfflineError:
# TODO: make key bindings easier to customize.
stopSignal = False
# ASSUMPTION: Assume Meta == mod4
# XXX: You'd think we'd use style.bg instead of 'None'
# here, but that doesn't seem to match the color of
# the backdrop.
self.insert(style.font, style.fg[gtk.STATE_NORMAL],
# XXX: We should include a bg[NORMAL] line here, but doing so
# messes things up as it doesn't seem to set the color of the
# backdrop.
}
# TODO: # gzigzag-style navigation class SillyModule:
# XXX: For some reason, the 'canvas' and 'parent' properties
# of CanvasItems aren't accessible through pygnome.
Explorer.canvas = self
# TODO:
# Collapse me
# Movable/resizeable me
# Destroy me
# XXX: make these guys collapsable
for g, name in self.groupLabels:
table = gtk.Table(1, 2)
self.container.add(table)
# XXX: Do I need to destroy previously attached children?
for name, value in propValues:
self.fill_property(name, value)
# XXX: How to indicate detail level of members?
table = self.subtable[group]
if not attributes:
# XXX: Do I need to destroy previously attached children?
row = 1 # 0 is title
# XXX: allocation PyCObject is apparently unusable!
# (w, h) = allocation.width, allocation.height
w, h = (float(w)/_PIXELS_PER_UNIT, float(h)/_PIXELS_PER_UNIT)
# XXX: include partial module name in class?
self.frame.set_label("%s (%s)" % (self.identifier,
class_identifier))
# XXX
pass
a = "%s=%s" % (name, default)
elif signature.is_varlist(arg):
# XXX: add elements group
class SequenceAttributeWidget(AttributeWidget):
def getTextForLabel(self):
if self.explorer.len:
txt = "list of length %d" % (self.explorer.len,)
else:
# XXX: add items group
class MappingAttributeWidget(AttributeWidget):
def getTextForLabel(self):
# TODO -- refactor this, Reality.author.Author, and the manhole shell
#to use common functionality (perhaps a twisted.python.code module?)
fn = '$telnet$'
result = None
# TODO: client support for Deferred.
if isinstance(val, Deferred):
self.lastDeferred += 1
self.console([('result', "Waiting for Deferred #%s...\n" % self.lastDeferred)])
# TODO -- refactor this and twisted.reality.author.Author to
# use common functionality (perhaps the 'code' module?)
dict = self.__dict__.copy()
ns = dict['namespace'].copy()
# XXX: This potentially returns something with
# 'identifier' set to a different value.
return self.data[oid]
else:
# XXX: set the .elements member of all my remoteCaches
return self.get_elements()
# XXX: set the .keys member of all my remoteCaches
return self.get_keys()
def view_get_item(self, perspective, key):
# TODO: Make screening of private attributes configurable.
if i[0] == '_':
continue
mIdentifier = string.join([identifier, i], ".")
# TODO:
#
# * an exclude mechanism for the watcher's browser, to avoid
# sending back large and uninteresting data structures.
# XXX: This probably prevents these objects from ever having a
# zero refcount. Leak, Leak!
## self.watchUninstallers[object] = uninstallers
# XXX: this conditional probably isn't effective.
if oldMethod is not self:
# avoid triggering __setattr__
self.instance.__dict__[methodIdentifier] = (
# XXX: This probably doesn't work if multiple monkies are hanging
# on a method and they're not removed in order.
if self.oldMethod[1] is None:
delattr(self.instance, self.oldMethod[0])
# XXX: Hey, waitasec, did someone just hang a new method on me?
# Do I need to put a monkey on it?
self._watchEmitChanged()
# XXX support multi-line headers
try:
name, value = line.split(":", 1)
except ValueError:
# XXX we don't do multicast yet
host = destVia.received or destVia.host
port = destVia.rport or destVia.port or self.PORT
destAddr = URL(host=host, port=port)
# XXX we don't do multicast yet
host = destVia.received or destVia.host
port = destVia.rport or destVia.port or self.PORT
# XXX note this check breaks if we have multiple external IPs
# yay for suck protocols
log.msg("Dropping incorrectly addressed message")
return
# XXX
# ACKs are a client's way of indicating they got the last message
# Responding to them is not a good idea.
# However, we should keep track of terminal messages and re-transmit
# XXX Check expires on appropriate URL, and pass it to registry
# instead of having registry hardcode it.
if contact is not None:
name, contactURL, params = parseAddress(contact, host=host, port=port)
# XXX return error message, and alter tests to deal with
# this, currently tests assume no message sent on failure
def unregister(self, message, toURL, contact):
# TODO: Investigate whether we should be using os.times()[-1] instead of # time.time. time.time, it has been pointed out, can go backwards. Is # the same true of os.times? from time import time
# TODO: Check to see if consumer supports writeSeq.
self.consumer.write(''.join(self._buffer))
self._buffer[:] = []
else:
# TODO: You can see here the potential for high and low
# watermarks, where bufferSize would be the high mark when we
# ask the upstream producer to pause, and we wouldn't have
# it resume again until it hit the low mark. Or if producer
RESTART_MARKER_REPLY: '110 MARK yyyy-mmmm', # TODO: this must be fixed
SERVICE_READY_IN_N_MINUTES: '120 service ready in %s minutes',
DATA_CNX_ALREADY_OPEN_START_XFR: '125 Data connection already open, starting transfer',
FILE_STATUS_OK_OPEN_DATA_CNX: '150 File status okay; about to open data connection.',
# TODO: LEFT OFF HERE!
d.addErrback(debugDeferred, 'timeoutFactory firing errback')
d.errback(defer.TimeoutError())
# XXX It burnsss
# LineReceiver doesn't let you resumeProducing inside
# lineReceived atm
from twisted.internet import reactor
# TODO: add max auth try before timeout from ip...
def ftp_PASS(self, password):
"""
# XXX: why is this check different to ftp_RETR/ftp_STOR?
if self.dtpInstance is None or not self.dtpInstance.isConnected:
return defer.fail(BadCmdSequenceError('must send PORT or PASV before RETR'))
# XXX Maybe this globbing is incomplete, but who cares.
# Stupid people probably.
if segments and (
'*' in segments[-1] or '?' in segments[-1] or
# XXX Eh, what to fail with here?
return defer.fail(FileNotFoundError(path))
def accessGranted(result):
# XXX For now, just disable the timeout. Later we'll want to
# leave it active and have the DTP connection reset it
# periodically.
self.setTimeout(None)
# XXX For now, just disable the timeout. Later we'll want to
# leave it active and have the DTP connection reset it
# periodically.
self.setTimeout(None)
# XXX: d.addErrback(_unwrapFirstError), but add a test.
for cmd in cmds:
self.queueCommand(cmd)
if self.transport.disconnecting: # XXX: argh stupid hack borrowed right from LineReceiver
return # dataReceived won't be called again, so who cares about consistent state
if next:
state = next
# XXX this takes command subclasses and not command objects on purpose.
# There's really no reason to have all this back-and-forth between
# command objects and the protocol, and the extra object being created
# (the Command instance) is pointless. Command is kind of like
# XXX before we get back to user code we are going to start TLS...
def actuallystart(response):
proto._startTLS(self.certificate, self.authorities)
return response
# XXX this may be a slight oversimplification, but I believe that if
# there are pending SSL errors, they _are_ the reason that the
# connection was lost. a totally correct implementation of this would
# set up a simple state machine to track whether any bytes were
# XXX test chgrp/own
def testList(self):
lsRes = self._getCmdResult('ls').split('\n')
# XXX test lls in a way that doesn't depend on local semantics
def testHelp(self):
helpRes = self._getCmdResult('?')
# XXX test setAttrs
# Ok, how about this for a start? It caught a bug :) -- spiv.
d = self.client.openFile("testfile1", filetransfer.FXF_READ |
filetransfer.FXF_WRITE, {})
# XXX not until version 4/5
# self.failUnlessEqual(filetransfer.FILEXFER_TYPE_DIRECTORY&attrs['type'],
# filetransfer.FILEXFER_TYPE_DIRECTORY)
# XXX this can be optimized for times w/o progress bar
return self._cbGetMultipleNext(None, files, local)
def _cbGetMultipleNext(self, res, files, local):
# TODO: we should also look at the value they send to us and reject
# insecure values of f (if g==2 and f has a single '1' bit while the
# rest are '0's, then they must have used a small y also).
# or do as openssh does and scan f for a single '1' bit instead
minimum = long(math.floor(math.log(self.p) / math.log(2)) + 1)
assert(y >= minimum) # TODO: test_conch just hangs if this is hit
# the chance of it being hit are really really low
f = pow(self.g, y, self.p)
# XXX should this move to dataReceived to put client in charge?
if dataLength > channel.localWindowLeft or \
dataLength > channel.localMaxPacket: # more data than we want
log.callWithLogger(channel, lambda s=self,c=channel:
#XXX not implemented
continue
try:
f(data)
# XXX - need to support scroll regions and scroll history
class TerminalBuffer(protocol.Protocol):
implements(insults.ITerminalTransport)
# XXX TODO - These attributes are really part of the
# ITerminalTransport interface, I think.
_KEY_NAMES = ('UP_ARROW', 'DOWN_ARROW', 'RIGHT_ARROW', 'LEFT_ARROW',
'HOME', 'INSERT', 'DELETE', 'END', 'PGUP', 'PGDN', 'NUMPAD_MIDDLE',
# XXX Support ANSI-Compatible private modes
self.write('\x1b[%sh' % (';'.join(map(str, modes)),))
def setPrivateModes(self, modes):
# XXX Support ANSI-Compatible private modes
self.write('\x1b[%sl' % (';'.join(map(str, modes)),))
def resetPrivateModes(self, modes):
# XXX Rewrite these as dict lookups
if which == G0:
which = '('
elif which == G1:
# XXX - Handle '?' to introduce ANSI-Compatible private modes.
try:
modes = map(int, buf.split(';'))
except ValueError:
# XXX - Handle '?' to introduce ANSI-Compatible private modes.
try:
modes = map(int, buf.split(';'))
except ValueError:
# XXX TODO - Handle shift+tab
raise YieldFocus()
def focusReceived(self):
# XXX /Lame/
for y, line in enumerate(self._buf.lines[self._yOffset:self._yOffset + height]):
terminal.cursorPosition(0, y)
n = 0
if alwaysDisplay: # XXX what should happen here?
print message
def verifyHostKey(self, pubKey, fingerprint):
# XXX reenable this when i can fix it for cygwin
#elif filestats[-3:] != stats[-3:]:
# log.msg("socket doesn't have same create times")
else:
# XXX TODO: This should be enabled to parse linemode subnegotiation.
getattr(self, 'linemode_' + self.linemodeSubcommands[linemodeSubcommand])(bytes[1:])
def linemode_SLC(self, bytes):
# XXX TODO
# chainedProtocol is supposed to be an ITerminalTransport,
# maybe. That means perhaps its terminalProtocol attribute is
# an ITerminalProtocol, it could be. So calling terminalSize
# XXX Note: I would prefer to default to starting in insert
# mode, however this does not seem to actually work! I do not
# know why. This is probably of interest to implementors
# subclassing RecvLine.
# XXX XXX Note: But the unit tests all expect the initial mode
# to be insert right now. Fuck, there needs to be a way to
# query the current mode or something.
# self.setTypeoverMode()
# XXX - Clear the previous input line, redraw it at the new
# cursor position
self.terminal.eraseDisplay()
self.terminal.cursorHome()
# XXX should this use method dispatch?
requestType = requestType.replace('-','_')
f = getattr(self, "global_%s" % requestType, None)
if not f:
# XXX Ick, where is my "hasFired()" interface?
if hasattr(obj, "result"):
self.write(repr(obj))
elif id(obj) in self._pendingDeferreds:
assert components.implements(proto, ip.IIPProtocol) # XXX: fix me
base.BasePort.__init__(self, reactor)
self.interface = interface
self.protocol = proto
# TODO: # * more comprehensive testing of error conditions, e.g. deleting files that # don't exist. # * test more complex paths: such as subdirectories, /foo/../bar/.
# XXX: Import this to make sure the adapter registration has happened. from twisted.vfs.adapters import stream
# XXX - this should probably go in a helper somewhere
def _attrify(self, node):
meta = node.getMetadata()
permissions = meta.get('permissions', None)
# XXX: stubbed out to always succeed.
return defer.succeed(None)
def openForReading(self, segs):
# XXX: this method is way too ugly
dirname, basename = segs[:-1], segs[-1]
node = self.filesystem.fetch(
self._makePath(dirname)).createFile(basename)
# XXX Once we change readChunk/writeChunk we'll have to wrap
# child in something that implements those.
pathSegments = self.filesystem.splitPath(filename)
# XXX: setMetadata isn't yet part of the IFileSystemNode interface
# (but it should be). So we catch AttributeError, and translate it
# to NotImplementedError because it's slightly nicer for clients.
node.setMetadata(attrs)
# XXX: setMetadata isn't yet part of the IFileSystemNode interface
# (but it should be). So we catch AttributeError, and translate it
# to NotImplementedError because it's slightly nicer for clients.
self.original.setMetadata(attrs)
# XXX - this may be broken
log.msg('avatar %s logging out (%i)' % (self.username, len(self.listeners)))
# XXX spiv 2005-12-15
# assumes newParent is also an OSDirectory. Probably should politely
# decline (rather than break with an undefined error) if it's not.
newPath = os.path.join(newParent.realPath, pathutils.basename(newName))
# XXX: There should be a setMetadata, probably taking a map of the same form
# returned by getMetadata (although obviously keys like 'nlink' aren't
# settable. Something like:
# def setMetadata(metadata):
# XXX Sigh. irc.parsemsg() is not as correct as one might hope.
self.assertEquals(response[3][2], ['useruser', '#somechannel', 'This is a test topic.'])
self.assertEquals(response[4][1], '333')
self.assertEquals(response[4][2], ['useruser', '#somechannel', 'some_fellow', '77777777'])
# XXX: Revisit this, given new grammar
# xp = XPathQuery("/foo/bar[2]")
# self.assertEquals(xp.matches(self.e), 1)
# self.assertEquals(xp.queryForNodes(self.e), [self.bar1])
# TODO: move registration into an Initializer?
def registerAccount(self, username = None, password = None):
if username:
# TODO: raise exception instead?
return StanzaError(None)
exception = StanzaError(condition, type, text, textLang, appCondition)
# TODO - add error checking
raise
cnonce = self._gen_nonce()
qop = 'auth'
a1 = "%s:%s:%s" % (H("%s:%s:%s" % (username, realm, password)),
nonce,
cnonce)
# XXX: Add a default callback which updates
# factory.contacts.version and the relevant phone
# number
id, d = self._createIDMapping()
# XXX: A lot of the state currently kept in # instances of SwitchboardClient is likely to # be moved into a factory at some stage in the # future
# XXX If unicode is given, these limits are not quite correct
prefixLength = len(channel) + len(user) + 10
namesLength = 512 - prefixLength
# XXX: prefixedMethodNames gets methods from my *class*,
# but it's entirely possible that this *instance* has more
# methods.
names = reflect.prefixedMethodNames(self.__class__,
# XXX Should we bother passing this data?
self.dccDoSend(user, address, port, filename, size, data)
def dcc_ACCEPT(self, user, channel, data):
# XXX: Do we need to check to see if len(data) != fmtsize?
bytesShesGot = struct.unpack("!I", data)
if bytesShesGot < self.bytesSent:
# XXX? Add some checks to see if we've stalled out?
return
elif bytesShesGot > self.bytesSent:
# self.transport.log("DCC SEND %s: She says she has %d bytes "
# XXX: update a progress indicator here?
def connectionLost(self, reason):
"""When the connection is lost, I close the file.
#if self._buf[:3]=="GET": self.modeWeb() # TODO: get this working
if len(self._buf)<10: return "Flapon" # not enough bytes
flapon,self._buf=self._buf[:10],self._buf[10:]
if flapon!="FLAPON\r\n\r\n":
# XXX may not work
nick=unquote(data)
if normalize(nick)==self.username:
self.saved.nick=nick
# XXX add this back in
#reactor.clientTCP(pip,port,GetFileTransfer(self,cookie,os.path.expanduser("~")))
#self.rvous_accept(user,cookie,GET_FILE_UID)
self.hdr[19]=DUMMY_CHECKSUM # XXX really calculate this
self.hdr[18]=self.hdr[18]+1
self.hdr[21]="\000"
self.transport.write(apply(struct.pack,[self.header_fmt]+self.hdr))
text=string.replace(text,"
","\n") # XXX make this a regexp text=string.replace(text,"
","\n") text=re.sub('<.*?>','',text) text=string.replace(text,'>','>')
#d.addErrback(self._ebDeferredError,fam,sub,data) # XXX for testing
self.requestCallbacks[reqid] = d
self.sendFLAP(SNAC(fam,sub,reqid,data))
# XXX what is this?
self.receiveMessage(user, multiparts, flags)
elif channel == 2: # rondevouz
status = struct.unpack('!H',tlvs[5][:2])[0]
self.chatui._accountmanager = self # TODO: clean this up... it's used in gtkchat
print self.xml._o
autoConnectMethods(self, self.chatui.theContactsList)
self.widget = self.xml.get_widget("AccountManWidget")
# XXX: Why do I duplicate code in IRCClient.register?
try:
print 'connection made on irc service!?', self
if self.account.password:
text=string.replace(text,"
","\n") # XXX make this a regexp text=string.replace(text,"
","\n") text=re.sub('<.*?>','',text) text=string.replace(text,'>','>')
# TODO: we shouldn't be getting group conversations randomly without
# names, but irc autojoin appears broken.
self.xml.get_widget("NickLabel").set_text(
getattr(self.group.account.client,"name","(no name)"))
user, err = result.value # XXX
self.remove(user, err.getErrorMessage())
# XXX Deferred?
return iter(self.users.values())
# XXX Send an error response here
self.transport.loseConnection()
elif target.lower() != "nickserv":
self.privmsg(
"creationDate": ctime(), # XXX
}
for code, text in self._welcomeMessages:
self.sendMessage(code, text % info)
# XXX Deferred support here
getattr(facet, 'logout', lambda: None)()
avatar.realm = avatar.mind = None
return logout
# XXX Attribute lookup on config is kind of bad - hrm.
for plgName in config.interfacePlugins:
port = config.get(plgName + '-port')
if port is not None:
# TODO: it should be possible to embed yappsrt into the generated # grammar to make a standalone module. import sys, re
# TODO: make this work at any token/char position
return self.first_line_number + self.get_input_scanned().count('\n')
def get_column_number(self):
# TODO: separate out the logic for determining the line/character
# location from the logic for determining how to display an
# 80-column line to stderr.
# TODO: add line number
print >>sys.stderr, 'while parsing %s%s:' % (context.rule, tuple(context.args))
print_line_with_pointer(input, context.scanner.get_prev_char_pos(context.tokenpos))
context = context.parent
# TODO: it should be possible to embed yappsrt into the generated # grammar to make a standalone module. import sys, re
# TODO: make this work at any token/char position
return self.first_line_number + self.get_input_scanned().count('\n')
def get_column_number(self):
# TODO: separate out the logic for determining the line/character
# location from the logic for determining how to display an
# 80-column line to stderr.
# TODO: add line number
print >>sys.stderr, 'while parsing %s%s:' % (context.rule, tuple(context.args))
print_line_with_pointer(input, context.scanner.get_prev_char_pos(context.tokenpos))
context = context.parent
# XXX: Write more legible explanation
raise ParserError, "Element closed after end of document."
# Fix up name
# XXX: Write more legible explanation
raise ParserError, "Malformed element close"
# Pop prefix and default NS stack
# TODO: # CompoundStreamTest # more tests for ProducerStreamTest # StreamProducerTest
# TODO: ungzip (can any browsers actually generate gzipped # upload data?) But it's necessary for client anyways. def gzipStream(input, compressLevel=6):
# TODO: support Trailers (maybe! but maybe not!)
# After getting the final "0" chunk we're here, and we *EAT MERCILESSLY*
# any trailer headers sent, and wait for the blank line to terminate the
# TODO: support gzip/etc encodings.
# FOR NOW: report an error if the client uses any encodings.
# They shouldn't, because we didn't send a TE: header saying it's okay.
if transferEncoding:
# XXX: Yay using non-existent sendfile support!
# FIXME: if we return a SendfileBuffer, and then sendfile
# fails, then what? Or, what if file is too short?
readSize = min(length, SENDFILE_LIMIT)
# XXX deal better with addresses
p = factory.buildProtocol(None)
out = ProducerStream()
p.makeConnection(out)
readStream(inputStream, lambda _: p.dataReceived(_)).addCallbacks(
lambda _: p.connectionLost(ti_error.ConnectionDone()), lambda _: p.connectionLost(_))
# XXX: sucks that we have to do this. make transport.write(buffer) work!
data = str(buffer(data))
self.consumer.write(data)
# XXX what happens if spawn fails?
reactor.spawnProcess(self._protocol, self._program, self._args, env=self._env)
del self._env
return self._protocol.resultDeferred.addErrback(lambda _: _.trap(ti_error.ProcessDone))
# TODO: # * Handle scgi server death, halfway through a response.
'Referer':(last,str), # TODO: URI object?
'TE':(tokenize, listParser(parseAcceptQvalue), dict),
'User-Agent':(last,str),
}
'Location':(last,), # TODO: URI object?
# 'Proxy-Authenticate'
'Retry-After':(last, parseRetryAfter),
'Server':(last,),
'Content-Location':(last,), # TODO: URI object?
'Content-MD5':(last, parseContentMD5),
'Content-Range':(last, parseContentRange),
'Content-Type':(lambda str:tokenize(str, foldCase=False), parseContentType),
'Destination' : (last,), # TODO: URI object?
#'If' : (),
#'Lock-Token' : (),
'Overwrite' : (last, parseOverWrite),
# XXX: Where to get user from?
"-",
self.logDateString(
response.headers.getHeader('date', 0)),
#XXX make state be foo=bar instead of a dict.
if self.stateIsDict:
stateDict = self.state
elif isinstance(self.state, Ref) and isinstance(self.state.obj, types.DictType):
#XXX this is unused????
"""Utility method for unjellying into instances of attributes.
Use this rather than unjellyAO unless you like surprising bugs!
# TODO: make methods 'prefer' not to jelly the object internally,
# so that the object will show up where it's referenced first NOT
# by a method.
retval = InstanceMethod(obj.im_func.__name__, reflect.qual(obj.im_class),
# XXX: DOMJellyable.unjellyNode does not exist
state = self.unjellyNode(getValueElement(node))
if hasattr(self.__class__, "__setstate__"):
self.__setstate__(state)
# XXX FIXME this is obviously insecure
# if you doubt:
# >>> unjellyFromXML(''' ''')
# "hi"
# TODO: make methods 'prefer' not to jelly the object internally,
# so that the object will show up where it's referenced first NOT
# by a method.
node.appendChild(self.jellyToNode(obj.im_self))
# TODO: beat pickle at its own game, and do BuiltinFunctionType
# separately, looking for __self__ attribute and unpickling methods
# of C objects when possible.
node = self.document.createElement("function")
# XXX - respect timeout
return self.lookupAllRecords(name, timeout
).addCallback(self._cbRecords, name, effort
)
self.origin = os.path.basename(filename) + '.' # XXX - this might suck
lines = open(filename).readlines()
lines = self.stripComments(lines)
lines = self.collapseContinuations(lines)
elif line[0] == '$INCLUDE': # XXX - oh, fuck me
raise NotImplementedError('$INCLUDE directive not implemented')
elif line[0] == '$GENERATE':
raise NotImplementedError('$GENERATE directive not implemented')
# XXX: timeout here is not a list of ints, it is a single int.
return getResolver().lookupZone(name, timeout)
def lookupAllRecords(name, timeout=None):
#TODO: IPv6 support
persistenceVersion = 1
# XXX we shouldn't need this hack of catching exception on callback()
try:
d.callback(m)
except:
# XXX transport might not get created automatically, use callLater?
self.startListening()
if id is None:
# XXX Add errbacks, respect proper timeouts
reactor.callLater(i, c.start, 60 * 60)
i += 1
# TODO: if startup failed, should shutdown skip stopListening?
# _port won't exist
if self._port is not None:
d = self._port.stopListening()
# XXX - omfg python sucks
tmp, sys.stdout = sys.stdout, open(config['profile'], 'a')
p.print_stats()
sys.stdout, tmp = tmp, sys.stdout
# TODO - A method which provides a best-guess as to whether this reactor
# can actually be used in the execution environment.
# TODO: support more than one callback via Concurrent
def _execute(self, dummy = None):
cmd = self._controller
self.write = self.transport.write
# TODO: Potentially rename this 'Consumer' and make it
# comply with protocols.IConsumer
class Instruction(CallLater):
def __init__(self):
self.flow = lambda: True
# XXX This test is woefully incomplete. It tests the single
# most common code path and nothing else. Expand it and the
# test fairy will leave you a surprise.
# XXX - Maybe this inheritance doesn't make so much sense?
class UsenetServerFactory(NNTPFactory):
"""A factory for NNTP Usenet server protocols."""
# XXX - this could use a real implementation, eh?
self.sendLine('215 Descriptions in form "group description"')
self.sendLine('.')
elif subcmd == 'overview.fmt':
ERR_NOGROUP, ERR_NOARTICLE = range(2, 4) # XXX - put NNTP values here (I guess?)
OVERVIEW_FMT = [
'Subject', 'From', 'Date', 'Message-ID', 'References',
def hexdigest(md5):  # XXX: argh. 1.5.2 doesn't have this.
    """
    Return the digest of C{md5} as a lowercase hexadecimal string.

    @param md5: an object whose C{digest()} method returns the raw digest
        as a string of single characters (one per byte).

    @return: a string holding exactly two hex digits for every character
        returned by C{md5.digest()}.
    """
    # '%02x' zero-pads each byte.  The previous hex(ord(x))[2:] form emitted
    # a single digit for any byte below 0x10, which shortened the result and
    # made distinct digests collide (e.g. '\x01\x10' and '\x11\x00' both
    # rendered as '110').
    return ''.join(['%02x' % ord(char) for char in md5.digest()])
class Article:
#### XXX - make these static methods some day
####
def makeGroupSQL(groups):
res = ''
# XXX - Hrm.
["groups", "g", "groups.list", "File containing group list"],
["servers", "s", "servers.list", "File containing server list"]
]
# XXX - Hmmm.
self['groups'] = [g.strip() for g in open(self['groups']).readlines() if not g.startswith('#')]
self['servers'] = [s.strip() for s in open(self['servers']).readlines() if not s.startswith('#')]
# XXX - Hrm.
["groups", "g", "groups.list", "File containing group list"],
["servers", "s", "servers.list", "File containing server list"],
["moderators", "m", "moderators.list",
# XXX - Hmmm.
filename = self['file']
self['groups'] = [g.strip() for g in open(self['groups']).readlines()
if not g.startswith('#')]
# XXX: test web.distrib.
def setUp(self):
self.resrc = SimpleResource()
# xxx sanity check for now; just make sure it doesn't raise anything
class ModelPathTest(WovenTC):
modelFactory = lambda self: ['hello', ['hi', 'there'],
# TODO: this should be an interface in twisted.protocols.http... lots of
# things want to fake out HTTP
def __init__(self):
self.transport = self
## XXX THIS TEST IS TURNED OFF UNTIL SOMEONE WHO CARES ABOUT FIXING IT DOES
#self.failIf(d3.isEqualToDocument(d2), "%r == %r" % (d3.toxml(), d2.toxml()))
self.assert_(d3.isEqualToDocument(d4), "%r != %r" % (d3.toxml(), d4.toxml()))
self.assert_(d4.isEqualToDocument(d5), "%r != %r" % (d4.toxml(), d5.toxml()))
# TODO:
# * some arg types should only have a single node (text, string, etc)
# * some should have multiple nodes (choice, checkgroup)
# * some have a bunch of ancillary nodes that are possible values (menu, radiogroup)
# TODO: we ought to support Deferreds here for both text and href!
if isinstance(data, StringType):
node.tagName = self.tagName
node.attributes["href"] = data
# TODO: Need to handle deferreds here?
pass
def sendPage(self, request):
#XXX: Argh. FIXME.
failure = str(failure)
self.request.write(
error.ErrorPage(http.INTERNAL_SERVER_ERROR,
# TODO: fix this...
resKey = string.join(['AUTH',self.reqauth.service.serviceName], '_')
sess = request.getSession()
setattr(sess, resKey, perspective)
# TODO hiding forms behind a ResourceGuard sucks, because if # ResourceGuard needs to authenticate the user, it will 1) complain # about the form submitted, 2) throw the data away. This happens if # you use "foo?a=b" -style URLs and the user hasn't authenticated yet,
## XXX: How would a deferred go about producing the result in multiple
## stages?? --glyph
if result[0] is NOT_DONE_YET:
done = 0
## XXX: is this needed?
class WidgetResource(resource.Resource):
def __init__(self, widget):
self.widget = widget
# TODO who says it's not https?
request.setHeader("location","http%s://%s%s/" % (
request.isSecure() and 's' or '',
request.getHeader("host"),
#XXX: delete this after a while.
if hasattr(self, "page"):
log.msg("Gadget.page is deprecated, use Gadget.pageFactory instead")
return apply(self.page, args, kwargs)
# XXX refactor this attribute out; it's from protocol
# del x['server']
del x['channel']
del x['content']
"SCRIPT_NAME": script_name, # XXX
"SCRIPT_FILENAME": self.filename,
"REQUEST_URI": request.uri,
}
# XXX is this enough?
if ((url.find('://') == -1)
and (not url.startswith('..'))
and (not url.startswith('/'))):
# XXX FIXME really handle !DOCTYPE at some point
if self.tagName == '!DOCTYPE':
return 'doctype'
if self.tagName[0] in '!?':
self._parseError("Mal-formed")#XXX When does this happen??
if byte != '>':
if self.beExtremelyLenient:
return
# xxx use a file-extension-to-save-function dictionary instead
if type(child) == type(""):
fl = open(os.path.join(self.path, name), 'wb')
fl.write(child)
# XXX Sigh - race condition: start should return a Deferred
# which fires when all the workers it started have fully
# started up.
time.sleep(0.1)
# XXX As above
time.sleep(0.1)
self.assertEquals(len(tp2.threads), 7)
(out, err, sig) = err.value # XXX Sigh wtf
self.assertEquals(out, "stdout bytes" + os.linesep)
self.assertEquals(err, "stderr bytes" + os.linesep)
self.assertEquals(sig, signal.SIGKILL)
# TODO: Test the Transport stuff? from test_pcp import DummyConsumer
# TODO: currently the C implementation's a bit buggy...
sys.maxint * 3l, sys.maxint * 2l, sys.maxint * -2l]
self.enc.sendEncoded(foo)
for byte in self.io.getvalue():
# XXX - use private _flushErrors so we don't also catch
# the deprecation warnings
excs = [f.type for f in log._flushErrors(ZeroDivisionError)]
self.assertEquals([ZeroDivisionError], excs)
# XXX Test that reactor.stop() invokes shutdown triggers
# XXX - assume no one listening on port 80 UDP
client = Client()
clientStarted = client.startedDeferred = Deferred()
server = Server()
# XXX: According to ITransport, this should return an IAddress!
return 'file', 'file'
def getHost(self):
return 'file'
def resumeProducing(self):
# XXX slightly buggy in the face of incremental output
if cData:
print 'C: '+repr(cData)
if sData:
# XXX: Trial now does this (see
# twisted.trial.runner.MethodInfoBase._setUpSigchldHandler)... perhaps
# this class should be removed? Or trial shouldn't bother, and this
# class used where it matters?
testEcho.timeout = 60 # XXX This should not be. There is already a
# global timeout value. Why do you think this
# test can complete more quickly?
# XXX TODO: actually make this refuse to send over an insecure connection
response = [('pinged', amp.Boolean())]
class TestSwitchProto(amp.ProtocolSwitchCommand):
# XXX - should optional arguments just not be passed?
# passing None seems a little odd, looking at the way it
# turns out here... -glyph
From=('file', 'file'),
# XXX Twisted doesn't report SSL errors as SSL errors, but in the
# future it will.
# cResult.trap(SSL.Error)
# XXX Uh, how about some asserts?
reactor.suggestThreadPoolSize(34)
reactor.suggestThreadPoolSize(4)
# XXX This is a trial hack. We need to make sure the reactor
# actually *starts* for isInIOThread() to have a meaningful result.
# Returning a Deferred here should force that to happen, if it has
# not happened already. In the future, this should not be
# TODO: # test that web request finishing bug (when we weren't proxying # unregisterProducer but were proxying finish, web file transfers # would hang on the last block.)
## # TODO ensure the listen port is closed ## listen = self.sock.driver_listen ## if listen is not None: ## self.assert_(incoming.transport.stringTCPTransport_closing,
# XXX we assume no one is listening on TCP port 69
reactor.connectTCP("127.0.0.1", 69, clientF, timeout=5)
def check(ignored):
clientF.reason.trap(error.ConnectionRefusedError)
# XXX we don't test server side yet since we don't do it yet
d = protocol.ClientCreator(reactor, MyProtocol).connectTCP(
p.getHost().host, p.getHost().port)
d.addCallback(self._gotClient)
# XXX client won't be closed?! why isn't server sending RST?
# or maybe it is and we have a bug here.
self.client.transport.loseConnection()
log.flushErrors(RuntimeError)
("Uh.", #TODO nice docstring, you've got there.
error.ConnectionFdescWentAway),
("Tried to cancel an already-called event.",
# XXX - Piece of *crap* 2.1
self.assertEquals(input, imap4.decoder(output)[0])
def testPrintableSingletons(self):
# XXX This code used to work, but changes occurred within the
# imap4.py module which made it no longer necessary for *all* of it
# to work. In particular, only the part that makes
# 'BODY.PEEK[HEADER.FIELDS.NOT (Subject Bcc Cc)]' come out correctly
# XXX Should really use list of search terms and parse into
# a proper tree
return (query, '')
# XXX - This must search headers too
body = query.pop(0).lower()
return text.strFile(body, msg.getBodyFile(), False)
# XXX - This should handle failures with a rollback or something
addedDeferreds = []
addedIDs = []
failures = []
# XXX - The following should be an implementation of IMessageCopier.copy
# on an IMailbox->IMessageCopier adapter.
flags = msg.getFlags()
# XXX - This is rude.
self.transport.loseConnection()
raise IllegalServerResponse(tag + ' ' + rest)
# XXX - This is rude.
self.transport.loseConnection()
raise IllegalServerResponse(tag + ' ' + rest)
else:
# XXX - This is terrible.
# invocations of IMailboxListener methods, where possible.
flags = {}
recent = exists = None
# XXX UGGG parsing hack :(
r = parseNestedParens('(' + parts[1] + ')')[0]
return [e or [] for e in r]
log.err("No NAMESPACE response to NAMESPACE command")
# XXX - Super expensive, CACHE THIS VALUE FOR LATER RE-USE
lines = 0
for _ in msg.getBodyFile():
lines += 1
# XXX - This does not properly handle multipart messages
# BODYSTRUCTURE is obscenely complex and criminally under-documented.
attrs = {}
# XXX - I dunno if this is really right
if disp:
disp = disp.split('; ')
if len(disp) == 1:
# XXX - I dunno if this is really right
headers = msg.getHeaders(False, 'content-disposition', 'content-language')
disp = headers.get('content-disposition')
if disp:
# XXX - This may require localization :(
months = [
'jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct',
'nov', 'dec', 'january', 'february', 'march', 'april', 'may', 'june',
# XXX - This is -totally- bogus
# It opens about a -hundred- -billion- files
# and -leaves- them open!
# XXX - We need to be smarter about this
if self._waiting is not None:
waiting, self._waiting = self._waiting, None
waiting.errback(LineTooLong())
# XXX TODO See above comment regarding IndexError.
warnings.warn(
"twisted.mail.pop3.IMailbox.getUidl may not "
"raise IndexError for out-of-bounds message numbers: "
# XXX TODO See above comment regarding IndexError.
warnings.warn(
"twisted.mail.pop3.IMailbox.listMessages may not "
"raise IndexError for out-of-bounds message numbers: "
#XXX: May want to come up with a more efficient way to do this
for s in schemes:
tmpSchemes[s.upper()] = 1
# XXX - Yick. This needs cleaning up.
if self.user and self.service.queue:
d = self.service.domains.get(user.dest.domain, None)
if d is None:
# TODO: use Failures or something
if ret == ERROR_PORT_UNREACHABLE:
self.protocol.connectionRefused()
if self.reading:
self.write_op = WriteFileOp(self) # XXX: these two should be specified like before, with a class field
def addBufferCallback(self, handler, event):
self.bufferEvents[event].add(handler)
# XXX: perhaps the following needs to be around to avoid resetting the connection ungracefully
try:
self.socket.shutdown(2)
except socket_error:
skt = self.createInternetSocket() # XXX: haha misnamed method
if self.port:
skt.bind(self.port)
except socket.error, le:
# TODO: add TCP-like buffering
pass
else:
raise
# TODO: add TCP-like buffering
pass
else:
raise
# XXX: gtk.main_quit() (which is used for crash()) raises an exception if
# gtk.main_level() == 0; however, all the tests freeze if we use this
# function to stop the reactor. what gives? (I believe this may have been
# a stupid mistake where I forgot to import gtk here... I will remove this
"""Uh""" #TODO class AlreadyCalled(ValueError):
# XXX: According to ITransport, this should return an IAddress!
return 'file', 'file'
def getHost(self):
return 'file'
def handleException(self):
# XXX - This is *less than* '::', and will screw up IPv6 servers
return defer.succeed('0.0.0.0')
if abstract.isIPAddress(name):
return defer.succeed(name)
# XXX: The parent's stderr isn't necessarily fd 2 anymore, or
# even still available
# thing to attempt
try:
stderr = os.fdopen(2,'w')
# XXX This could be a useful method, but sometimes it triggers a segfault,
# so we'll steer clear for now.
# def verifyCertificate(self, certificate):
# """
# TODO: error detection here.
def doCreate():
self.hProcess, self.hThread, dwPid, dwTid = win32process.CreateProcess(
command, cmdline, None, None, 1, 0, env, path, StartupInfo)
## TODO: get numberSections from book, if any
numberer.setNumberSections(opt['number'])
walker.generate()
# TODO # Abstract # Bibliography # Index
# XXX there's a *reason* Python comes with the pipes module -
# someone fix this to use it.
r = os.system('pngtopnm "%s" | pnmtops -noturn > "%s"' % (src, target))
if r != 0:
import tree #todo: get rid of this later import indexer class NoProcessorError(Exception):
# TODO: Syntax highlighting
buf = StringIO()
getLatexText(node, buf.write, entities=entities)
data = buf.getvalue()
## XXX -- not really an output test, more of a script test
self.mangleSysPath(self.oldPath)
d = self.runTrial(
os.path.join(self.parent,
# XXX - this is used in test_script, perhaps it should be in a utility module
def testNames(test):
"""
Return the id of each test within the given test suite or case.
## XXX -- should this instead raise a ValueError? -- jml
self.failUnlessRaises(TypeError, self.loader.loadPackage, sample)
def test_loadPackageRecursive(self):
# XXX - duplicated and modified from test_script
def assertSuitesEqual(self, test1, test2):
loader = runner.TestLoader()
names1 = testNames(test1)
# XXX There should really be a general way to hook the plugin system
# for tests.
def getPlugins(iface, *a, **kw):
self.assertEqual(iface, IReporter)
## XXX - should I add an optional parameter to disable the check for
## a custom suite.
## OR, should I add another method
if not isinstance(module, types.ModuleType):
# XXX -- this is here because sometimes people will have methods
# called 'timeout', or set timeout to 'orange', or something
# Particularly, test_news.NewsTestCase and ReactorCoreTestCase
# both do this.
# XXX: if err.tb is a real traceback and not stringified, we should
# use that.
err = (err.type, err.value, None)
return err
# XXX - 'todo' should just be a string
self.unexpectedSuccesses.append((test, todo))
def addExpectedFailure(self, test, error, todo):
# XXX - 'todo' should just be a string
self.expectedFailures.append((test, error, todo))
def addSuccess(self, test):
# XXX - deprecate this method, we don't need it any more
def startSuite(self, name):
pass
def endSuite(self, name):
pass
# XXX questionable whether this was a good design idea...
init = getattr(cProxy, "__init__", None)
if init:
init()
inst = _Dummy() # XXX chomp, chomp
inst.__class__ = regClass
method = inst.unjellyFor
elif isinstance(regClass, type):
# XXX do I need an isFunctionAllowed?
function = namedObject(rest[0])
return function
# TODO: an API for extending banana...
if self.currentDialect == "pb" and obj in self.outgoingSymbols:
symbolID = self.outgoingSymbols[obj]
int2b128(symbolID, write)
# XXX should never get called anymore? check!
for notifier in self.failures:
try:
notifier()
# XXX This call is NOT REENTRANT and testing for reentrancy is just
# crazy, so it likely won't be. Don't ever write methods that call the
# broker's serialize() method recursively (e.g. sending a method call
# from within a getState (this causes concurrency problems anyway so
# XXX Should this be done somewhere else?
else:
self._sendAnswer(netResult, requestID)
##
# TODO: force_decache needs to be able to force-invalidate a
# cacheable reference.
try:
cacheable.stoppedObserving(perspective, RemoteCacheObserver(self, cacheable, perspective))
# XXX We don't know how many to close; hope 100 is plenty.
for i in range(3, 100):
try:
os.close(i)
# XXX Stevens, in his Advanced Unix book, section 13.3 (page
# 417) recommends calling umask(0) and closing unused
# file descriptors. In his Network Programming book, he
# additionally recommends ignoring SIGHUP and forking again
# XXX # Remove unescape_attr method # Remove parser testing hack # safeUrl()-ize action
# XXX should it be successful even if unnamed?
if self.name is None or self.disabled:
return []
return [(self._index, self.name, "")]
# TODO according to the comment above: # with python 2.5 the above 3 lines should be again: # sys.exit(result)
# XXX What should the default be? all: inplace runners # Build in-place
# XXX What should the default be? test: test_inplace ftest: ftest_inplace
# XXX need tests!
#
datamap = {}
for dir, paths in pkginfo.data_files:
# XXX we shouldn't actually need this if we only
# use this class to scan in the generated
# distributions
if os.path.isfile(
# XXX I'm not sure whether documentation files should be
# removed from package_data or not, given that there's no spec
# for installing documentation other than for RPMs.
#