Discussion:
[Twisted-Python] Using a custom reactor in twisted trial for test cases?
Crispin Wellington
2009-10-30 09:38:12 UTC
Permalink
Hi there,

I am using twisted trial to run test cases for an application. The
application however uses stackless python and has a custom stackless
reactor. I implemented this reactor like this...

-------------------- stacklessreactor.py -----------------------
# Use epoll() as our base reactor
from twisted.internet.epollreactor import EPollReactor as StacklessBaseReactor

import stackless

# seconds between running the greenthreads. 0.0 for flat out 100% CPU
STACKLESS_MAX_PUMP_RATE = 0.1

class StacklessReactor(StacklessBaseReactor):
"""This reactor does the stackless greenthread pumping in the main thread, interwoven with the reactor pump"""

def doIteration(self, timeout):
"""Calls the base reactors doIteration, and then fires off all the stackless threads"""
if timeout > STACKLESS_MAX_PUMP_RATE:
timeout = STACKLESS_MAX_PUMP_RATE
stackless.schedule()
return StacklessBaseReactor.doIteration(self,timeout)

def install():
"""
Install the stackless() reactor.
"""
p = StacklessReactor()
from twisted.internet.main import installReactor
installReactor(p)
-------------------------------------------------------------------

And I install this as my reactor in my application with...

import stacklessreactor
stacklessreactor.install()

...placed right at the top of my .tac python file. And this all works.
Running the app with twistd, the custom reactor is installed and is used
as the reactor for the app.

Now however, I come to write tests and run them with trial. I *need* the
tests to be run under the stackless reactor or things simply wont work
(a lot of the code I need to test are stackless tasklets).

When I go "/usr/local/stackless/bin/trial --help-reactors" I get the
following list:

kqueue kqueue(2)-based reactor.
win32 Win32 WaitForMultipleObjects-based reactor.
epoll epoll(4)-based reactor.
iocp Win32 IO Completion Ports-based reactor.
gtk Gtk1 integration reactor.
cf CoreFoundation integration reactor.
gtk2 Gtk2 integration reactor.
default The best reactor for the current platform.
debug-gui Semi-functional debugging/introspection reactor.
poll poll(2)-based reactor.
glib2 GLib2 event-loop integration reactor.
select select(2)-based reactor.
wx wxPython integration reactor.
qt QT integration reactor

One of these I can use by passing in --reactor=name.

So the question is, is there a way of getting the trial framework to use
my custom reactor? Is there a way to get my reactor into that list
somehow? Is this not a supported feature of trial?

And... if this isn't a supported feature, what is the best way to get a
TestCase that will run under that reactor?

Look forward to any help people can offer me.

With kind regards

Crispin Wellington
e***@twistedmatrix.com
2009-10-30 14:06:21 UTC
Permalink
Post by Crispin Wellington
Hi there,
I am using twisted trial to run test cases for an application. The
application however uses stackless python and has a custom stackless
reactor. I implemented this reactor like this...
It looks like your custom reactor is mainly in charge of making sure
stackless.schedule() gets called at least once every 0.1 seconds. Is
that right? If so, a much better approach would be to use
twisted.internet.task.LoopingCall rather than implementing a custom
reactor.

Is there something undesirable about that (much simpler, less fragile)
approach?

As for your actual question, if you want a new reactor to be as usable
as one of the existing ones, you need to write a plugin declaring its
available. Take a look at twisted/plugins/twisted_reactors.py for some
examples.

Jean-Paul
Crispin Wellington
2009-11-02 04:17:41 UTC
Permalink
Post by e***@twistedmatrix.com
It looks like your custom reactor is mainly in charge of making sure
stackless.schedule() gets called at least once every 0.1 seconds. Is
that right? If so, a much better approach would be to use
twisted.internet.task.LoopingCall rather than implementing a custom
reactor.
Is there something undesirable about that (much simpler, less fragile)
approach?
I tried using LoopingCall, but it does not work. It only calls the
scheduler once. I think this has to do with the fact that the stackless
scheduler needs to be interwoven with the twisted reactor pump. There is
more info about why it has to be done like this here:
http://code.google.com/p/stacklessexamples/wiki/StacklessTwisted
Post by e***@twistedmatrix.com
As for your actual question, if you want a new reactor to be as usable
as one of the existing ones, you need to write a plugin declaring its
available. Take a look at twisted/plugins/twisted_reactors.py for some
examples.
I shall have a look at that approach. Thanks.

Crispin
Glyph Lefkowitz
2009-11-02 06:12:27 UTC
Permalink
Post by Crispin Wellington
Post by e***@twistedmatrix.com
It looks like your custom reactor is mainly in charge of making sure
stackless.schedule() gets called at least once every 0.1 seconds. Is
that right? If so, a much better approach would be to use
twisted.internet.task.LoopingCall rather than implementing a custom
reactor.
Is there something undesirable about that (much simpler, less
fragile)
approach?
I tried using LoopingCall, but it does not work. It only calls the
scheduler once. I think this has to do with the fact that the
stackless
scheduler needs to be interwoven with the twisted reactor pump.
LoopingCall does "interweave" the function that you pass to it with
the "twisted reactor pump". If you were using it correctly, it would
work, as the page that you link to indicates :).

What do you mean "does not work"?
Post by Crispin Wellington
There is
http://code.google.com/p/stacklessexamples/wiki/StacklessTwisted
This example, linked from that page:
<http://stacklessexamples.googlecode.com/svn/trunk/examples/twisted/TwistedTimerReactorControl.py
is roughly the same as what exarkun recommended.

These examples aren't the greatest, because they tend to assume that
you _always_ have Stackless code that's ready to run, and you want to
run at some number of "frames per second". For simple examples this
makes sense, but for a system architecture this is a limiting
approach. If your stackless application wants to sit idle and wait
for input, you're still going to wake up once every 1/30 second and
checking to see if there's anything to do, burning CPU cycles and
battery life. Also, if you want to run *faster* than the arbitrary
timeout you've selected (1/30 second can be a very long time,
especially if you're doing something pseudo-realtime, like audio
playback) you're out of luck.

Better would be to have tasklets schedule *themselves* for when they
want to run. If you just want to allow the rest of the reactor a time-
slice, twisted.internet.task.cooperate() will allow you to schedule a
potentially arbitrary number of tasks which want to run "as often as
possible" without completely swamping the reactor; you can suspend
that task by returning a Deferred which only fires when more stackless
stuff is actually ready to run.
Crispin Wellington
2009-11-02 07:22:46 UTC
Permalink
Post by Glyph Lefkowitz
Post by Crispin Wellington
I tried using LoopingCall, but it does not work. It only calls the
scheduler once. I think this has to do with the fact that the stackless
scheduler needs to be interwoven with the twisted reactor pump.
What do you mean "does not work"?
OK. Having a closer look, its not that looping call doesn't work, its
that there is some unknown number of "reactor pumps" between starting
the test, and finishing it. What I need is a way for the reactor to be
pumping away while a particular test function of a testcase continues
working. As an example, here is a non-working test case. Notice the
comment "#what to do here to pump the reactor?".

------------------
from twisted.trial import unittest, reporter, runner
import os, time
import stackless

from twisted.internet import reactor, task

def example_func(t=10.0):
"""wait for t seconds then return"""
exit_time = time.time()+t
while time.time()<exit_time:
stackless.schedule()

t = task.LoopingCall(stackless.schedule)
t.start(0.1)

class StacklessTest(unittest.TestCase):
def setUpClass(self):
pass

def test_stackless(self):
"""Test that we can successfuly create a user proxy cert"""
# get the time now...
start = time.time()

task = stackless.tasklet(example_func)
task.setup(10.0)
task.run()

while task.alive:
pass #what to do here to pump the reactor?

# end time
end = time.time()

self.assert_( endtime - starttime >= 10.0 )

---------------------

Running this under trial, it just hangs, inside the while task.alive:
loop.

So I guess my problem is my approach. How do I test long running
"tasklets" that use twisted calls (unlike this contrived example that
only sleeps) within the twisted trial framework?

How would the Twisted experts write test code for a case like this? Any
help is much appreciated!

Kind Regards

Crispin Wellington
Paul Thomas
2009-11-02 13:14:37 UTC
Permalink
Post by Crispin Wellington
Post by Glyph Lefkowitz
Post by Crispin Wellington
I tried using LoopingCall, but it does not work. It only calls the
scheduler once. I think this has to do with the fact that the stackless
scheduler needs to be interwoven with the twisted reactor pump.
What do you mean "does not work"?
OK. Having a closer look, its not that looping call doesn't work, its
that there is some unknown number of "reactor pumps" between starting
the test, and finishing it. What I need is a way for the reactor to be
pumping away while a particular test function of a testcase continues
working.
[snip]
Post by Crispin Wellington
loop.
So I guess my problem is my approach. How do I test long running
"tasklets" that use twisted calls (unlike this contrived example that
only sleeps) within the twisted trial framework?
I'm no expert, but I have some tests against objects that use
spawnProcess() which has the similar problem that I need to pump the
reactor. The key is to return a deferred from the trial test method -
then trial does the pump for you. I'm actually using @inlineCallbacks
and the sleep() function mentioned in a previous post - you'll
probably want task.deferLater() or similar.
Crispin Wellington
2009-11-02 04:31:59 UTC
Permalink
Post by e***@twistedmatrix.com
As for your actual question, if you want a new reactor to be as usable
as one of the existing ones, you need to write a plugin declaring its
available. Take a look at twisted/plugins/twisted_reactors.py for some
examples.
So if I make a 'plugin' that declares a
twisted.application.reactors.Reactor in the way done so inside
twisted/plugins/twisted_reactors.py, how do I make twisted trial pick it
up? Or do I actually have to add it into
twisted/plugins/twisted_reactors.py? Or monkey punch it in? If so, where
do I bootstap it in? Trial seems to need the reactor before TestCases
are even loaded.

Thanks again for any help

Crispin
Andrew Francis
2009-10-30 17:12:48 UTC
Permalink
Hi Crispin:

Message: 8
Date: Fri, 30 Oct 2009 17:38:12 +0800
From: Crispin Wellington <***@ccg.murdoch.edu.au>
Subject: [Twisted-Python] Using a custom reactor in twisted trial for
    test    cases?
To: twisted-***@twistedmatrix.com
Message-ID: <***@wolfwood>
Content-Type: text/plain
Post by Crispin Wellington
I am using twisted trial to run test cases for an application. The
application however uses stackless python and has a custom stackless
reactor. I implemented this reactor like this...
For better or worse, much of what you have discussed have been fleshed out about two years ago in this mailing list and in stackless. Also I outline strategies in the PyCon 2008 talk "Adventures in Stackless Twisted Integration." Personally, I don't know why more people don't use Stackless with Twisted.

Some advice:

1) Based on what I have seen, you don't need to write a custom reactor (I still remember Glyph setting me straight on that one). At least for what you are doing :-).

2) Use task.loopingCall to make the reactor periodically yield.

l = task.LoopingCall(stackless.schedule)
l.start(.01)

however there is some weird interaction between Stackless Python 2.6 and Twisted so you may have to do

task.LoopingCall(__tick__)

def __tick__():
stackless.schedule

or play with the recursionlimit variable. I have not gotten to the bottom of that problem.


Cheers,
Andrew
Andrew Francis
2009-11-02 17:27:37 UTC
Permalink
Post by Crispin Wellington
OK. Having a closer look, its not that looping call doesn't work, its
that there is some unknown number of "reactor pumps" between starting
the test, and finishing it. What I need is a way for the reactor to be
pumping away while a particular test function of a testcase continues
working. As an example, here is a non-working test case. Notice the
comment "#what to do here to pump the reactor?".
...
Post by Crispin Wellington
pass #what to do here to pump the reactor?
This won't work because as long as the tasklet is alive (scheduled, paused, or blocked), you will be in a tight loop. And if that function
is not running in the reactor, then the reactor will starve. If you
are waiting to see if the tasklet finished, write a synchronizer.

Cheers,
Andrew
Andrew Francis
2009-11-02 17:39:39 UTC
Permalink
Post by Glyph Lefkowitz
These examples aren't the greatest, because they tend to assume that
you _always_ have Stackless code that's ready to run, and you want to
run at some number of "frames per second". 
In 2007, folks (especially myself) are still trying to figure out if
Stackless can play with Twisted correctly. I know my stuff in the
repository is old.
Post by Glyph Lefkowitz
For simple examples this makes sense, but for a system architecture
this is a limiting approach.  If your stackless application wants to
sit idle and wait for input, you're still going to wake up once every >1/30 second and checking to see if there's anything to do, burning CPU >cycles and battery life. 
An approach would be for the reactor tasklet to look at the runnable
list to see if there are any entries. If there are no entries
(i.e., tasklets are blocked and dependent on the reactor tasklet),
then the reactor tasklet can stop the loopingCall. This is easy enough
to do. However I suspect one starts to get into custom reactor land.
Post by Glyph Lefkowitz
Also, if you want to run *faster* than the arbitrary timeout you've
selected (1/30 second can be a very long time, especially if you're
doing something pseudo-realtime, like audio playback) you're out of luck.
I believe there are two scenarios here:

1) no blocked tasklets and only runnable ones. If the runnable tasklets make no IO requests, it should be possible to put the reactor tasklet to sleep. Let the system run as fast as it can.

2) There is a mix of runnable (scheduled) and blocked tasklets. This
scenario requires a more sophisticated scheduler that is trying to
compute an optimal loopingCall value. This scenario is tricky.
Post by Glyph Lefkowitz
Better would be to have tasklets schedule *themselves* for when they
want to run. 
In the Stackless world, tasklets start running from the moment
they are created/inserted into the runnable list. Stackless in
non-preemptive mode is more about knowing when to make a tasklet
yield to the scheduler.

Cheers,
Andrew

Loading...