"""Task: a coarse-grained stage in an application."""
import inspect
import os
import re
import typing
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
import yaml
import sky
from sky import clouds
from sky import exceptions
from sky import global_user_state
from sky.backends import backend_utils
from sky.data import storage as storage_lib
from sky.data import data_utils
from sky.skylet import constants
from sky.utils import schemas
from sky.utils import ux_utils
if typing.TYPE_CHECKING:
from sky import resources as resources_lib
# A lambda generating commands (node rank_i, node addrs -> cmd_i).
CommandGen = Callable[[int, List[str]], Optional[str]]
CommandOrCommandGen = Union[str, CommandGen]
_VALID_NAME_REGEX = '[a-z0-9]+(?:[._-]{1,2}[a-z0-9]+)*'
_VALID_ENV_VAR_REGEX = '[a-zA-Z_][a-zA-Z0-9_]*'
_VALID_NAME_DESCR = ('ASCII characters and may contain lowercase and'
' uppercase letters, digits, underscores, periods,'
' and dashes. Must start and end with alphanumeric'
' characters. No triple dashes or underscores.')
_RUN_FN_CHECK_FAIL_MSG = (
'run command generator must take exactly 2 arguments: node_rank (int) and'
'a list of node ip addresses (List[str]). Got {run_sig}')
def _is_valid_name(name: str) -> bool:
"""Checks if the task name is valid.
Valid is defined as either NoneType or str with ASCII characters which may
contain lowercase and uppercase letters, digits, underscores, periods,
and dashes. Must start and end with alphanumeric characters.
No triple dashes or underscores.
Examples:
some_name_here
some-name-here
some__name__here
some--name--here
some__name--here
some.name.here
some-name_he.re
this---shouldnt--work
this___shouldnt_work
_thisshouldntwork
thisshouldntwork_
"""
if name is None:
return True
return bool(re.fullmatch(_VALID_NAME_REGEX, name))
def _is_valid_env_var(name: str) -> bool:
"""Checks if the task environment variable name is valid."""
return bool(re.fullmatch(_VALID_ENV_VAR_REGEX, name))
[docs]class Task:
"""Task: a computation to be run on the cloud."""
[docs] def __init__(
self,
name: Optional[str] = None,
*,
setup: Optional[str] = None,
run: Optional[CommandOrCommandGen] = None,
envs: Optional[Dict[str, str]] = None,
workdir: Optional[str] = None,
num_nodes: Optional[int] = None,
# Advanced:
docker_image: Optional[str] = None,
):
"""Initializes a Task.
All fields are optional. ``Task.run`` is the actual program: either a
shell command to run (str) or a command generator for different nodes
(lambda; see below).
Optionally, call ``Task.set_resources()`` to set the resource
requirements for this task. If not set, a default CPU-only requirement
is assumed (the same as ``sky cpunode``).
All setters of this class, ``Task.set_*()``, return ``self``, i.e.,
they are fluent APIs and can be chained together.
Example:
.. code-block:: python
# A Task that will sync up local workdir '.', containing
# requirements.txt and train.py.
sky.Task(setup='pip install requirements.txt',
run='python train.py',
workdir='.')
# An empty Task for provisioning a cluster.
task = sky.Task(num_nodes=n).set_resources(...)
# Chaining setters.
sky.Task().set_resources(...).set_file_mounts(...)
Args:
name: A string name for the Task for display purposes.
setup: A setup command, which will be run before executing the run
commands ``run``, and executed under ``workdir``.
run: The actual command for the task. If not None, either a shell
command (str) or a command generator (callable). If latter, it
must take a node rank and a list of node addresses as input and
return a shell command (str) (valid to return None for some nodes,
in which case no commands are run on them). Run commands will be
run under ``workdir``. Note the command generator should be a
self-contained lambda.
envs: A dictionary of environment variables to set before running the
setup and run commands.
workdir: The local working directory. This directory will be synced
to a location on the remote VM(s), and ``setup`` and ``run``
commands will be run under that location (thus, they can rely on
relative paths when invoking binaries).
num_nodes: The number of nodes to provision for this Task. If None,
treated as 1 node. If > 1, each node will execute its own
setup/run command, where ``run`` can either be a str, meaning all
nodes get the same command, or a lambda, with the semantics
documented above.
docker_image: (EXPERIMENTAL: Only in effect when LocalDockerBackend
is used.) The base docker image that this Task will be built on.
Defaults to 'gpuci/miniforge-cuda:11.4-devel-ubuntu18.04'.
"""
self.name = name
self.run = run
self.storage_mounts = {}
self.storage_plans = {}
self.setup = setup
self._envs = envs or dict()
self.workdir = workdir
self.docker_image = (docker_image if docker_image else
'gpuci/miniforge-cuda:11.4-devel-ubuntu18.04')
self.num_nodes = num_nodes
self.inputs = None
self.outputs = None
self.estimated_inputs_size_gigabytes = None
self.estimated_outputs_size_gigabytes = None
# Default to CPUNode
self.resources = {sky.Resources()}
self.time_estimator_func = None
self.file_mounts = None
# Only set when 'self' is a spot controller task: 'self.spot_task' is
# the underlying managed spot task (Task object).
self.spot_task = None
# Filled in by the optimizer. If None, this Task is not planned.
self.best_resources = None
# Check if the task is legal.
self._validate()
dag = sky.dag.get_current_dag()
if dag is not None:
dag.add(self)
def _validate(self):
"""Checks if the Task fields are valid."""
if not _is_valid_name(self.name):
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Invalid task name {self.name}. Valid name: '
f'{_VALID_NAME_DESCR}')
# Check self.run
if callable(self.run):
run_sig = inspect.signature(self.run)
# Check that run is a function with 2 arguments.
if len(run_sig.parameters) != 2:
with ux_utils.print_exception_no_traceback():
raise ValueError(_RUN_FN_CHECK_FAIL_MSG.format(run_sig))
type_list = [int, List[str]]
# Check annotations, if exists
for i, param in enumerate(run_sig.parameters.values()):
if param.annotation != inspect.Parameter.empty:
if param.annotation != type_list[i]:
with ux_utils.print_exception_no_traceback():
raise ValueError(
_RUN_FN_CHECK_FAIL_MSG.format(run_sig))
# Check self containedness.
run_closure = inspect.getclosurevars(self.run)
if run_closure.nonlocals:
with ux_utils.print_exception_no_traceback():
raise ValueError(
'run command generator must be self contained. '
f'Found nonlocals: {run_closure.nonlocals}')
if run_closure.globals:
with ux_utils.print_exception_no_traceback():
raise ValueError(
'run command generator must be self contained. '
f'Found globals: {run_closure.globals}')
if run_closure.unbound:
# Do not raise an error here. Import statements, which are
# allowed, will be considered as unbounded.
pass
elif self.run is not None and not isinstance(self.run, str):
with ux_utils.print_exception_no_traceback():
raise ValueError('run must be either a shell script (str) or '
f'a command generator ({CommandGen}). '
f'Got {type(self.run)}')
# Workdir.
if self.workdir is not None:
full_workdir = os.path.abspath(os.path.expanduser(self.workdir))
if not os.path.isdir(full_workdir):
# Symlink to a dir is legal (isdir() follows symlinks).
with ux_utils.print_exception_no_traceback():
raise ValueError(
'Workdir must exist and must be a directory (or '
f'a symlink to a directory). {self.workdir} not found.')
[docs] @staticmethod
def from_yaml(yaml_path: str) -> 'Task':
"""Initializes a task from a task YAML.
Example:
.. code-block:: python
task = sky.Task.from_yaml('/path/to/task.yaml')
Args:
yaml_path: file path to a valid task yaml file.
Raises:
ValueError: if the path gets loaded into a str instead of a dict; or
if there are any other parsing errors.
"""
with open(os.path.expanduser(yaml_path), 'r') as f:
# TODO(zongheng): use
# https://github.com/yaml/pyyaml/issues/165#issuecomment-430074049
# to raise errors on duplicate keys.
config = yaml.safe_load(f)
if isinstance(config, str):
with ux_utils.print_exception_no_traceback():
raise ValueError('YAML loaded as str, not as dict. '
f'Is it correct? Path: {yaml_path}')
if config is None:
config = {}
backend_utils.validate_schema(config, schemas.get_task_schema(),
'Invalid task YAML: ')
task = Task(
config.pop('name', None),
run=config.pop('run', None),
workdir=config.pop('workdir', None),
setup=config.pop('setup', None),
num_nodes=config.pop('num_nodes', None),
envs=config.pop('envs', None),
)
# Create lists to store storage objects inlined in file_mounts.
# These are retained in dicts in the YAML schema and later parsed to
# storage objects with the storage/storage_mount objects.
fm_storages = []
file_mounts = config.pop('file_mounts', None)
if file_mounts is not None:
copy_mounts = dict()
for dst_path, src in file_mounts.items():
# Check if it is str path
if isinstance(src, str):
copy_mounts[dst_path] = src
# If the src is not a str path, it is likely a dict. Try to
# parse storage object.
elif isinstance(src, dict):
fm_storages.append((dst_path, src))
else:
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Unable to parse file_mount '
f'{dst_path}:{src}')
task.set_file_mounts(copy_mounts)
task_storage_mounts = {} # type: Dict[str, Storage]
all_storages = fm_storages
for storage in all_storages:
mount_path = storage[0]
assert mount_path, \
'Storage mount path cannot be empty.'
try:
storage_obj = storage_lib.Storage.from_yaml_config(storage[1])
except exceptions.StorageSourceError as e:
# Patch the error message to include the mount path, if included
e.args = (e.args[0].replace('<destination_path>',
mount_path),) + e.args[1:]
raise e
task_storage_mounts[mount_path] = storage_obj
task.set_storage_mounts(task_storage_mounts)
if config.get('inputs') is not None:
inputs_dict = config.pop('inputs')
assert len(inputs_dict) == 1, 'Only one input is allowed.'
inputs = list(inputs_dict.keys())[0]
estimated_size_gigabytes = list(inputs_dict.values())[0]
# TODO: allow option to say (or detect) no download/egress cost.
task.set_inputs(inputs=inputs,
estimated_size_gigabytes=estimated_size_gigabytes)
if config.get('outputs') is not None:
outputs_dict = config.pop('outputs')
assert len(outputs_dict) == 1, 'Only one output is allowed.'
outputs = list(outputs_dict.keys())[0]
estimated_size_gigabytes = list(outputs_dict.values())[0]
task.set_outputs(outputs=outputs,
estimated_size_gigabytes=estimated_size_gigabytes)
resources = config.pop('resources', None)
resources = sky.Resources.from_yaml_config(resources)
task.set_resources({resources})
assert not config, f'Invalid task args: {config.keys()}'
return task
@property
def num_nodes(self) -> int:
return self._num_nodes
@property
def envs(self) -> Dict[str, str]:
return self._envs
[docs] def set_envs(
self, envs: Union[None, Tuple[Tuple[str, str]],
Dict[str, str]]) -> 'Task':
"""Sets the environment variables for use inside the setup/run commands.
Args:
envs: (optional) either a list of ``(env_name, value)`` or a dict
``{env_name: value}``.
Returns:
self: The current task, with envs set.
Raises:
ValueError: if various invalid inputs errors are detected.
"""
if envs is None:
self._envs = dict()
return self
if isinstance(envs, (list, tuple)):
keys = set(env[0] for env in envs)
if len(keys) != len(envs):
with ux_utils.print_exception_no_traceback():
raise ValueError('Duplicate env keys provided.')
envs = dict(envs)
if isinstance(envs, dict):
for key in envs:
if not isinstance(key, str):
with ux_utils.print_exception_no_traceback():
raise ValueError('Env keys must be strings.')
if not _is_valid_env_var(key):
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Invalid env key: {key}')
else:
with ux_utils.print_exception_no_traceback():
raise ValueError(
'envs must be List[Tuple[str, str]] or Dict[str, str]: '
f'{envs}')
self._envs = envs
return self
@property
def need_spot_recovery(self) -> bool:
return any(r.spot_recovery is not None for r in self.resources)
@num_nodes.setter
def num_nodes(self, num_nodes: Optional[int]) -> None:
if num_nodes is None:
num_nodes = 1
if not isinstance(num_nodes, int) or num_nodes <= 0:
with ux_utils.print_exception_no_traceback():
raise ValueError(
f'num_nodes should be a positive int. Got: {num_nodes}')
self._num_nodes = num_nodes
def set_inputs(self, inputs, estimated_size_gigabytes) -> 'Task':
# E.g., 's3://bucket', 'gs://bucket', or None.
self.inputs = inputs
self.estimated_inputs_size_gigabytes = estimated_size_gigabytes
return self
def get_inputs(self):
return self.inputs
def get_estimated_inputs_size_gigabytes(self):
return self.estimated_inputs_size_gigabytes
def get_inputs_cloud(self):
"""EXPERIMENTAL: Returns the cloud my inputs live in."""
assert isinstance(self.inputs, str), self.inputs
if self.inputs.startswith('s3:'):
return clouds.AWS()
elif self.inputs.startswith('gs:'):
return clouds.GCP()
else:
with ux_utils.print_exception_no_traceback():
raise ValueError(f'cloud path not supported: {self.inputs}')
def set_outputs(self, outputs, estimated_size_gigabytes) -> 'Task':
self.outputs = outputs
self.estimated_outputs_size_gigabytes = estimated_size_gigabytes
return self
def get_outputs(self):
return self.outputs
def get_estimated_outputs_size_gigabytes(self):
return self.estimated_outputs_size_gigabytes
[docs] def set_resources(
self, resources: Union['resources_lib.Resources',
Set['resources_lib.Resources']]
) -> 'Task':
"""Sets the required resources to execute this task.
If this function is not called for a Task, default resource
requirements will be used (8 vCPUs).
Args:
resources: either a sky.Resources, or a set of them. The latter case
is EXPERIMENTAL and indicates asking the optimizer to "pick the
best of these resources" to run this task.
Returns:
self: The current task, with resources set.
"""
if isinstance(resources, sky.Resources):
resources = {resources}
self.resources = resources
return self
def get_resources(self):
return self.resources
def set_time_estimator(self, func: Callable[['sky.Resources'],
int]) -> 'Task':
"""Sets a func mapping resources to estimated time (secs).
This is EXPERIMENTAL.
"""
self.time_estimator_func = func
return self
def estimate_runtime(self, resources):
"""Returns a func mapping resources to estimated time (secs).
This is EXPERIMENTAL.
"""
if self.time_estimator_func is None:
raise NotImplementedError(
'Node [{}] does not have a cost model set; '
'call set_time_estimator() first'.format(self))
return self.time_estimator_func(resources)
[docs] def set_file_mounts(self, file_mounts: Optional[Dict[str, str]]) -> 'Task':
"""Sets the file mounts for this task.
Useful for syncing datasets, dotfiles, etc.
File mounts are a dictionary: ``{remote_path: local_path/cloud URI}``.
Local (or cloud) files/directories will be synced to the specified
paths on the remote VM(s) where this Task will run.
Neither source or destimation paths can end with a slash.
Example:
.. code-block:: python
task.set_file_mounts({
'~/.dotfile': '/local/.dotfile',
# /remote/dir/ will contain the contents of /local/dir/.
'/remote/dir': '/local/dir',
})
Args:
file_mounts: an optional dict of ``{remote_path: local_path/cloud
URI}``, where remote means the VM(s) on which this Task will
eventually run on, and local means the node from which the task is
launched.
Returns:
self: the current task, with file mounts set.
Raises:
ValueError: if input paths are invalid.
"""
if file_mounts is None:
self.file_mounts = None
return self
for target, source in file_mounts.items():
if target.endswith('/') or source.endswith('/'):
with ux_utils.print_exception_no_traceback():
raise ValueError(
'File mount paths cannot end with a slash '
'(try "/mydir: /mydir" or "/myfile: /myfile"). '
f'Found: target={target} source={source}')
if data_utils.is_cloud_store_url(target):
with ux_utils.print_exception_no_traceback():
raise ValueError(
'File mount destination paths cannot be cloud storage')
if not data_utils.is_cloud_store_url(source):
if not os.path.exists(
os.path.abspath(os.path.expanduser(source))):
with ux_utils.print_exception_no_traceback():
raise ValueError(
f'File mount source {source!r} does not exist '
'locally. To fix: check if it exists, and correct '
'the path.')
# TODO(zhwu): /home/username/sky_workdir as the target path need
# to be filtered out as well.
if (target == constants.SKY_REMOTE_WORKDIR and
self.workdir is not None):
with ux_utils.print_exception_no_traceback():
raise ValueError(
f'Cannot use {constants.SKY_REMOTE_WORKDIR!r} as a '
'destination path of a file mount, as it will be used '
'by the workdir. If uploading a file/folder to the '
'workdir is needed, please specify the full path to '
'the file/folder.')
self.file_mounts = file_mounts
return self
[docs] def update_file_mounts(self, file_mounts: Dict[str, str]) -> 'Task':
"""Updates the file mounts for this task.
Different from set_file_mounts(), this function updates into the
existing file_mounts (calls ``dict.update()``), rather than
overwritting it.
This should be called before provisioning in order to take effect.
Example:
.. code-block:: python
task.update_file_mounts({
'~/.config': '~/Documents/config',
'/tmp/workdir': '/local/workdir/cnn-cifar10',
})
Args:
file_mounts: a dict of ``{remote_path: local_path/cloud URI}``, where
remote means the VM(s) on which this Task will eventually run on,
and local means the node from which the task is launched.
Returns:
self: the current task, with file mounts updated.
Raises:
ValueError: if input paths are invalid.
"""
if self.file_mounts is None:
self.file_mounts = {}
self.file_mounts.update(file_mounts)
# For validation logic:
return self.set_file_mounts(self.file_mounts)
[docs] def set_storage_mounts(
self,
storage_mounts: Optional[Dict[str, storage_lib.Storage]],
) -> 'Task':
"""Sets the storage mounts for this task.
Storage mounts are a dictionary: ``{mount_path: sky.Storage object}``,
each of which mounts a sky.Storage object (a cloud object store bucket)
to a path inside the remote cluster.
A sky.Storage object can be created by uploading from a local directory
(setting ``source``), or backed by an existing cloud bucket (setting
``name`` to the bucket name; or setting ``source`` to the bucket URI).
Example:
.. code-block:: python
task.set_storage_mounts({
'/remote/imagenet/': sky.Storage(name='my-bucket',
source='/local/imagenet'),
})
Args:
storage_mounts: an optional dict of ``{mount_path: sky.Storage
object}``, where mount_path is the path inside the remote VM(s)
where the Storage object will be mounted on.
Returns:
self: The current task, with storage mounts set.
Raises:
ValueError: if input paths are invalid.
"""
if storage_mounts is None:
self.storage_mounts = None
return self
for target, _ in storage_mounts.items():
# TODO(zhwu): /home/username/sky_workdir as the target path need
# to be filtered out as well.
if (target == constants.SKY_REMOTE_WORKDIR and
self.workdir is not None):
with ux_utils.print_exception_no_traceback():
raise ValueError(
f'Cannot use {constants.SKY_REMOTE_WORKDIR!r} as a '
'destination path of a file mount, as it will be used '
'by the workdir. If uploading a file/folder to the '
'workdir is needed, please specify the full path to '
'the file/folder.')
if data_utils.is_cloud_store_url(target):
with ux_utils.print_exception_no_traceback():
raise ValueError(
'Storage mount destination path cannot be cloud storage'
)
# Storage source validation is done in Storage object
self.storage_mounts = storage_mounts
return self
[docs] def update_storage_mounts(
self, storage_mounts: Dict[str, storage_lib.Storage]) -> 'Task':
"""Updates the storage mounts for this task.
Different from set_storage_mounts(), this function updates into the
existing storage_mounts (calls ``dict.update()``), rather than
overwritting it.
This should be called before provisioning in order to take effect.
Args:
storage_mounts: an optional dict of ``{mount_path: sky.Storage
object}``, where mount_path is the path inside the remote VM(s)
where the Storage object will be mounted on.
Returns:
self: The current task, with storage mounts updated.
Raises:
ValueError: if input paths are invalid.
"""
if not storage_mounts:
return self
task_storage_mounts = self.storage_mounts if self.storage_mounts else {}
task_storage_mounts.update(storage_mounts)
return self.set_storage_mounts(task_storage_mounts)
def get_preferred_store_type(self) -> storage_lib.StoreType:
# TODO(zhwu, romilb): The optimizer should look at the source and
# destination to figure out the right stores to use. For now, we
# use a heuristic solution to find the store type by the following
# order:
# 1. cloud decided in best_resources.
# 2. cloud specified in the task resources.
# 3. the first enabled cloud.
# This should be refactored and moved to the optimizer.
assert len(self.resources) == 1, self.resources
storage_cloud = None
if self.best_resources is not None:
storage_cloud = self.best_resources.cloud
else:
resources = list(self.resources)[0]
storage_cloud = resources.cloud
if storage_cloud is None:
# Get the first enabled cloud.
backend_utils.check_public_cloud_enabled()
enabled_clouds = global_user_state.get_enabled_clouds()
for cloud in storage_lib.STORE_ENABLED_CLOUDS:
for enabled_cloud in enabled_clouds:
if cloud.is_same_cloud(enabled_cloud):
storage_cloud = cloud
break
if storage_cloud is None:
raise ValueError('No available cloud to mount storage.')
store_type = storage_lib.get_storetype_from_cloud(storage_cloud)
return store_type
def sync_storage_mounts(self) -> None:
"""(INTERNAL) Eagerly syncs storage mounts to cloud storage.
After syncing up, COPY-mode storage mounts are translated into regular
file_mounts of the form ``{ /remote/path: {s3,gs,..}://<bucket path>
}``.
"""
for storage in self.storage_mounts.values():
if len(storage.stores) == 0:
store_type = self.get_preferred_store_type()
self.storage_plans[storage] = store_type
storage.add_store(store_type)
else:
# We will download the first store that is added to remote.
self.storage_plans[storage] = list(storage.stores.keys())[0]
storage_mounts = self.storage_mounts
storage_plans = self.storage_plans
for mnt_path, storage in storage_mounts.items():
if storage.mode == storage_lib.StorageMode.COPY:
store_type = storage_plans[storage]
if store_type is storage_lib.StoreType.S3:
# TODO: allow for Storage mounting of different clouds
if storage.source is not None and storage.source.startswith(
's3://'):
blob_path = storage.source
else:
blob_path = 's3://' + storage.name
self.update_file_mounts({
mnt_path: blob_path,
})
elif store_type is storage_lib.StoreType.GCS:
if storage.source.startswith('gs://'):
blob_path = storage.source
else:
blob_path = 'gs://' + storage.name
self.update_file_mounts({
mnt_path: blob_path,
})
elif store_type is storage_lib.StoreType.AZURE:
# TODO when Azure Blob is done: sync ~/.azure
assert False, 'TODO: Azure Blob not mountable yet'
else:
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Storage Type {store_type} '
'does not exist!')
def get_local_to_remote_file_mounts(self) -> Optional[Dict[str, str]]:
"""Returns file mounts of the form (dst=VM path, src=local path).
Any cloud object store URIs (gs://, s3://, etc.), either as source or
destination, are not included.
INTERNAL: this method is internal-facing.
"""
if self.file_mounts is None:
return None
d = {}
for k, v in self.file_mounts.items():
if not data_utils.is_cloud_store_url(
k) and not data_utils.is_cloud_store_url(v):
d[k] = v
return d
def get_cloud_to_remote_file_mounts(self) -> Optional[Dict[str, str]]:
"""Returns file mounts of the form (dst=VM path, src=cloud URL).
Local-to-remote file mounts are excluded (handled by
get_local_to_remote_file_mounts()).
INTERNAL: this method is internal-facing.
"""
if self.file_mounts is None:
return None
d = {}
for k, v in self.file_mounts.items():
if not data_utils.is_cloud_store_url(
k) and data_utils.is_cloud_store_url(v):
d[k] = v
return d
def to_yaml_config(self) -> Dict[str, Any]:
"""Returns a yaml-style dict representation of the task.
INTERNAL: this method is internal-facing.
"""
config = dict()
def add_if_not_none(key, value, no_empty: bool = False):
if no_empty and not value:
return
if value is not None:
config[key] = value
add_if_not_none('name', self.name)
if self.resources is not None:
assert len(self.resources) == 1
resources = list(self.resources)[0]
add_if_not_none('resources', resources.to_yaml_config())
add_if_not_none('num_nodes', self.num_nodes)
if self.inputs is not None:
add_if_not_none('inputs',
{self.inputs: self.estimated_inputs_size_gigabytes})
if self.outputs is not None:
add_if_not_none(
'outputs',
{self.outputs: self.estimated_outputs_size_gigabytes})
add_if_not_none('setup', self.setup)
add_if_not_none('workdir', self.workdir)
add_if_not_none('run', self.run)
add_if_not_none('envs', self.envs, no_empty=True)
add_if_not_none('file_mounts', dict())
if self.file_mounts is not None:
config['file_mounts'].update(self.file_mounts)
if self.storage_mounts is not None:
config['file_mounts'].update({
mount_path: storage.to_yaml_config()
for mount_path, storage in self.storage_mounts.items()
})
return config
def __rshift__(self, b):
sky.dag.get_current_dag().add_edge(self, b)
def __repr__(self):
if self.name:
return self.name
if isinstance(self.run, str):
run_msg = self.run.replace('\n', '\\n')
if len(run_msg) > 20:
run_msg = f'run=\'{run_msg[:20]}...\''
else:
run_msg = f'run=\'{run_msg}\''
elif self.run is None:
run_msg = 'run=None'
else:
run_msg = 'run=<fn>'
s = f'Task({run_msg})'
if self.inputs is not None:
s += f'\n inputs: {self.inputs}'
if self.outputs is not None:
s += f'\n outputs: {self.outputs}'
if self.num_nodes > 1:
s += f'\n nodes: {self.num_nodes}'
if len(self.resources) > 1 or not list(self.resources)[0].is_empty():
s += f'\n resources: {self.resources}'
else:
s += '\n resources: default instances'
return s