Mutable tensor for local session and web session #536
wjsi merged 20 commits into mars-project:master
mars/web/api.py
Outdated
| register_web_handler('/api/session/(?P<session_id>[^/]+)/graph/(?P<graph_key>[^/]+)/data/(?P<tileable_key>[^/]+)', | ||
| GraphDataHandler) | ||
| register_web_handler('/api/session/(?P<session_id>[^/]+)/mutable-tensor', MutableTensorHandler) | ||
| register_web_handler('/api/session/(?P<session_id>[^/]+)/mutable-tensor/write', MutableTensorWriteHandler) |
There was a problem hiding this comment.
This API design does not conform to RESTful style. Consider changing it to `/api/session/(?P<session_id>[^/]+)/mutable-tensor/(?P<name>[^/]+)` and using the GET / PUT / POST methods to handle data read / write / seal.
There was a problem hiding this comment.
Hi @wjsi, mutable tensor requires four endpoints: create/get/write/seal. Is it OK to use the following mapping?
- POST for create
- HEAD for get (is HEAD OK here?)
- PUT for write
- GET for seal
There was a problem hiding this comment.
GET must not cause any side effect on the storage. Therefore there are two options: merge create and write into PUT, letting the API implementation decide whether to create, and use POST for seal; or use POST with an action indicating create or seal. If I were the author of this PR, I would prefer the former solution.
There was a problem hiding this comment.
create and write can be merged into PUT and the implementation of API decide whether to create while POST for seal
Nice suggestion, thanks!
There was a problem hiding this comment.
I have revised the endpoint to use POST for both write and seal. They are easy to distinguish, since write has a body payload and seal does not. The body payload of write is raw bytes, not JSON, which makes create and write harder to distinguish without an extra parameter.
Now all four mutable tensor APIs share the same HTTP endpoint.
There was a problem hiding this comment.
Custom headers can also be used to pass string metadata if you do not like query strings.
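The mapping discussed in this thread can be sketched without any web framework. The following is a minimal illustration of the reviewer's preferred option (GET reads, PUT creates or writes, POST seals), assuming the route pattern proposed above. The names `ROUTE`, `METHOD_TO_OPERATION` and `resolve` are hypothetical and are not part of Mars.

```python
import re

# Route pattern proposed in the review: one URL identifies one mutable tensor.
ROUTE = re.compile(
    r'/api/session/(?P<session_id>[^/]+)/mutable-tensor/(?P<name>[^/]+)$'
)

# Hypothetical mapping: GET never changes stored data,
# PUT creates or writes, POST seals.
METHOD_TO_OPERATION = {
    'GET': 'read',
    'PUT': 'create_or_write',
    'POST': 'seal',
}


def resolve(method, path):
    """Return (operation, session_id, name) for a request line."""
    match = ROUTE.match(path)
    if match is None:
        raise LookupError('no route for %s' % path)
    operation = METHOD_TO_OPERATION.get(method)
    if operation is None:
        raise ValueError('method %s is not allowed' % method)
    return operation, match.group('session_id'), match.group('name')
```

With this layout the tensor name lives in the URL, so the HTTP method alone selects the operation and no verb such as `/write` is needed in the path.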
| _nsplits=tensor.nsplits, _key=tensor.key, _chunks=tensor.chunks)) | ||
| def setitem_as_records(nsplits_acc, output_chunk, value, ts, is_scalar): |
|
One more thing: it is really cool to see that the error can be serialized and sent to the client, but sadly no test was added for it. Could you please try to add some unit tests? |
|
We can move |
|
The exception info returned by the HTTP API of the web session can be validated in the |
|
@hekaisheng I will revise the patch and move the |
|
Great, I have an idea, can we add a |
Agree. The |
Will do that. |
Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
This reverts commit f566da1.
mars/scheduler/mutable.py
Outdated
| ep = self.get_scheduler(chunk_key) | ||
| # register quota | ||
| quota_ref = self.ctx.actor_ref(MemQuotaActor.default_uid(), address=ep) | ||
| quota_ref.request_batch_quota({record_chunk_key: records.nbytes}) |
There was a problem hiding this comment.
ReceiverActor takes little process memory, so a quota request is not needed.
There was a problem hiding this comment.
The quota is for the chunk of records (the (index, value) records of write operations). The record chunk may be spilled, so the quota is required, IMO.
There was a problem hiding this comment.
ReceiverActor stores data in plasma_store or on disk. This means the process memory cost is zero when receiving data from other machines. What's more, we serialize with pyarrow using zero-copy and spill in small chunks (not chunks in the Mars sense), hence the additional memory cost is no more than the size of these chunks. Therefore there is no need to request quotas before data transfer.
There was a problem hiding this comment.
Fixed. There is no need to request a quota now.
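The bounded-memory argument in this thread (data is spilled in small chunks, so the extra memory is at most one chunk) can be illustrated with a generic copy loop. This is only a sketch using in-memory file objects; `copy_in_chunks` and the 64 KiB default are assumptions for illustration and do not reflect how ReceiverActor is actually implemented.

```python
import io


def copy_in_chunks(src, dst, chunk_size=64 * 1024):
    """Copy src to dst while holding at most chunk_size bytes at a time."""
    total = 0
    while True:
        block = src.read(chunk_size)
        if not block:
            # An empty read means the source is exhausted.
            break
        dst.write(block)
        total += len(block)
    return total


# Usage: the payload is larger than the chunk size, yet only one
# block is alive in the loop at any moment.
source = io.BytesIO(b'x' * 200000)
target = io.BytesIO()
copied = copy_in_chunks(source, target, chunk_size=4096)
```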
mars/worker/utils.py
Outdated
| return '%s_load_memory_%s' % (graph_key, chunk_key) | ||
| def put_chunk(session_id, chunk_key, data, receiver_ref): |
There was a problem hiding this comment.
This function should be moved to transfer.py and renamed put_remote_chunk, as it is not referenced by any code in the worker module.
mars/web/api.py
Outdated
| def post(self, session_id, name): | ||
| try: | ||
| # If the request contains no body payload, it is seal, otherwise it is create |
There was a problem hiding this comment.
I still think it is better to use a customized header or a query string argument to define the action of POST.
There was a problem hiding this comment.
Fixed. An `action` parameter has been added.
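Dispatching on an explicit argument rather than on the presence of a body could look roughly like the sketch below. It assumes the action is passed as a query-string argument named `action` with the values `create` and `seal`; the helper name `post_action` and the exact argument values are hypothetical, and the real handler in this PR may differ.

```python
from urllib.parse import parse_qs, urlsplit

# Assumed set of actions accepted by POST in this sketch.
ALLOWED_ACTIONS = ('create', 'seal')


def post_action(url):
    """Return the POST action named by the ``action`` query argument."""
    query = parse_qs(urlsplit(url).query)
    actions = query.get('action', [])
    if len(actions) != 1:
        raise ValueError('exactly one action argument is required')
    action = actions[0]
    if action not in ALLOWED_ACTIONS:
        raise ValueError('unknown action: %s' % action)
    return action
```

The request body no longer influences which operation runs, so an empty write payload cannot be mistaken for a seal.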
Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
|
Could you please confirm that the new commits have resolved your comments? @wjsi |
What do these changes do?
Support mutable tensor in local session and web session.
- For `LocalSession`:
  - we keep a `name -> mut tensor` mapping in `LocalSession`, and writes go to the ndarray directly.
  - for `seal`, we use the ndarray to construct a `mt.tensor` and execute it with `self._executor` to let the executor know the tensor is executed.
- For `WebSession`:
  - for `write`, we forward the `index` and `value` to `MutableTensorActor`; the `MutableTensorActor` maintains the buffer and sends record chunks to the corresponding workers.
  - for `seal`, we do the same thing as with `LocalClusterSession`.

Some design points that need clarification:

- An `index` (such as `(slice(1, None, None))`) cannot be serialized to JSON directly, so I added a helper class `MutableTensor.Index` and leverage `Serializable.to_json` to do the serialization.
- In `WebSession`, the index calculation and chunk transfer are done in `MutableTensorActor`.
- `LocalSession` works well with the `WebSession` (as shown by the unit test).

About the (known) failed test cases:
It seems that there is no way to obtain, on the client side, the information and message of an exception that was originally raised on the server. (Edit: this exception info issue has been fixed in commit 31520ebc9 of this PR.)

We have `dump_exception` and `reraise`, but this doesn't work because:

- `base64.b64encode(pickled_exc)` is `bytes`, not `str`, so it cannot be passed to `json.dumps`.
- Even after decoding it (with `.decode('ascii')`), we will find that `resp.text` is something other than the serialized exception information. Not sure if this limitation is a bug or just by design.
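The `bytes` versus `str` point above can be reproduced with the standard library alone. The sketch below shows one way to make a pickled exception JSON-safe and to restore it again. The helper names `dump_exception_json` and `load_exception_json` and the `exc_info` key are hypothetical; they are not the `dump_exception` and `reraise` helpers used in Mars.

```python
import base64
import json
import pickle


def dump_exception_json(exc):
    """Serialize an exception into a JSON string."""
    pickled_exc = pickle.dumps(exc)
    # b64encode returns bytes; json.dumps only accepts str, so decode first.
    encoded = base64.b64encode(pickled_exc).decode('ascii')
    return json.dumps({'exc_info': encoded})


def load_exception_json(text):
    """Restore the exception object from the JSON string.

    pickle.loads must only be used on data from a trusted server.
    """
    payload = json.loads(text)
    return pickle.loads(base64.b64decode(payload['exc_info']))


# Usage: round-trip an exception the way a server and client might.
restored = load_exception_json(dump_exception_json(ValueError('bad index')))
```

Whether the client actually receives this payload still depends on the web framework returning the response body unchanged for error status codes.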
Related issue number
#415