Standardize Scheduler and Worker APIs #882

@qinxuye

Background

Currently we only have APIs for schedulers, plus a web API that exposes an HTTP interface. The problem is that a client can only talk to the scheduler, and in some cases only to the web UI. Take mutable tensors: to write to a mutable tensor, we have to send the data to the Mars scheduler, buffer it there, and then distribute it to the workers involved. This is very inefficient, and the data may be serialized and deserialized twice, which is a huge waste.

The Mars context was introduced in #416. It was originally intended for tiling and execution, so that operations such as querying meta could be performed inside them.

Proposals

I therefore suggest dividing our APIs into scheduler and worker parts. The scheduler part focuses on meta queries and job submission, while the worker part covers reading and writing data on workers directly.

The context would be the entry point for talking to a Mars cluster, including schedulers and workers, and would be available in a few places beyond tiling and execution.

Scheduler APIs

Cluster information

  • count_schedulers() -> int: get the number of schedulers.
  • get_schedulers() -> List: get the list of all schedulers.
  • get_schedulers_info([schedulers, filter_fields: List[str]]) -> List: get scheduler information. schedulers can be specified to restrict the query to certain schedulers; filter_fields restricts which attributes are returned.
  • count_workers() -> int: get the number of workers.
  • get_workers() -> List: get the list of all workers.
  • get_workers_info([workers, filter_fields: List[str]]) -> List: get worker information. workers can be specified to restrict the query to certain workers; filter_fields restricts which attributes are returned.
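
To make the intended behavior of workers and filter_fields concrete, here is a minimal in-memory sketch. The class name InMemoryClusterInfo, the worker addresses and the attribute names cpu_total and memory_total are all made up for illustration; this is not Mars code, only one way the semantics above could look.

```python
from typing import Any, Dict, List, Optional


class InMemoryClusterInfo:
    """Hypothetical stand-in for the cluster-information part of the scheduler API."""

    def __init__(self, workers: Dict[str, Dict[str, Any]]) -> None:
        # Maps a worker address to its attributes; the attribute names are invented.
        self._workers = workers

    def count_workers(self) -> int:
        return len(self._workers)

    def get_workers(self) -> List[str]:
        return list(self._workers)

    def get_workers_info(self, workers: Optional[List[str]] = None,
                         filter_fields: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        # No explicit worker list means "all workers".
        selected = self.get_workers() if workers is None else workers
        infos = []
        for address in selected:
            info = self._workers[address]
            if filter_fields is not None:
                # Keep only the requested attributes.
                info = {k: v for k, v in info.items() if k in filter_fields}
            infos.append(dict(info))
        return infos


cluster = InMemoryClusterInfo({
    "10.0.0.1:12345": {"cpu_total": 8, "memory_total": 32},
    "10.0.0.2:12345": {"cpu_total": 16, "memory_total": 64},
})
cpu_only = cluster.get_workers_info(filter_fields=["cpu_total"])
one_worker = cluster.get_workers_info(workers=["10.0.0.2:12345"])
```

The scheduler variants (count_schedulers, get_schedulers, get_schedulers_info) would presumably follow the same pattern.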

Session

  • create_session([session_id, if_not_exist: bool]): create a session; a new session id is generated if session_id is not specified. If if_not_exist is False and the session already exists, an exception is raised.
  • has_session(session_id) -> bool: check whether a session with id session_id has already been created.
  • delete_session(session_id, [if_exist: bool]): delete a session. If if_exist is False and the session does not exist, an error is raised.
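
The interplay of if_not_exist and if_exist can be sketched as follows. The SessionRegistry class, the default values of the flags, the exception types and the fact that create_session returns the session id are assumptions made for this example; the proposal does not fix any of them.

```python
import uuid
from typing import Optional, Set


class SessionRegistry:
    """Hypothetical in-memory model of the session part of the scheduler API."""

    def __init__(self) -> None:
        self._sessions: Set[str] = set()

    def create_session(self, session_id: Optional[str] = None,
                       if_not_exist: bool = True) -> str:
        if session_id is None:
            # Generate a fresh id when the caller does not supply one.
            session_id = str(uuid.uuid4())
        if session_id in self._sessions and not if_not_exist:
            raise ValueError(f"session {session_id} already exists")
        self._sessions.add(session_id)
        return session_id

    def has_session(self, session_id: str) -> bool:
        return session_id in self._sessions

    def delete_session(self, session_id: str, if_exist: bool = True) -> None:
        if session_id not in self._sessions:
            if not if_exist:
                raise KeyError(f"session {session_id} does not exist")
            return  # Deleting a missing session is a no-op when if_exist is True.
        self._sessions.remove(session_id)


registry = SessionRegistry()
sid = registry.create_session("my-session")
registry.create_session("my-session")  # allowed: if_not_exist defaults to True here
```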

Graph

  • submit_tileable_graph(session_id, tileable_graph, tileable_graph_key: str, target_tileable_keys: List[str], options: Dict): submit a tileable graph.
  • submit_chunk_graph(session_id, chunk_graph, chunk_graph_key: str, target_chunk_keys: List[str], options: Dict): submit a chunk graph.
  • get_tileable_graph_info(session_id, tileable_graph_key: str, filter_fields: List[str]): get information about a tileable graph. If filter_fields is specified, only those fields are returned.
  • get_chunk_graph_info(session_id, chunk_graph_key: str, filter_fields: List[str]): get information about a chunk graph. If filter_fields is specified, only those fields are returned.
  • wait_tileable_graph(session_id, tileable_graph_key: str, [timeout: float]): wait for a tileable graph to complete, or until the timeout is reached.
  • wait_chunk_graph(session_id, chunk_graph_key: str, [timeout: float]): wait for a chunk graph to complete, or until the timeout is reached.
  • stop_tileable_graph(session_id, tileable_graph_key: str): stop a tileable graph.
  • stop_chunk_graph(session_id, chunk_graph_key: str): stop a chunk graph.
  • delete_tileable_graph(session_id, tileable_graph_key: str): delete a tileable graph.
  • delete_chunk_graph(session_id, chunk_graph_key: str): delete a chunk graph.
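
One possible reading of the wait calls is "block until the graph finishes or the timeout expires, and report which of the two happened". The sketch below models that with threading.Event. GraphTracker, register_graph (a stand-in for submission that ignores the graph itself) and mark_finished are hypothetical names, and the boolean return value is an assumption, since the proposal does not say what the wait calls return.

```python
import threading
from typing import Dict, Optional, Tuple


class GraphTracker:
    """Hypothetical tracker illustrating wait-with-timeout semantics."""

    def __init__(self) -> None:
        # (session_id, graph_key) -> event that is set once the graph finishes
        self._finished: Dict[Tuple[str, str], threading.Event] = {}

    def register_graph(self, session_id: str, graph_key: str) -> None:
        # Stand-in for submit_tileable_graph; a real call would also take the graph.
        self._finished[(session_id, graph_key)] = threading.Event()

    def mark_finished(self, session_id: str, graph_key: str) -> None:
        self._finished[(session_id, graph_key)].set()

    def wait_tileable_graph(self, session_id: str, tileable_graph_key: str,
                            timeout: Optional[float] = None) -> bool:
        # Event.wait returns True if the event was set, False if the timeout expired.
        return self._finished[(session_id, tileable_graph_key)].wait(timeout)


tracker = GraphTracker()
tracker.register_graph("s1", "g1")
result_while_running = tracker.wait_tileable_graph("s1", "g1", timeout=0.01)
tracker.mark_finished("s1", "g1")
result_after_finish = tracker.wait_tileable_graph("s1", "g1", timeout=0.01)
```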

Meta

  • get_tileable_metas(session_id, tileable_keys, filter_fields: List[str]) -> List: get tileable metas. Tileable includes tensor, DataFrame, mutable tensor and mutable DataFrame.
  • get_chunk_metas(session_id, chunk_keys, filter_fields: List[str]) -> List: get chunk metas.

Mutable tensors and DataFrames

  • create_mutable_tensor(session_id, name: str, shape: Tuple[int, ...], dtype: numpy.dtype, if_not_exist: bool, *args, **kwargs): create a mutable tensor. If if_not_exist is False, an error is raised when a mutable tensor with that name already exists.
  • create_mutable_dataframe(session_id, name: str, shape: Tuple[int, ...], dtypes: pandas.Series, if_not_exist: bool, *args, **kwargs): create a mutable DataFrame. If if_not_exist is False, an error is raised when a mutable DataFrame with that name already exists.
  • seal_mutable_tileable(session_id, name: str): turn a mutable tensor or DataFrame into a vanilla one.
  • delete_mutable_tileable(session_id, name: str): delete a mutable tensor or DataFrame.

Worker APIs

Read chunk data

  • get_chunks_data(session_id, worker: str, chunk_keys: List[str], indexes: List, compression_types: List[str]) -> List: fetch chunk data from a specified worker. compression_types lists the compression types the reader can accept. indexes can be used to index into the raw data before it is returned, for instance with a slice.
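
The point of indexes and compression_types is that the worker can shrink the payload before it leaves the machine. The following is a rough sketch of that idea using only the standard library. The dict-based store, the compression names "zlib" and "none", pickle as the serializer and the (compression, payload) return shape are assumptions for illustration; session_id and worker are omitted because the function stands in for code already running on the chosen worker.

```python
import pickle
import zlib
from typing import Any, Callable, Dict, List, Optional, Tuple

# Compression types this hypothetical worker supports.
SUPPORTED: Dict[str, Callable[[bytes], bytes]] = {
    "zlib": zlib.compress,
    "none": lambda raw: raw,
}


def get_chunks_data(store: Dict[str, Any], chunk_keys: List[str],
                    indexes: Optional[List[Any]] = None,
                    compression_types: Optional[List[str]] = None
                    ) -> List[Tuple[str, bytes]]:
    if indexes is None:
        indexes = [None] * len(chunk_keys)
    accepted = compression_types or ["none"]
    # Pick the first compression type the reader accepts that the worker supports.
    compression = next((c for c in accepted if c in SUPPORTED), None)
    if compression is None:
        raise ValueError(f"no supported compression among {accepted}")
    results = []
    for key, index in zip(chunk_keys, indexes):
        data = store[key]
        if index is not None:
            # Index on the worker so only the requested part is serialized.
            data = data[index]
        results.append((compression, SUPPORTED[compression](pickle.dumps(data))))
    return results


store = {"chunk-a": list(range(10))}
fetched = get_chunks_data(store, ["chunk-a"], indexes=[slice(2, 5)],
                          compression_types=["lz4", "zlib"])
compression_used, payload = fetched[0]
decoded = pickle.loads(zlib.decompress(payload))
```

Here the reader prefers "lz4", which this sketch does not support, so the worker falls back to "zlib", the next acceptable type.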

Read or write mutable tensors and DataFrames

  • get_mutable_chunks_data(session_id, worker: str, chunk_keys: List[str], indexes: List, timestamp: int, compression_types: List[str]) -> List: almost identical to get_chunks_data. The difference is that, since a mutable tensor allows data to be written with a timestamp, mutable chunk data can be fetched as of some timestamp.
  • put_mutable_chunks_data(session_id, worker: str, chunk_keys: List[str], chunks_data: List, timestamp: int): put mutable chunk data into the destination worker.
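
The timestamp semantics can be illustrated with a small worker-side store that records every write and replays only the writes up to the requested timestamp on read. MutableChunkStore, its put and get methods, the (position, value) record layout and the zero fill value are hypothetical. Treating the timestamp bound as inclusive is also an assumption, because the proposal only says "before some timestamp".

```python
from typing import Dict, List, Tuple


class MutableChunkStore:
    """Hypothetical worker-side store that keeps timestamped writes per chunk."""

    def __init__(self, chunk_sizes: Dict[str, int]) -> None:
        self._sizes = chunk_sizes
        # chunk key -> list of (timestamp, position, value) records
        self._writes: Dict[str, List[Tuple[int, int, float]]] = {
            key: [] for key in chunk_sizes
        }

    def put(self, chunk_key: str, records: List[Tuple[int, float]],
            timestamp: int) -> None:
        for position, value in records:
            self._writes[chunk_key].append((timestamp, position, value))

    def get(self, chunk_key: str, timestamp: int) -> List[float]:
        data = [0.0] * self._sizes[chunk_key]
        # Replay writes in timestamp order; sorted() is stable, so writes with the
        # same timestamp keep their arrival order.
        for ts, position, value in sorted(self._writes[chunk_key], key=lambda r: r[0]):
            if ts <= timestamp:  # inclusive bound, an assumption of this sketch
                data[position] = value
        return data


chunks = MutableChunkStore({"c0": 3})
chunks.put("c0", [(0, 1.0)], timestamp=10)
chunks.put("c0", [(0, 2.0), (2, 5.0)], timestamp=20)
view_at_15 = chunks.get("c0", timestamp=15)
view_at_20 = chunks.get("c0", timestamp=20)
```

Reading at timestamp 15 sees only the first write, while reading at timestamp 20 sees both, with the later write overriding position 0.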

Context

The context would be a thin wrapper around the scheduler and worker APIs. It must, however, hold an env property that indicates the environment. This is helpful when the context is used on a worker, typically when it is held by an execution.
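
As a rough sketch of what such a wrapper might look like: the Env enum and its values, the Context field names and the two stub API classes are invented for this example, and only two delegating methods are shown.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, List


class Env(Enum):
    # Hypothetical environments; the proposal does not enumerate them.
    CLIENT = "client"
    WORKER = "worker"


class StubSchedulerAPI:
    """Stand-in for the scheduler API, only for this example."""

    def count_workers(self) -> int:
        return 2


class StubWorkerAPI:
    """Stand-in for the worker API, only for this example."""

    def get_chunks_data(self, session_id: str, worker: str,
                        chunk_keys: List[str]) -> List[Any]:
        return [f"data-of-{key}" for key in chunk_keys]


@dataclass
class Context:
    scheduler_api: Any
    worker_api: Any
    env: Env  # tells callers where the context lives

    def count_workers(self) -> int:
        # Meta queries are delegated to the scheduler API.
        return self.scheduler_api.count_workers()

    def get_chunks_data(self, session_id: str, worker: str,
                        chunk_keys: List[str]) -> List[Any]:
        # Data access is delegated to the worker API.
        return self.worker_api.get_chunks_data(session_id, worker, chunk_keys)


ctx = Context(StubSchedulerAPI(), StubWorkerAPI(), env=Env.WORKER)
worker_count = ctx.count_workers()
chunk_data = ctx.get_chunks_data("s1", "10.0.0.1:12345", ["c0", "c1"])
```

Code holding the context could then branch on ctx.env, for example to read local chunks directly when it is already running on a worker.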
