[Twisted-Python] HTTP PUT a GET's streaming response with treq
Nagy, Attila
2017-05-04 12:04:02 UTC
Hi,

I would like to copy a file between two HTTP servers in the most
efficient manner, which probably means a streaming down/upload (GET/PUT)
with the producer/consumer scheme.

Can this be done easily, could somebody write a simple example?

Thanks,
Phil Mayers
2017-05-04 13:04:13 UTC
Post by Nagy, Attila
Hi,
I would like to copy a file between two HTTP servers in the most
efficient manner, which probably means a streaming down/upload (GET/PUT)
with the producer/consumer scheme.
Can this be done easily, could somebody write a simple example?
I guess it'll look something like this:

https://gist.github.com/philmayers/692d04c15a71d3905e075b30f0543f23

Only lightly tested, but seems to work. There's a lot of debugging print
statements in the above code which can obviously be discarded or turned
into logging statements in production use.
Nagy, Attila
2017-05-04 13:32:24 UTC
Post by Phil Mayers
Post by Nagy, Attila
Hi,
I would like to copy a file between two HTTP servers in the most
efficient manner, which probably means a streaming down/upload
(GET/PUT) with the producer/consumer scheme.
Can this be done easily, could somebody write a simple example?
https://gist.github.com/philmayers/692d04c15a71d3905e075b30f0543f23
Only lightly tested, but seems to work. There's a lot of debugging
print statements in the above code which can obviously be discarded or
turned into logging statements in production use.
In the meantime I've started to write one and came up with this:
https://gist.github.com/bra-fsn/42e556ddd74ff34467b17b9a6d613b33

Are there any problems with this approach?
Phil Mayers
2017-05-04 14:48:15 UTC
Post by Nagy, Attila
Are there any problems with this approach?
You're poking at a private member of the .original IResponse there, and
moreover the transport object which seems risky.

The single treq.collect call to consumer.write will also not necessarily
respect the consumer-issued flow control (even if touching the
_transport was valid, you don't know you won't get buffered data after
you've been asked to stop)
Nagy, Attila
2017-05-04 15:30:50 UTC
Post by Phil Mayers
Post by Nagy, Attila
Are there any problems with this approach?
You're poking at a private member of the .original IResponse there,
and moreover the transport object which seems risky.
I would like to use the simplest (and correct of course) solution.
Juggling with buffering/data by hand seems even more risky to me.
Your implementation seems to do things which twisted should do, which
(also) doesn't seem to be right. :)
It would be nice if this could be solved in an elegant way.
Post by Phil Mayers
The single treq.collect call to consumer.write will also not
necessarily respect the consumer-issued flow control (even if touching
the _transport was valid, you don't know you won't get buffered data
after you've been asked to stop)
It seems to do what I want to do. :)

BTW (without reading it), I've tried your solution (thank you very much
for it!).
From a 1073741824-byte source file, it produced a 1056490639-byte
destination file.
Another run produced a 1044534695-byte destination file.
Each run yields a different size.
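
A truncated copy like the one reported here (17251185 bytes short on the first run, 29207129 on the second) can be caught by counting bytes on the way to the upload and comparing against the expected length. The following is only a minimal sketch, assuming the expected length is known up front (for example from the GET response's Content-Length). CountingConsumer is a hypothetical helper, not part of Twisted or treq, and an in-memory io.BytesIO stands in for the PUT body.

```python
import io


class CountingConsumer(object):
    """Count bytes on their way to any object exposing write(data).

    Hypothetical helper: neither Twisted nor treq provides this class.
    """

    def __init__(self, wrapped, expected_length):
        self._wrapped = wrapped
        self.expected_length = expected_length
        self.written = 0

    def write(self, data):
        # Count first, then pass the chunk through unchanged.
        self.written += len(data)
        self._wrapped.write(data)

    def missing(self):
        """Bytes still owed; 0 means the copy is complete."""
        return self.expected_length - self.written

    def verify(self):
        """Raise if the byte count differs from the expected length."""
        if self.written != self.expected_length:
            raise IOError(
                "length mismatch: wrote %d bytes, expected %d"
                % (self.written, self.expected_length)
            )


# Stand-in for the upload side: an in-memory sink instead of a PUT body.
sink = io.BytesIO()
counter = CountingConsumer(sink, expected_length=10)
counter.write(b"12345")
counter.write(b"678")
```

Calling counter.verify() at the end of the transfer turns a silent short copy into an error, which is useful while debugging end-of-request handling.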
Phil Mayers
2017-05-05 08:26:49 UTC
Post by Nagy, Attila
I would like to use the simplest (and correct of course) solution.
Juggling with buffering/data by hand seems even more risky to me.
The problem with the approach you've outlined is that it treats the
transport (a private member) in ways that I suspect are invalid. In
particular there's no handling of the length of the object or chunked
encodings - I suspect what you're doing will only work on simple HTTP
requests with no connection reuse.

I *think* if you use the t.web deliverBody response method, you get
legal access to a transport (via an IProtocol you provide) and the API
docs confirm that transport can be accessed and paused.

However if you look at:

https://github.com/twisted/twisted/blob/twisted-17.1.0/src/twisted/web/_newclient.py#L1069

...you'll see that deliverBody starts writing to the transport straight
away, and you have no way to know if the consumer will have been
provided to IBodyProducer at that point, or if it'll have been paused.

I think you inevitably will have to buffer some data, and a quick PoC of
that code suggests to me that handling of the return deferred from
startProducing is quite complex.


Perhaps someone more knowledgeable than I can chime in here?
Post by Nagy, Attila
Your implementation seems to do things which twisted should do, which
(also) doesn't seem to be right. :)
It would be nice if this could be solved in an elegant way.
On that I completely agree. It could be that there's some simple idiom
or built-in code for this that I'm unaware of.
Post by Nagy, Attila
BTW (without reading it), I've tried your solution (thank you very much
for it!).
From a 1073741824 bytes source file, it transferred a 1056490639
destination file.
Hey, I did say lightly tested ;o)

I think it's a bug in cleanly handling things at the end of the request.
Nagy, Attila
2017-05-05 09:41:41 UTC
Post by Phil Mayers
Post by Nagy, Attila
I would like to use the simplest (and correct of course) solution.
Juggling with buffering/data by hand seems even more risky to me.
The problem with the approach you've outlined is that it treats the
transport (a private member) in ways that I suspect are invalid. In
particular there's no handling of the length of the object or chunked
encodings - I suspect what you're doing will only work on simple HTTP
requests with no connection reuse.
What possible side effects do you see here? What problems could it cause?
So far I've tried to copy a lot of files in parallel, arbitrarily drop
connections. Everything seems OK.
The file is either transferred in its entirety or (in case of dropped
connections) not.
Post by Phil Mayers
Post by Nagy, Attila
Your implementation seems to do things which twisted should do, which
(also) doesn't seem to be right. :)
It would be nice if this could be solved in an elegant way.
On that I completely agree. It could be that there's some simple idiom
or built-in code for this that I'm unaware of.
I couldn't find a complete example for this task, so it would be nice if
somebody more experienced could share the secret of doing this right and
nice. :)

Thanks,
Phil Mayers
2017-05-05 10:29:15 UTC
Post by Nagy, Attila
What possible side effects do you see here? What problems could it cause?
So far I've tried to copy a lot of files in parallel, arbitrarily drop
connections. Everything seems OK.
The file is either transferred in its entirety or (in case of dropped
connections) not.
Cool. If you're happy using private members, knock yourself out.
Nagy, Attila
2017-05-05 11:40:43 UTC
Post by Phil Mayers
Post by Nagy, Attila
What possible side effects do you see here? What problems could it cause?
So far I've tried to copy a lot of files in parallel, arbitrarily
drop connections. Everything seems OK.
The file is either transferred in its entirety or (in case of dropped
connections) not.
Cool. If you're happy using private members, knock yourself out.
I'm not happy with that of course. The really cool thing would be if
Twisted could support this task without the need of complicated and
error-prone code.
Phil Mayers
2017-05-05 11:53:23 UTC
Post by Nagy, Attila
Post by Phil Mayers
Cool. If you're happy using private members, knock yourself out.
I'm not happy with that of course. The really cool thing would be if
Twisted could support this task without the need of complicated and
error-prone code.
I don't disagree, of course. Cory seems to know the score here, and if I
understand his email, right now you can't do it without buffering.

I wonder what the minimal implementation looks like right now, though?
Cory Benfield
2017-05-05 11:10:47 UTC
Post by Nagy, Attila
Post by Nagy, Attila
I would like to use the simplest (and correct of course) solution.
Juggling with buffering/data by hand seems even more risky to me.
The problem with the approach you've outlined is that it treats the transport (a private member) in ways that I suspect are invalid. In particular there's no handling of the length of the object or chunked encodings - I suspect what you're doing will only work on simple HTTP requests with no connection reuse.
What possible side effects do you see here? What problems could it cause?
The first is that Twisted will break your code eventually. Private member attributes are not covered by Twisted’s deprecation policy, and they can be changed without warning for any reason. So you’ll need to pin your Twisted version.

As a second note, you may lock yourself out of HTTP/2. HTTP/2 is not guaranteed to give you access to a raw transport object (though it might), because in HTTP/2 the protocol is not a dumb byte pipe like it is in HTTP/1.1. Code like this forces Twisted devs who want to add HTTP/2 support (like myself) to implement HTTP/2 as a multiple-object abstraction to allow each request/response pair’s underlying “transport” member to act like a dumb byte-pipe transport, when we’d much rather use a less complex abstraction (as an example you should look at the HTTP/2 server code in twisted.web, which has multiple classes to maintain this fiction that you can just call “transport.write” and expect that to work).

As a third note, your code does not handle the possibility that original._transport may not implement IPushProducer. While *in practice* it tends to, it needn’t. On top of that, it is not forbidden for an IPushProducer implementation to call write() even when paused, and code that wants to be correct in the face of all situations will need to be able to buffer anyway.
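
The third note can be illustrated without Twisted itself. The sketch below is a toy model, not Twisted's API: PausableWriter and its pause/resume methods are hypothetical names, and the sink is any object with a write(data) method. It shows one way to stay correct when a producer keeps calling write() after being asked to pause: hold the late chunks and flush them, in order, on resume.

```python
import io
from collections import deque


class PausableWriter(object):
    """Forward writes to a sink, holding any that arrive while paused.

    Hypothetical helper that only mimics the shape of a consumer;
    it does not implement Twisted's IConsumer interface.
    """

    def __init__(self, sink):
        self._sink = sink
        self._paused = False
        self._pending = deque()

    def pause(self):
        self._paused = True

    def resume(self):
        self._paused = False
        # Flush held chunks in arrival order; stop if paused again mid-flush.
        while self._pending and not self._paused:
            self._sink.write(self._pending.popleft())

    def write(self, data):
        # Queue behind anything already held so ordering is preserved.
        if self._paused or self._pending:
            self._pending.append(data)
        else:
            self._sink.write(data)


sink = io.BytesIO()
writer = PausableWriter(sink)
writer.write(b"a")
writer.pause()
writer.write(b"b")  # producer ignored the pause; the chunk is held
writer.write(b"c")
held_while_paused = sink.getvalue()
writer.resume()
```

The buffer here is unbounded, which is the cost of tolerating a producer that ignores the pause; a real implementation would probably want a limit.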

However, you’re right that this is not ideal. I think the best solution would be an enhancement to twisted.web that updates the default Response object to accept an IConsumer as the protocol argument of deliverBody. This would allow t.w._newclient.Response to be the arbiter of what it means to “pause” production, and allow you to continue to proxy between the two but without accessing a private member (you’d get given the producer you need to pause in registerProducer).

If that’s an enhancement you’d be interested in, I can work with you to get that patch in place. Then your code would change a bit (note that this code won’t work right now):

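from __future__ import print_function

import treq
from twisted.internet import protocol
from twisted.internet.defer import Deferred, inlineCallbacks
from twisted.internet.interfaces import IConsumer
from twisted.web.client import ResponseDone
from twisted.web.iweb import IBodyProducer
from zope.interface import implementer


@implementer(IBodyProducer, IConsumer)
class UploadProducer(protocol.Protocol):
    """Relay a GET response body into a PUT request body."""

    def __init__(self, get_resp):
        self.length = get_resp.length
        self.producing = False
        self._producer = None
        self._consumer = None
        self._completed = Deferred()

    # IConsumer
    def registerProducer(self, producer, streaming):
        assert streaming
        self._producer = producer
        # Hold the download until the upload side is attached.
        if self._consumer is None:
            self._producer.pauseProducing()

    def unregisterProducer(self):
        # Raise an error or something
        pass

    def write(self, data):
        self._consumer.write(data)

    # IProtocol
    def connectionLost(self, reason):
        # ResponseDone marks a clean end of body; anything else is a failure.
        if reason.check(ResponseDone):
            self._completed.callback(None)
        else:
            self._completed.errback(reason)

    # IBodyProducer
    def startProducing(self, consumer):
        # Attach the consumer before resuming so write() always has a target.
        self._consumer = consumer
        if self._producer is not None:
            self._producer.resumeProducing()
        return self._completed

    def resumeProducing(self):
        self._producer.resumeProducing()

    def pauseProducing(self):
        self._producer.pauseProducing()

    def stopProducing(self):
        self._producer.stopProducing()


@inlineCallbacks
def copy(src, dst):
    get_resp = yield treq.get(src, unbuffered=True)
    print("GET", get_resp.code, get_resp.original)
    producer = UploadProducer(get_resp)
    get_resp.deliverBody(producer)

    put_resp = yield treq.put(dst, data=producer)
    print("PUT", put_resp, put_resp.code)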


With this arrangement as well it’d potentially be possible to use something like tubes, or at least get closer to using tubes for this use case. Right now it’s a bit of an annoyance that t.w._newclient doesn’t allow the body receiving protocol to exert backpressure on the data.

Anyway, just a thought.

Cory
Phil Mayers
2017-05-05 11:38:49 UTC
Post by Cory Benfield
As a second note, you may lock yourself out of HTTP/2. HTTP/2 is not
guaranteed to give you access to a raw transport object (though it
FWIW this kind of thing was, more or less, what I was thinking. It seems
unlikely that stealing the TCP connection from Response for a few bytes
was wise...
Post by Cory Benfield
As a third note, your code does not handle the possibility that
original._transport may not implement IPushProducer. While *in
IIUC IResponse.deliverBody *does* guarantee (currently) to provide a
transport implementing IPushProducer to the supplied IProtocol, at least
as far as the docstring says?

Obviously that doesn't make accessing the private _transport safe - an
implementation could wrap the transport to implement the IPushProducer
semantics.
Post by Cory Benfield
practice* it tends to, it needn’t. On top of that, it is not
forbidden for an IPushProducer implementation to call write() even
when paused, and code that wants to be correct in the face of all
situations will need to be able to buffer anyway.
Thanks for confirming this. I suspected as much, but was not certain (I
make relatively little use of producer/consumer APIs in my own code).
Post by Cory Benfield
use case. Right now it’s a bit of an annoyance that t.w._newclient
doesn’t allow the body receiving protocol to exert backpressure on
the data.
How does that square with deliverBody providing an IPushProducer? Is it
the case that the transport may just disobey the pauseProducing requests
as you've noted above?
Cory Benfield
2017-05-06 19:09:13 UTC
Post by Cory Benfield
As a third note, your code does not handle the possibility that
original._transport may not implement IPushProducer. While *in
IIUC IResponse.deliverBody *does* guarantee (currently) to provide a transport implementing IPushProducer to the supplied IProtocol, at least as far as the docstring says?
Obviously that doesn't make accessing the private _transport safe - an implementation could wrap the transport to implement the IPushProducer semantics.
Yeah, it does. Sadly, there’s some complexity hidden there.

In this case, the “transport” object being provided to the IProtocol is a proxy to the TCP transport. However, that object does not write directly to the provided IProtocol, it writes to the t.w._newclient.HTTPClientParser and that chucks all kinds of data through the system. One of these layers is buffering, and that layer is *not* being told about calls to the IPushProducer methods. Thus, if you call pauseProducing from the IProtocol you provide, you may still get calls to your dataReceived method even though the producer is paused.

Mostly this seems to happen when the first body bytes are delivered, which seems to call makeConnection and then immediately dispatch any body bytes that have been received, even if the makeConnection method called pauseProducing on its transport. Not so helpful!

This also makes it impossible to avoid buffering. There seems to be no way to prevent the Response object from delivering the first bytes of the body before startProducing is called by treq for the upload. That’s pretty frustrating. Worse, the Response will *also* fire the “body complete” callback. All of this can and does happen before treq.put ever gets a chance to call startProducing, which means that you just have to buffer all of these events. This is really quite painful.
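
As a rough illustration of what "buffer all of these events" means, here is a toy model that does not import Twisted. EarlyBodyBuffer, bodyFinished, start and on_done are hypothetical names chosen for this sketch; only dataReceived mirrors a real protocol method name. Chunks and the end-of-body signal that arrive before the upload side attaches are recorded, then replayed in order once it does.

```python
import io


class EarlyBodyBuffer(object):
    """Record body events that arrive before a consumer is attached.

    Hypothetical sketch: it models the ordering problem only and is
    not a drop-in IProtocol or IBodyProducer implementation.
    """

    def __init__(self):
        self._consumer = None
        self._on_done = None
        self._chunks = []
        self._finished = False

    def dataReceived(self, data):
        if self._consumer is None:
            self._chunks.append(data)  # upload not started yet: hold it
        else:
            self._consumer.write(data)

    def bodyFinished(self):
        self._finished = True
        # If nobody is attached yet, start() reports completion later.
        if self._consumer is not None:
            self._on_done()

    def start(self, consumer, on_done):
        """Attach the upload side and replay everything held so far."""
        self._consumer = consumer
        self._on_done = on_done
        chunks, self._chunks = self._chunks, []
        for chunk in chunks:
            consumer.write(chunk)
        if self._finished:
            on_done()


done_calls = []
sink = io.BytesIO()
buf = EarlyBodyBuffer()
buf.dataReceived(b"first ")
buf.dataReceived(b"second")
buf.bodyFinished()  # whole body arrived before the upload began
buf.start(sink, lambda: done_calls.append("done"))
```

In this model a small response can be fully received, and even completed, before start() runs, which matches the situation described above.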

This is a design pattern that is in a few different places in twisted.web: situations where the “transport” provided to a higher-level abstraction is a proxy to the TCP connection, but where intermediate layers are actually responsible for calling dataReceived on those higher level abstractions. This causes a real problem if there is any chance that intermediary layers will do any buffering at all, because those layers will not see calls to pauseProducing or anything similar, and so will not respect those calls. The Twisted HTTP/2 server tries very hard to avoid this by ensuring that its classes actually masquerade as transports, rather than delegating to the lower-level TCP transport (though of course, they have to do this because HTTP/2 is framed!).

Fixing this is potentially a bit non-trivial. It involves changing t.w._newclient.Response to be able to act as a transport proxy (probably via some kind of mixin for the sake of code composition) and then to respect the producing state of the protocol receiving the body inside the _bodyDataReceived method, instead of just YOLOing data into that protocol assuming that it hasn’t immediately paused the production.

I consider this a bug in Twisted: I’ll open a ticket.

Cory
Cory Benfield
2017-05-06 19:15:30 UTC
I consider this a bug in Twisted: I’ll open a ticket.
This is now Twisted ticket #9132: https://twistedmatrix.com/trac/ticket/9132
Nagy, Attila
2017-05-05 11:58:28 UTC
Post by Cory Benfield
The first is that Twisted will break your code eventually. Private member attributes are not covered by Twisted’s deprecation policy, and they can be changed without warning for any reason. So you’ll need to pin your Twisted version.
I feel uncomfortable with this, that's why we are corresponding. :)
Post by Cory Benfield
As a second note, you may lock yourself out of HTTP/2. HTTP/2 is not guaranteed to give you access to a raw transport object (though it might), because in HTTP/2 the protocol is not a dumb byte pipe like it is in HTTP/1.1. Code like this forces Twisted devs who want to add HTTP/2 support (like myself) to implement HTTP/2 as a multiple-object abstraction to allow each request/response pair’s underlying “transport” member to act like a dumb byte-pipe transport, when we’d much rather use a less complex abstraction (as an example you should look at the HTTP/2 server code in twisted.web, which has multiple classes to maintain this fiction that you can just call “transport.write” and expect that to work).
Having HTTP/2 (along with 1.1) would of course be best, but
currently I can easily live without it. It's far from being standard.
And yet, its multiplexing would be one of the greatest achievements here
(if correctly implemented). Copying objects over a lot of HTTP/TCP
connections is too stressful sometimes (too many connections, TIME_WAIT
problems etc).
Post by Cory Benfield
However, you’re right that this is not ideal. I think the best solution would be an enhancement to twisted.web that updates the default Response object to accept an IConsumer as the protocol argument of deliverBody. This would allow t.w._newclient.Response to be the arbiter of what it means to “pause” production, and allow you to continue to proxy between the two but without accessing a private member (you’d get given the producer you need to pause in registerProducer).
Absolutely. I think this use case is far from being brain-dead, so if
it's possible to do it right out of the box, I guess everybody wins with it.
Post by Cory Benfield
[UploadProducer example snipped]
This looks much clearer than Phil's solution and lacks the error-prone
custom buffering, which is nice.
What can I do to make this happen? :)
Post by Cory Benfield
With this arrangement as well it’d potentially be possible to use something like tubes, or at least get closer to using tubes for this use case. Right now it’s a bit of an annoyance that t.w._newclient doesn’t allow the body receiving protocol to exert backpressure on the data.
Apart from correctness, decent performance would also be good. I don't
know how tubes compares to this, but the current (not nice) solution can
easily transfer more than one gigabit/s with one process; I consider
that a good baseline. :)
Post by Cory Benfield
Anyway, just a thought.
Thank you very much for joining and your help.
Manish Tomar
2017-05-04 15:38:05 UTC
Have you looked at
http://treq.readthedocs.io/en/latest/howto.html#handling-streaming-responses
?

Regards,
Manish
Hi,
I would like to copy a file between two HTTP servers in the most efficient
manner, which probably means a streaming down/upload (GET/PUT) with the
producer/consumer scheme.
Can this be done easily, could somebody write a simple example?
Thanks,
_______________________________________________
Twisted-Python mailing list
http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python