[Notes] [Git][BuildGrid/buildgrid][finn/74-operation-cancelation] 18 commits: Add instances to service-level operation names



Title: GitLab

finn pushed to branch finn/74-operation-cancelation at BuildGrid / buildgrid

Commits:

24 changed files:

Changes:

  • buildgrid/_app/commands/cmd_bot.py
    ... ... @@ -28,8 +28,10 @@ from urllib.parse import urlparse
    28 28
     import click
    
    29 29
     import grpc
    
    30 30
     
    
    31
    -from buildgrid.bot import bot, bot_interface
    
    32
    -from buildgrid.bot.bot_session import BotSession, Device, Worker
    
    31
    +from buildgrid.bot import bot, interface, session, hardware
    
    32
    +from buildgrid.bot.hardware.device import Device
    
    33
    +from buildgrid.bot.hardware.worker import Worker
    
    34
    +
    
    33 35
     
    
    34 36
     from ..bots import buildbox, dummy, host
    
    35 37
     from ..cli import pass_context
    
    ... ... @@ -123,12 +125,12 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_
    123 125
         context.logger = logging.getLogger(__name__)
    
    124 126
         context.logger.debug("Starting for remote {}".format(context.remote))
    
    125 127
     
    
    126
    -    interface = bot_interface.BotInterface(context.channel)
    
    128
    +    interface = interface.BotInterface(context.channel)
    
    127 129
     
    
    128 130
         worker = Worker()
    
    129 131
         worker.add_device(Device())
    
    130 132
     
    
    131
    -    bot_session = BotSession(parent, interface)
    
    133
    +    bot_session = session.BotSession(parent, interface)
    
    132 134
         bot_session.add_worker(worker)
    
    133 135
     
    
    134 136
         context.bot_session = bot_session
    

  • buildgrid/_app/commands/cmd_operation.py
    ... ... @@ -155,6 +155,19 @@ def status(context, operation_name, json):
    155 155
             click.echo(json_format.MessageToJson(operation))
    
    156 156
     
    
    157 157
     
    
    158
    +@cli.command('cancel', short_help="Cancel an operation.")
    
    159
    +@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
    
    160
    +@pass_context
    
    161
    +def status(context, operation_name):
    
    162
    +    context.logger.info("Cancelling an operation...")
    
    163
    +    stub = operations_pb2_grpc.OperationsStub(context.channel)
    
    164
    +
    
    165
    +    request = operations_pb2.CancelOperationRequest(name=operation_name)
    
    166
    +
    
    167
    +    response = stub.CancelOperation(request)
    
    168
    +    context.logger.info("Operation cancelled")
    
    169
    +
    
    170
    +
    
    158 171
     @cli.command('list', short_help="List operations.")
    
    159 172
     @click.option('--json', is_flag=True, show_default=True,
    
    160 173
                   help="Print operations list in JSON format.")
    

  • buildgrid/_exceptions.py
    ... ... @@ -52,6 +52,12 @@ class BotError(BgdError):
    52 52
             super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
    
    53 53
     
    
    54 54
     
    
    55
    +class CancelledError(BgdError):
    
    56
    +    """The job was cancelled and any callers should be notified"""
    
    57
    +    def __init__(self, message, detail=None, reason=None):
    
    58
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    
    59
    +
    
    60
    +
    
    55 61
     class InvalidArgumentError(BgdError):
    
    56 62
         """A bad argument was passed, such as a name which doesn't exist."""
    
    57 63
         def __init__(self, message, detail=None, reason=None):
    

  • buildgrid/bot/bot.py
    ... ... @@ -17,7 +17,7 @@
    17 17
     Bot
    
    18 18
     ====
    
    19 19
     
    
    20
    -Creates a bot session.
    
    20
    +Creates a bot session and sends updates to the server.
    
    21 21
     """
    
    22 22
     
    
    23 23
     import asyncio
    
    ... ... @@ -45,6 +45,7 @@ class Bot:
    45 45
                 loop.run_forever()
    
    46 46
             except KeyboardInterrupt:
    
    47 47
                 pass
    
    48
    +
    
    48 49
             finally:
    
    49 50
                 task.cancel()
    
    50 51
                 loop.close()
    

  • buildgrid/bot/bot_session.py deleted
    1
    -# Copyright (C) 2018 Bloomberg LP
    
    2
    -#
    
    3
    -# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    -# you may not use this file except in compliance with the License.
    
    5
    -# You may obtain a copy of the License at
    
    6
    -#
    
    7
    -#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    -#
    
    9
    -# Unless required by applicable law or agreed to in writing, software
    
    10
    -# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    -# See the License for the specific language governing permissions and
    
    13
    -# limitations under the License.
    
    14
    -
    
    15
    -# Disable broad exception catch
    
    16
    -# pylint: disable=broad-except
    
    17
    -
    
    18
    -
    
    19
    -"""
    
    20
    -Bot Session
    
    21
    -====
    
    22
    -
    
    23
    -Allows connections
    
    24
    -"""
    
    25
    -import asyncio
    
    26
    -import logging
    
    27
    -import platform
    
    28
    -import uuid
    
    29
    -
    
    30
    -import grpc
    
    31
    -
    
    32
    -from buildgrid._enums import BotStatus, LeaseState
    
    33
    -from buildgrid._protos.google.rpc import code_pb2
    
    34
    -from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
    
    35
    -from buildgrid._exceptions import BotError
    
    36
    -
    
    37
    -
    
    38
    -class BotSession:
    
    39
    -    def __init__(self, parent, interface):
    
    40
    -        """ Unique bot ID within the farm used to identify this bot
    
    41
    -        Needs to be human readable.
    
    42
    -        All prior sessions with bot_id of same ID are invalidated.
    
    43
    -        If a bot attempts to update an invalid session, it must be rejected and
    
    44
    -        may be put in quarantine.
    
    45
    -        """
    
    46
    -
    
    47
    -        self.logger = logging.getLogger(__name__)
    
    48
    -
    
    49
    -        self._bot_id = '{}.{}'.format(parent, platform.node())
    
    50
    -        self._context = None
    
    51
    -        self._interface = interface
    
    52
    -        self._leases = {}
    
    53
    -        self._name = None
    
    54
    -        self._parent = parent
    
    55
    -        self._status = BotStatus.OK.value
    
    56
    -        self._work = None
    
    57
    -        self._worker = None
    
    58
    -
    
    59
    -    @property
    
    60
    -    def bot_id(self):
    
    61
    -        return self._bot_id
    
    62
    -
    
    63
    -    def add_worker(self, worker):
    
    64
    -        self._worker = worker
    
    65
    -
    
    66
    -    def create_bot_session(self, work, context=None):
    
    67
    -        self.logger.debug("Creating bot session")
    
    68
    -        self._work = work
    
    69
    -        self._context = context
    
    70
    -
    
    71
    -        session = self._interface.create_bot_session(self._parent, self.get_pb2())
    
    72
    -        self._name = session.name
    
    73
    -
    
    74
    -        self.logger.info("Created bot session with name: [{}]".format(self._name))
    
    75
    -
    
    76
    -        for lease in session.leases:
    
    77
    -            self._update_lease_from_server(lease)
    
    78
    -
    
    79
    -    def update_bot_session(self):
    
    80
    -        self.logger.debug("Updating bot session: [{}]".format(self._bot_id))
    
    81
    -        session = self._interface.update_bot_session(self.get_pb2())
    
    82
    -        for k, v in list(self._leases.items()):
    
    83
    -            if v.state == LeaseState.COMPLETED.value:
    
    84
    -                del self._leases[k]
    
    85
    -
    
    86
    -        for lease in session.leases:
    
    87
    -            self._update_lease_from_server(lease)
    
    88
    -
    
    89
    -    def get_pb2(self):
    
    90
    -        leases = list(self._leases.values())
    
    91
    -        if not leases:
    
    92
    -            leases = None
    
    93
    -
    
    94
    -        return bots_pb2.BotSession(worker=self._worker.get_pb2(),
    
    95
    -                                   status=self._status,
    
    96
    -                                   leases=leases,
    
    97
    -                                   bot_id=self._bot_id,
    
    98
    -                                   name=self._name)
    
    99
    -
    
    100
    -    def lease_completed(self, lease):
    
    101
    -        lease.state = LeaseState.COMPLETED.value
    
    102
    -        self._leases[lease.id] = lease
    
    103
    -
    
    104
    -    def _update_lease_from_server(self, lease):
    
    105
    -        """
    
    106
    -        State machine for any recieved updates to the leases.
    
    107
    -        """
    
    108
    -        # TODO: Compare with previous state of lease
    
    109
    -        if lease.state == LeaseState.PENDING.value:
    
    110
    -            lease.state = LeaseState.ACTIVE.value
    
    111
    -            self._leases[lease.id] = lease
    
    112
    -            self.update_bot_session()
    
    113
    -            asyncio.ensure_future(self.create_work(lease))
    
    114
    -
    
    115
    -    async def create_work(self, lease):
    
    116
    -        self.logger.debug("Work created: [{}]".format(lease.id))
    
    117
    -        loop = asyncio.get_event_loop()
    
    118
    -
    
    119
    -        try:
    
    120
    -            lease = await loop.run_in_executor(None, self._work, self._context, lease)
    
    121
    -
    
    122
    -        except grpc.RpcError as e:
    
    123
    -            self.logger.error("RPC error thrown: [{}]".format(e))
    
    124
    -            lease.status.CopyFrom(e.code())
    
    125
    -
    
    126
    -        except BotError as e:
    
    127
    -            self.logger.error("Internal bot error thrown: [{}]".format(e))
    
    128
    -            lease.status.code = code_pb2.INTERNAL
    
    129
    -
    
    130
    -        except Exception as e:
    
    131
    -            self.logger.error("Exception thrown: [{}]".format(e))
    
    132
    -            lease.status.code = code_pb2.INTERNAL
    
    133
    -
    
    134
    -        self.logger.debug("Work complete: [{}]".format(lease.id))
    
    135
    -        self.lease_completed(lease)
    
    136
    -
    
    137
    -
    
    138
    -class Worker:
    
    139
    -    def __init__(self, properties=None, configs=None):
    
    140
    -        self.properties = {}
    
    141
    -        self._configs = {}
    
    142
    -        self._devices = []
    
    143
    -
    
    144
    -        if properties:
    
    145
    -            for k, v in properties.items():
    
    146
    -                if k == 'pool':
    
    147
    -                    self.properties[k] = v
    
    148
    -                else:
    
    149
    -                    raise KeyError('Key not supported: [{}]'.format(k))
    
    150
    -
    
    151
    -        if configs:
    
    152
    -            for k, v in configs.items():
    
    153
    -                if k == 'DockerImage':
    
    154
    -                    self.configs[k] = v
    
    155
    -                else:
    
    156
    -                    raise KeyError('Key not supported: [{}]'.format(k))
    
    157
    -
    
    158
    -    @property
    
    159
    -    def configs(self):
    
    160
    -        return self._configs
    
    161
    -
    
    162
    -    def add_device(self, device):
    
    163
    -        self._devices.append(device)
    
    164
    -
    
    165
    -    def get_pb2(self):
    
    166
    -        devices = [device.get_pb2() for device in self._devices]
    
    167
    -        worker = worker_pb2.Worker(devices=devices)
    
    168
    -        property_message = worker_pb2.Worker.Property()
    
    169
    -        for k, v in self.properties.items():
    
    170
    -            property_message.key = k
    
    171
    -            property_message.value = v
    
    172
    -            worker.properties.extend([property_message])
    
    173
    -
    
    174
    -        config_message = worker_pb2.Worker.Config()
    
    175
    -        for k, v in self.properties.items():
    
    176
    -            property_message.key = k
    
    177
    -            property_message.value = v
    
    178
    -            worker.configs.extend([config_message])
    
    179
    -
    
    180
    -        return worker
    
    181
    -
    
    182
    -
    
    183
    -class Device:
    
    184
    -    def __init__(self, properties=None):
    
    185
    -        """ Creates devices available to the worker
    
    186
    -        The first device is know as the Primary Device - the revice which
    
    187
    -        is running a bit and responsible to actually executing commands.
    
    188
    -        All other devices are known as Attatched Devices and must be controlled
    
    189
    -        by the Primary Device.
    
    190
    -        """
    
    191
    -
    
    192
    -        self._name = str(uuid.uuid4())
    
    193
    -        self._properties = {}
    
    194
    -
    
    195
    -        if properties:
    
    196
    -            for k, v in properties.items():
    
    197
    -                if k == 'os':
    
    198
    -                    self._properties[k] = v
    
    199
    -
    
    200
    -                elif k == 'docker':
    
    201
    -                    if v not in ('True', 'False'):
    
    202
    -                        raise ValueError('Value not supported: [{}]'.format(v))
    
    203
    -                    self._properties[k] = v
    
    204
    -
    
    205
    -                else:
    
    206
    -                    raise KeyError('Key not supported: [{}]'.format(k))
    
    207
    -
    
    208
    -    @property
    
    209
    -    def name(self):
    
    210
    -        return self._name
    
    211
    -
    
    212
    -    @property
    
    213
    -    def properties(self):
    
    214
    -        return self._properties
    
    215
    -
    
    216
    -    def get_pb2(self):
    
    217
    -        device = worker_pb2.Device(handle=self._name)
    
    218
    -        property_message = worker_pb2.Device.Property()
    
    219
    -        for k, v in self._properties.items():
    
    220
    -            property_message.key = k
    
    221
    -            property_message.value = v
    
    222
    -            device.properties.extend([property_message])
    
    223
    -        return device

  • buildgrid/bot/hardware/__init__.py

  • buildgrid/bot/hardware/device.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +"""
    
    17
    +Device
    
    18
    +======
    
    19
    +
    
    20
    +A device.
    
    21
    +"""
    
    22
    +
    
    23
    +from buildgrid._protos.google.devtools.remoteworkers.v1test2 import worker_pb2
    
    24
    +
    
    25
    +class Device:
    
    26
    +
    
    27
    +    def __init__(self, properties=None):
    
    28
    +        """ Creates devices available to the worker
    
    29
    +        The first device is know as the Primary Device - the revice which
    
    30
    +        is running a bit and responsible to actually executing commands.
    
    31
    +        All other devices are known as Attatched Devices and must be controlled
    
    32
    +        by the Primary Device.
    
    33
    +
    
    34
    +        properties (list(dict(string : string))) : Properties of device. Keys may
    
    35
    +        repeated.
    
    36
    +        """
    
    37
    +
    
    38
    +        self._properties = {}
    
    39
    +        self.__property_keys = ['os', 'has-docker']
    
    40
    +        self.__name = str(uuid.uuid4())
    
    41
    +
    
    42
    +        for prop in properties:
    
    43
    +            self._add_property(prop)
    
    44
    +
    
    45
    +    @property
    
    46
    +    def name(self):
    
    47
    +        return self.__name
    
    48
    +
    
    49
    +    @property
    
    50
    +    def properties(self):
    
    51
    +        return self._properties
    
    52
    +
    
    53
    +    def get_pb2(self):
    
    54
    +        device = worker_pb2.Device(handle=self._name)
    
    55
    +        for k, v in self._properties.items():
    
    56
    +            for prop in v:
    
    57
    +                property_message = worker_pb2.Device.Property()
    
    58
    +                property_message.key = k
    
    59
    +                property_message.value = prop
    
    60
    +                device.properties.extend([property_message])
    
    61
    +        return device
    
    62
    +
    
    63
    +    def _add_property(self, key, value):
    
    64
    +        if key in self.__property_keys:
    
    65
    +            prop = self._properties.get(key)
    
    66
    +            if not prop:
    
    67
    +                self._properties[key] = [value]
    
    68
    +            else:
    
    69
    +                prop[key].append(value)
    
    70
    +
    
    71
    +        else:
    
    72
    +            raise KeyError('Key not supported: [{}]'.format(key))

  • buildgrid/bot/hardware/interface.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +"""
    
    17
    +HardwareInterface
    
    18
    +=================
    
    19
    +
    
    20
    +Class to configure hardware and check requirements of leases.
    
    21
    +
    
    22
    +In the future this could also be used to request and display
    
    23
    +the status of hardware.
    
    24
    +"""
    
    25
    +
    
    26
    +
    
    27
    +from buildgrid._exceptions import FailedPreconditionError
    
    28
    +
    
    29
    +
    
    30
    +class HardwareInterface:
    
    31
    +
    
    32
    +    def __init__(self, worker):
    
    33
    +        self._worker = worker
    
    34
    +
    
    35
    +    def configure_hardware(self, lease):
    
    36
    +        """ Can check if the requirements can be met and also
    
    37
    +        in the future, potentially configure the hardware.
    
    38
    +        """
    
    39
    +        worker = self._worker
    
    40
    +        worker_requirements = lease.worker
    
    41
    +
    
    42
    +        for config_requirement in worker_requirements.configs:
    
    43
    +            if config_requirement.key not in worker.configs:
    
    44
    +                raise FailedPreconditionError("Config not supported: [{}]".format(config_requirement))
    
    45
    +
    
    46
    +    def get_worker_pb2(self):
    
    47
    +        return self._worker.get_pb2()

  • buildgrid/bot/hardware/worker.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +from buildgrid._protos.google.devtools.remoteworkers.v1test2 import worker_pb2
    
    17
    +
    
    18
    +
    
    19
    +class Worker:
    
    20
    +
    
    21
    +    def __init__(self, properties=None, configs=None):
    
    22
    +        self._devices = []
    
    23
    +        self._configs = {}
    
    24
    +        self._properties = {}
    
    25
    +        self.__property_keys = ['pool']
    
    26
    +        self.__config_keys = ['DockerImage']
    
    27
    +
    
    28
    +        if properties:
    
    29
    +            for k, v in properties.items():
    
    30
    +                if k in self.__property_keys:
    
    31
    +                    self._add_properties(k, v)
    
    32
    +
    
    33
    +        if configs:
    
    34
    +            for k, v in configs.items():
    
    35
    +                self._add_config(k, v)
    
    36
    +
    
    37
    +    @property
    
    38
    +    def configs(self):
    
    39
    +        return self._configs
    
    40
    +
    
    41
    +    @property
    
    42
    +    def properties(self):
    
    43
    +        return self._properties
    
    44
    +
    
    45
    +    def add_device(self, device):
    
    46
    +        self._devices.append(device)
    
    47
    +
    
    48
    +    def get_pb2(self):
    
    49
    +        devices = [device.get_pb2() for device in self._devices]
    
    50
    +        worker = worker_pb2.Worker(devices=devices)
    
    51
    +
    
    52
    +        for k, v in self._properties.items():
    
    53
    +            for prop in v:
    
    54
    +                property_message = worker_pb2.Device.Property()
    
    55
    +                property_message.key = k
    
    56
    +                property_message.value = prop
    
    57
    +                device.properties.extend([property_message])
    
    58
    +
    
    59
    +        for k, v in self._configs.items():
    
    60
    +            for cfg in v:
    
    61
    +                config_message = worker_pb2.Worker.Config()
    
    62
    +                config.key = k
    
    63
    +                config_message.value = cfg
    
    64
    +                worker.configs.extend([config_message])
    
    65
    +
    
    66
    +        return worker
    
    67
    +
    
    68
    +    def _add_config(self, key, value):
    
    69
    +        if key in self.__config_keys:
    
    70
    +            cfg = self._configs.get(key)
    
    71
    +            if not cfg:
    
    72
    +                self._configs[key] = [value]
    
    73
    +            else:
    
    74
    +                cfg[key].append(value)
    
    75
    +
    
    76
    +        else:
    
    77
    +            raise KeyError('Key not supported: [{}]'.format(key))
    
    78
    +
    
    79
    +    def _add_property(self, key, value):
    
    80
    +        if key in self.__property_keys:
    
    81
    +            prop = self._properties.get(key)
    
    82
    +            if not prop:
    
    83
    +                self._properties[key] = [value]
    
    84
    +            else:
    
    85
    +                prop[key].append(value)
    
    86
    +
    
    87
    +        else:
    
    88
    +            raise KeyError('Key not supported: [{}]'.format(key))

  • buildgrid/bot/bot_interface.py → buildgrid/bot/interface.py
    ... ... @@ -15,7 +15,7 @@
    15 15
     
    
    16 16
     """
    
    17 17
     Bot Interface
    
    18
    -====
    
    18
    +=============
    
    19 19
     
    
    20 20
     Interface to grpc
    
    21 21
     """
    

  • buildgrid/bot/session.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +# Disable broad exception catch
    
    16
    +# pylint: disable=broad-except
    
    17
    +
    
    18
    +
    
    19
    +"""
    
    20
    +Bot Session
    
    21
    +===========
    
    22
    +
    
    23
    +Allows connections
    
    24
    +"""
    
    25
    +import asyncio
    
    26
    +import logging
    
    27
    +import platform
    
    28
    +import uuid
    
    29
    +
    
    30
    +import grpc
    
    31
    +
    
    32
    +from buildgrid._enums import BotStatus, LeaseState
    
    33
    +from buildgrid._protos.google.rpc import code_pb2
    
    34
    +from buildgrid._exceptions import BotError
    
    35
    +
    
    36
    +from buildgrid._exceptions import FailedPreconditionError
    
    37
    +
    
    38
    +class BotSession:
    
    39
    +    def __init__(self, parent, bots_interface, hardware_interface):
    
    40
    +        """ Unique bot ID within the farm used to identify this bot
    
    41
    +        Needs to be human readable.
    
    42
    +        All prior sessions with bot_id of same ID are invalidated.
    
    43
    +        If a bot attempts to update an invalid session, it must be rejected and
    
    44
    +        may be put in quarantine.
    
    45
    +        """
    
    46
    +
    
    47
    +        self.logger = logging.getLogger(__name__)
    
    48
    +
    
    49
    +        self._bots_interface = bots_interface
    
    50
    +        self._hardware_interface_interface = hardware_interface
    
    51
    +
    
    52
    +        self._status = BotStatus.OK.value
    
    53
    +        self._tenant_manager = TenantManager()
    
    54
    +
    
    55
    +        self.__parent = parent
    
    56
    +        self.__bot_id = '{}.{}'.format(parent, platform.node())
    
    57
    +        self.__name = None
    
    58
    +
    
    59
    +    @property
    
    60
    +    def bot_id(self):
    
    61
    +        return self.__bot_id
    
    62
    +
    
    63
    +    def create_bot_session(self):
    
    64
    +        self.logger.debug("Creating bot session")
    
    65
    +
    
    66
    +        session = self._interface.create_bot_session(self._parent, self.get_pb2())
    
    67
    +        self.__name = session.name
    
    68
    +
    
    69
    +        self.logger.info("Created bot session with name: [{}]".format(self.__name))
    
    70
    +
    
    71
    +        for lease in session.leases:
    
    72
    +            self._register_lease(lease)
    
    73
    +
    
    74
    +    def update_bot_session(self):
    
    75
    +        self.logger.debug("Updating bot session: [{}]".format(self.__bot_id))
    
    76
    +
    
    77
    +        session = self._interface.update_bot_session(self.get_pb2())
    
    78
    +        server_ids = []
    
    79
    +
    
    80
    +        for lease in session.leases:
    
    81
    +            server_ids.append(lease.id)
    
    82
    +
    
    83
    +            if lease.status.PENDING:
    
    84
    +                self._register_lease(lease)
    
    85
    +
    
    86
    +            elif lease.status.CANCELLED:
    
    87
    +                self._tenant_manager.cancel_tenancy(lease_id)
    
    88
    +
    
    89
    +        closed_lease_ids = [x for x in self._tenant_manager.get_lease_ids() if x not in server_ids]
    
    90
    +
    
    91
    +        for lease_id in closed_lease_ids:
    
    92
    +            self._tenant_manager.remove_tenant(lease_id)
    
    93
    +
    
    94
    +    def get_pb2(self):
    
    95
    +        return bots_pb2.BotSession(worker=self._worker.get_pb2(),
    
    96
    +                                   status=self._status,
    
    97
    +                                   leases=self._tenant_manager.get_leases(),
    
    98
    +                                   bot_id=self.__bot_id,
    
    99
    +                                   name=self.__name)
    
    100
    +
    
    101
    +    def _register_lease(self, lease):
    
    102
    +        lease_id = lease.id
    
    103
    +        try:
    
    104
    +            self._tenant_manager.create_tenancy(lease)
    
    105
    +
    
    106
    +        except KeyError as e:
    
    107
    +            self.logger.error("Cannot register lease: [{}]. Error: {}".format(lease.id, e))
    
    108
    +
    
    109
    +        else:
    
    110
    +            try:
    
    111
    +                self._hardware_interface.configure_hardware(lease)
    
    112
    +
    
    113
    +            except FailedPreconditionError as e:
    
    114
    +                self._tenant_manager.complete_lease(lease_id, status=code_pb2.FailedPreconditionError)
    
    115
    +
    
    116
    +            else:
    
    117
    +                self._tenant_manager.create_work(lease_id)

  • buildgrid/bot/tenant.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +"""
    
    16
    +Tenant
    
    17
    +======
    
    18
    +
    
    19
    +Handles leases and runs leased work.
    
    20
    +"""
    
    21
    +
    
    22
    +
    
    23
    +from functools import partial
    
    24
    +
    
    25
    +from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    26
    +
    
    27
    +from buildgrid._enums import LeaseState
    
    28
    +
    
    29
    +
    
    30
    +class Tenant:
    
    31
    +
    
    32
    +    def __init__(self, lease, context):
    
    33
    +
    
    34
    +        if lease.state != LeaseState.PENDING:
    
    35
    +            raise ValueError("Lease state not `PENDING`: {}".format(lease.state))
    
    36
    +
    
    37
    +        self.logger = logging.getLogger(__name__)
    
    38
    +        self.lease_finished = False
    
    39
    +
    
    40
    +        self._context = context
    
    41
    +        self._lease = lease
    
    42
    +
    
    43
    +    @property
    
    44
    +    def lease(self):
    
    45
    +        return self._lease
    
    46
    +
    
    47
    +    def get_state_state(self):
    
    48
    +        return self._lease.state
    
    49
    +
    
    50
    +    def update_lease_state(self, state):
    
    51
    +        self._lease.state = state
    
    52
    +
    
    53
    +    def update_lease_status(self, status):
    
    54
    +        self._lease.status = status
    
    55
    +
    
    56
    +    async def run_work(self, work, executor=None, context = None):
    
    57
    +        self.logger.debug("Work created: [{}]".format(self._lease.id))
    
    58
    +
    
    59
    +        # Ensures if anything happens to the lease during work, we still have a copy.
    
    60
    +        lease = bots_pb2.Lease()
    
    61
    +        lease.CopyFrom(self._lease)
    
    62
    +
    
    63
    +        loop = asyncio.get_event_loop()
    
    64
    +
    
    65
    +        try:
    
    66
    +            lease = await loop.run_in_executor(executor, partial(work, context, self._lease))
    
    67
    +            self._lease.CopyFrom(lease)
    
    68
    +
    
    69
    +        except asyncio.CancelledError as e:
    
    70
    +            self.logger.error("Task cancelled: [{}]".format(e))
    
    71
    +
    
    72
    +        except grpc.RpcError as e:
    
    73
    +            self.logger.error("RPC error thrown: [{}]".format(e))
    
    74
    +            lease.status.CopyFrom(e.code())
    
    75
    +
    
    76
    +        except BotError as e:
    
    77
    +            self.logger.error("Internal bot error thrown: [{}]".format(e))
    
    78
    +            lease.status.code = code_pb2.INTERNAL
    
    79
    +
    
    80
    +        except Exception as e:
    
    81
    +            self.logger.error("Exception thrown: [{}]".format(e))
    
    82
    +            lease.status.code = code_pb2.INTERNAL
    
    83
    +
    
    84
    +        finally:
    
    85
    +            self.logger.debug("Work completed: [{}]".format(lease.id))

  • buildgrid/bot/tenantmanager.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +"""
    
    17
    +TenantManager
    
    18
    +=============
    
    19
    +
    
    20
    +Looks after leases of work.
    
    21
    +"""
    
    22
    +
    
    23
    +
    
    24
    +import asyncio
    
    25
    +from functools import partial
    
    26
    +
    
    27
    +import grpc
    
    28
    +
    
    29
    +from buildgrid._enums import LeaseState
    
    30
    +
    
    31
    +from .tenant import Tenant
    
    32
    +
    
    33
    +class TenantManager:
    
    34
    +
    
    35
    +    def __init__(self):
    
    36
    +        self.logger = logging.getLogger(__name__)
    
    37
    +        self._tenants = {}
    
    38
    +        self._tasks = {}
    
    39
    +
    
    40
    +    def create_tenancy(self, lease):
    
    41
    +        lease_id = lease.id
    
    42
    +
    
    43
    +        if lease_id not in self._tenants:
    
    44
    +            tenant = Tenant(lease, context)
    
    45
    +            self._tenants[lease_id] = tenant
    
    46
    +
    
    47
    +        else:
    
    48
    +            raise KeyError("Lease id already exists: [{}]".format(lease_id))
    
    49
    +
    
    50
    +    def remove_tenant(self, lease_id):
    
    51
    +        state = self.get_lease_state(lease_id)
    
    52
    +        if state != LeaseState.COMPLETED or state != LeaseState.CANCELLED:
    
    53
    +            self.logger.error("Attempting to remove a lease not finished."
    
    54
    +                              "Bot will not remove lease."
    
    55
    +                              "Lease: [{}]".format(self._tenant_manager.get_lease(lease_id)))
    
    56
    +
    
    57
    +        else:
    
    58
    +            del self._tenants[lease_id]
    
    59
    +            del self._tasks[lease_id]
    
    60
    +
    
    61
    +    def get_leases(self):
    
    62
    +        leases = []
    
    63
    +        for _, tenant in self._tenants.items():
    
    64
    +            leases.append(tenant.lease)
    
    65
    +
    
    66
    +        if not leases:
    
    67
    +            return None
    
    68
    +
    
    69
    +        return leases
    
    70
    +
    
    71
    +    def get_lease_ids(self):
    
    72
    +        lease_ids = []
    
    73
    +        for lease_id in self._tenants.keys():
    
    74
    +            lease_ids.append(lease_id)
    
    75
    +
    
    76
    +        if not lease_ids:
    
    77
    +            return None
    
    78
    +
    
    79
    +        return leases
    
    80
    +
    
    81
    +    def get_lease_state(self, lease_id):
    
    82
    +        return self._tenants[lease_id].get_lease_state()
    
    83
    +
    
    84
    +    def complete_lease(self, lease_id, status=None):
    
    85
    +        if status is not None:
    
    86
    +            self._update_lease_status(lease_id, status)
    
    87
    +
    
    88
    +        if self._tenants[lease_id].get_lease_state() != LeaseState.CANCELLED:
    
    89
    +            self._update_lease_state(lease_id, LeaseState.COMPLETED)
    
    90
    +
    
    91
    +    def create_work(self, lease_id):
    
    92
    +        self._update_lease_state(lease_id, LeaseState.ACTIVE)
    
    93
    +        tenant = self._tenants[lease_id]
    
    94
    +        task = asyncio.ensure_future(tenant.run_work())
    
    95
    +
    
    96
    +        task.add_done_callback(partial(self.complete_lease(lease_id)))
    
    97
    +
    
    98
    +    def cancel_tenancy(self, lease_id):
    
    99
    +        self._update_lease_state(LeaseState.CANCELLED)
    
    100
    +        self._tasks[lease_id].cancel()
    
    101
    +
    
    102
    +    def _update_lease_state(self, lease_id, state):
    
    103
    +        self._tenants[lease_id].update_lease_state(state)
    
    104
    +
    
    105
    +    def _update_lease_status(self, lease_id, status):
    
    106
    +        self._tenants[lease_id].update_lease_status(status)

  • buildgrid/server/bots/instance.py
    ... ... @@ -80,9 +80,11 @@ class BotsInterface:
    80 80
             self.logger.debug("Updating bot session name={}".format(name))
    
    81 81
             self._check_bot_ids(bot_session.bot_id, name)
    
    82 82
     
    
    83
    -        leases = filter(None, [self.check_states(lease) for lease in bot_session.leases])
    
    83
    +        leases = filter(None, [self._check_lease_state(lease) for lease in bot_session.leases])
    
    84
    +
    
    85
    +        for lease in bot_session.leases:
    
    86
    +            lease.Clear()
    
    84 87
     
    
    85
    -        del bot_session.leases[:]
    
    86 88
             bot_session.leases.extend(leases)
    
    87 89
     
    
    88 90
             # TODO: Send worker capabilities to the scheduler!
    
    ... ... @@ -94,55 +96,22 @@ class BotsInterface:
    94 96
             self._bot_sessions[name] = bot_session
    
    95 97
             return bot_session
    
    96 98
     
    
    97
    -    def check_states(self, client_lease):
    
    98
    -        """ Edge detector for states
    
    99
    -        """
    
    100
    -        # TODO: Handle cancelled states
    
    101
    -        try:
    
    102
    -            server_lease = self._scheduler.get_job_lease(client_lease.id)
    
    103
    -        except KeyError:
    
    104
    -            raise InvalidArgumentError("Lease not found on server: [{}]".format(client_lease))
    
    105
    -
    
    106
    -        server_state = LeaseState(server_lease.state)
    
    107
    -        client_state = LeaseState(client_lease.state)
    
    108
    -
    
    109
    -        if server_state == LeaseState.PENDING:
    
    110
    -
    
    111
    -            if client_state == LeaseState.ACTIVE:
    
    112
    -                self._scheduler.update_job_lease_state(client_lease.id,
    
    113
    -                                                       LeaseState.ACTIVE)
    
    114
    -            elif client_state == LeaseState.COMPLETED:
    
    115
    -                # TODO: Lease was rejected
    
    116
    -                raise NotImplementedError("'Not Accepted' is unsupported")
    
    117
    -            else:
    
    118
    -                raise OutOfSyncError("Server lease: [{}]. Client lease: [{}]".format(server_lease, client_lease))
    
    99
    +    def _check_lease_state(self, lease):
    
    100
    +        # Check for cancelled lease
    
    101
    +        if self._scheduler.get_lease_cancelled(lease.id):
    
    102
    +            return None
    
    119 103
     
    
    120
    -        elif server_state == LeaseState.ACTIVE:
    
    104
    +        # If not cancelled, update the status
    
    105
    +        self._scheduler.update_job_lease(lease)
    
    121 106
     
    
    122
    -            if client_state == LeaseState.ACTIVE:
    
    123
    -                pass
    
    107
    +        lease_state = LeaseState(lease.state)
    
    108
    +        if lease_state == LeaseState.COMPLETED:
    
    109
    +            return None
    
    124 110
     
    
    125
    -            elif client_state == LeaseState.COMPLETED:
    
    126
    -                self._scheduler.update_job_lease_state(client_lease.id,
    
    127
    -                                                       LeaseState.COMPLETED,
    
    128
    -                                                       lease_status=client_lease.status,
    
    129
    -                                                       lease_result=client_lease.result)
    
    130
    -                return None
    
    131
    -
    
    132
    -            else:
    
    133
    -                raise OutOfSyncError("Server lease: [{}]. Client lease: [{}]".format(server_lease, client_lease))
    
    134
    -
    
    135
    -        elif server_state == LeaseState.COMPLETED:
    
    136
    -            raise OutOfSyncError("Server lease: [{}]. Client lease: [{}]".format(server_lease, client_lease))
    
    137
    -
    
    138
    -        elif server_state == LeaseState.CANCELLED:
    
    139
    -            raise NotImplementedError("Cancelled states not supported yet")
    
    140
    -
    
    141
    -        else:
    
    142
    -            # Sould never get here
    
    143
    -            raise OutOfSyncError("State now allowed: {}".format(server_state))
    
    111
    +        elif lease_state == LeaseState.CANCELLED:
    
    112
    +            return None
    
    144 113
     
    
    145
    -        return client_lease
    
    114
    +        return lease
    
    146 115
     
    
    147 116
         def _check_bot_ids(self, bot_id, name=None):
    
    148 117
             """ Checks the ID and the name of the bot.
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -52,8 +52,6 @@ class ExecutionInstance:
    52 52
             if message_queue is not None:
    
    53 53
                 job.register_client(message_queue)
    
    54 54
     
    
    55
    -        self.logger.info("Operation name: [{}]".format(job.name))
    
    56
    -
    
    57 55
             self._scheduler.queue_job(job, skip_cache_lookup)
    
    58 56
     
    
    59 57
             return job.operation
    

  • buildgrid/server/execution/service.py
    ... ... @@ -26,7 +26,7 @@ from functools import partial
    26 26
     
    
    27 27
     import grpc
    
    28 28
     
    
    29
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    29
    +from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
    
    30 30
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    31 31
     from buildgrid._protos.google.longrunning import operations_pb2
    
    32 32
     
    
    ... ... @@ -52,8 +52,17 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    52 52
                 context.add_callback(partial(instance.unregister_message_client,
    
    53 53
                                              operation.name, message_queue))
    
    54 54
     
    
    55
    -            yield from instance.stream_operation_updates(message_queue,
    
    56
    -                                                         operation.name)
    
    55
    +            instanced_op_name = "{}/{}".format(request.instance_name,
    
    56
    +                                               operation.name)
    
    57
    +
    
    58
    +            self.logger.info("Operation name: [{}]".format(instanced_op_name))
    
    59
    +
    
    60
    +            for operation in instance.stream_operation_updates(message_queue,
    
    61
    +                                                               operation.name):
    
    62
    +                op = operations_pb2.Operation()
    
    63
    +                op.CopyFrom(operation)
    
    64
    +                op.name = instanced_op_name
    
    65
    +                yield op
    
    57 66
     
    
    58 67
             except InvalidArgumentError as e:
    
    59 68
                 self.logger.error(e)
    
    ... ... @@ -67,6 +76,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    67 76
                 context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
    
    68 77
                 yield operations_pb2.Operation()
    
    69 78
     
    
    79
    +        except CancelledError as e:
    
    80
    +            self.logger.error(e)
    
    81
    +            context.set_details(str(e))
    
    82
    +            context.set_code(grpc.StatusCode.CANCELLED)
    
    83
    +            yield operations_pb2.Operation()
    
    84
    +
    
    70 85
         def WaitExecution(self, request, context):
    
    71 86
             try:
    
    72 87
                 names = request.name.split("/")
    
    ... ... @@ -84,8 +99,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    84 99
                 context.add_callback(partial(instance.unregister_message_client,
    
    85 100
                                              operation_name, message_queue))
    
    86 101
     
    
    87
    -            yield from instance.stream_operation_updates(message_queue,
    
    88
    -                                                         operation_name)
    
    102
    +            for operation in instance.stream_operation_updates(message_queue,
    
    103
    +                                                               operation_name):
    
    104
    +                op = operations_pb2.Operation()
    
    105
    +                op.CopyFrom(operation)
    
    106
    +                op.name = request.name
    
    107
    +                yield op
    
    89 108
     
    
    90 109
             except InvalidArgumentError as e:
    
    91 110
                 self.logger.error(e)
    
    ... ... @@ -93,6 +112,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    93 112
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    94 113
                 yield operations_pb2.Operation()
    
    95 114
     
    
    115
    +        except CancelledError as e:
    
    116
    +            self.logger.error(e)
    
    117
    +            context.set_details(str(e))
    
    118
    +            context.set_code(grpc.StatusCode.CANCELLED)
    
    119
    +            yield operations_pb2.Operation()
    
    120
    +
    
    96 121
         def _get_instance(self, name):
    
    97 122
             try:
    
    98 123
                 return self._instances[name]
    

  • buildgrid/server/job.py
    ... ... @@ -19,9 +19,11 @@ import uuid
    19 19
     from google.protobuf import timestamp_pb2
    
    20 20
     
    
    21 21
     from buildgrid._enums import LeaseState, OperationStage
    
    22
    +from buildgrid._exceptions import CancelledError
    
    22 23
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    23 24
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    24 25
     from buildgrid._protos.google.longrunning import operations_pb2
    
    26
    +from buildgrid._protos.google.rpc import code_pb2
    
    25 27
     
    
    26 28
     
    
    27 29
     class Job:
    
    ... ... @@ -36,10 +38,14 @@ class Job:
    36 38
     
    
    37 39
             self.__execute_response = None
    
    38 40
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    41
    +
    
    39 42
             self.__queued_timestamp = timestamp_pb2.Timestamp()
    
    40 43
             self.__worker_start_timestamp = timestamp_pb2.Timestamp()
    
    41 44
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    42 45
     
    
    46
    +        self.__operation_cancelled = False
    
    47
    +        self.__lease_cancelled = False
    
    48
    +
    
    43 49
             self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    44 50
             self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    45 51
     
    
    ... ... @@ -92,6 +98,10 @@ class Job:
    92 98
             else:
    
    93 99
                 return None
    
    94 100
     
    
    101
    +    @property
    
    102
    +    def lease_cancelled(self):
    
    103
    +        return self.__lease_cancelled
    
    104
    +
    
    95 105
         @property
    
    96 106
         def n_tries(self):
    
    97 107
             return self._n_tries
    
    ... ... @@ -130,7 +140,9 @@ class Job:
    130 140
             Only one :class:`Lease` can be emitted for a given job. This method
    
    131 141
         should only be used once, any further calls are ignored.
    
    132 142
             """
    
    133
    -        if self._lease is not None:
    
    143
    +        if self.__operation_cancelled:
    
    144
    +            return None
    
    145
    +        elif self._lease is not None:
    
    134 146
                 return None
    
    135 147
     
    
    136 148
             self._lease = bots_pb2.Lease()
    
    ... ... @@ -171,7 +183,7 @@ class Job:
    171 183
                 action_result = remote_execution_pb2.ActionResult()
    
    172 184
     
    
    173 185
                 # TODO: Make a distinction between build and bot failures!
    
    174
    -            if status.code != 0:
    
    186
    +            if status.code != code_pb2.OK:
    
    175 187
                     self._do_not_cache = True
    
    176 188
     
    
    177 189
                 if result is not None:
    
    ... ... @@ -188,6 +200,16 @@ class Job:
    188 200
                 self.__execute_response.cached_result = False
    
    189 201
                 self.__execute_response.status.CopyFrom(status)
    
    190 202
     
    
    203
    +    def cancel_lease(self):
    
    204
    +        """Triggers a job's :class:Lease cancellation.
    
    205
    +
    
    206
    +        This will not cancel the job's :class:Operation.
    
    207
    +        """
    
    208
    +        self.__lease_cancelled = True
    
    209
    +        if self._lease is not None:
    
    210
    +            self.update_lease_state(LeaseState.CANCELLED)
    
    211
    +
    
    212
    +
    
    191 213
         def update_operation_stage(self, stage):
    
    192 214
             """Operates a stage transition for the job's :class:Operation.
    
    193 215
     
    
    ... ... @@ -213,3 +235,18 @@ class Job:
    213 235
     
    
    214 236
             for queue in self._operation_update_queues:
    
    215 237
                 queue.put(self._operation)
    
    238
    +
    
    239
    +    def cancel_operation(self):
    
    240
    +        """Triggers a job's :class:Operation cancellation.
    
    241
    +
    
    242
    +        This will also cancel any job's :class:Lease that may have been issued.
    
    243
    +        """
    
    244
    +        self.__operation_cancelled = True
    
    245
    +        if self._lease is not None:
    
    246
    +            self.cancel_lease()
    
    247
    +
    
    248
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    249
    +        self.__execute_response.status.code = code_pb2.CANCELLED
    
    250
    +        self.__execute_response.status.message = "Operation cancelled by client."
    
    251
    +
    
    252
    +        self.update_operation_stage(OperationStage.COMPLETED)

  • buildgrid/server/operations/instance.py
    ... ... @@ -47,7 +47,13 @@ class OperationsInstance:
    47 47
             # TODO: Pages
    
    48 48
             # Spec says number of pages and length of a page are optional
    
    49 49
             response = operations_pb2.ListOperationsResponse()
    
    50
    -        response.operations.extend([job.operation for job in self._scheduler.list_jobs()])
    
    50
    +        operations = []
    
    51
    +        for job in self._scheduler.list_jobs():
    
    52
    +            op = operations_pb2.Operation()
    
    53
    +            op.CopyFrom(job.operation)
    
    54
    +            operations.append(op)
    
    55
    +
    
    56
    +        response.operations.extend(operations)
    
    51 57
     
    
    52 58
             return response
    
    53 59
     
    
    ... ... @@ -58,6 +64,13 @@ class OperationsInstance:
    58 64
             except KeyError:
    
    59 65
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    60 66
     
    
    67
    +    def cancel_operation(self, name):
    
    68
    +        try:
    
    69
    +            self._scheduler.cancel_job_operation(name)
    
    70
    +
    
    71
    +        except KeyError:
    
    72
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    73
    +
    
    61 74
         def register_message_client(self, name, queue):
    
    62 75
             try:
    
    63 76
                 self._scheduler.register_client(name, queue)
    
    ... ... @@ -78,7 +91,3 @@ class OperationsInstance:
    78 91
                 yield operation
    
    79 92
                 operation = message_queue.get()
    
    80 93
             yield operation
    81
    -
    
    82
    -    def cancel_operation(self, name):
    
    83
    -        # TODO: Cancel leases
    
    84
    -        raise NotImplementedError("Cancelled operations not supported")

  • buildgrid/server/operations/service.py
    ... ... @@ -25,7 +25,7 @@ import grpc
    25 25
     
    
    26 26
     from google.protobuf.empty_pb2 import Empty
    
    27 27
     
    
    28
    -from buildgrid._exceptions import InvalidArgumentError
    
    28
    +from buildgrid._exceptions import CancelledError, InvalidArgumentError
    
    29 29
     from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
    
    30 30
     
    
    31 31
     
    
    ... ... @@ -44,13 +44,16 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    44 44
         def GetOperation(self, request, context):
    
    45 45
             try:
    
    46 46
                 name = request.name
    
    47
    -            operation_name = self._get_operation_name(name)
    
    48 47
     
    
    49
    -            instance = self._get_instance(name)
    
    48
    +            instance_name = self._parse_instance_name(name)
    
    49
    +            instance = self._get_instance(instance_name)
    
    50 50
     
    
    51
    +            operation_name = self._parse_operation_name(name)
    
    51 52
                 operation = instance.get_operation(operation_name)
    
    52
    -            operation.name = name
    
    53
    -            return operation
    
    53
    +            op = operations_pb2.Operation()
    
    54
    +            op.CopyFrom(operation)
    
    55
    +            op.name = name
    
    56
    +            return op
    
    54 57
     
    
    55 58
             except InvalidArgumentError as e:
    
    56 59
                 self.logger.error(e)
    
    ... ... @@ -61,17 +64,17 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    61 64
     
    
    62 65
         def ListOperations(self, request, context):
    
    63 66
             try:
    
    64
    -            # Name should be the collection name
    
    65
    -            # Or in this case, the instance_name
    
    66
    -            name = request.name
    
    67
    -            instance = self._get_instance(name)
    
    67
    +            # The request name should be the collection name
    
    68
    +            # In our case, this is just the instance_name
    
    69
    +            instance_name = request.name
    
    70
    +            instance = self._get_instance(instance_name)
    
    68 71
     
    
    69 72
                 result = instance.list_operations(request.filter,
    
    70 73
                                                   request.page_size,
    
    71 74
                                                   request.page_token)
    
    72 75
     
    
    73 76
                 for operation in result.operations:
    
    74
    -                operation.name = "{}/{}".format(name, operation.name)
    
    77
    +                operation.name = "{}/{}".format(instance_name, operation.name)
    
    75 78
     
    
    76 79
                 return result
    
    77 80
     
    
    ... ... @@ -85,10 +88,11 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    85 88
         def DeleteOperation(self, request, context):
    
    86 89
             try:
    
    87 90
                 name = request.name
    
    88
    -            operation_name = self._get_operation_name(name)
    
    89 91
     
    
    90
    -            instance = self._get_instance(name)
    
    92
    +            instance_name = self._parse_instance_name(name)
    
    93
    +            instance = self._get_instance(instance_name)
    
    91 94
     
    
    95
    +            operation_name = self._parse_operation_name(name)
    
    92 96
                 instance.delete_operation(operation_name)
    
    93 97
     
    
    94 98
             except InvalidArgumentError as e:
    
    ... ... @@ -101,13 +105,14 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    101 105
         def CancelOperation(self, request, context):
    
    102 106
             try:
    
    103 107
                 name = request.name
    
    104
    -            operation_name = self._get_operation_name(name)
    
    105 108
     
    
    106
    -            instance = self._get_instance(name)
    
    109
    +            instance_name = self._parse_instance_name(name)
    
    110
    +            instance = self._get_instance(instance_name)
    
    107 111
     
    
    112
    +            operation_name = self._parse_operation_name(name)
    
    108 113
                 instance.cancel_operation(operation_name)
    
    109 114
     
    
    110
    -        except NotImplementedError as e:
    
    115
    +        except CancelledError as e:
    
    111 116
                 self.logger.error(e)
    
    112 117
                 context.set_details(str(e))
    
    113 118
                 context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    ... ... @@ -119,20 +124,20 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    119 124
     
    
    120 125
             return Empty()
    
    121 126
     
    
    122
    -    def _get_operation_name(self, name):
    
    123
    -        return name.split("/")[-1]
    
    127
    +    def _parse_instance_name(self, name):
    
    128
    +        """ If the instance name is not blank, 'name' will have the form
    
    129
    +        {instance_name}/{operation_uuid}. Otherwise, it will just be
    
    130
    +        {operation_uuid} """
    
    131
    +        names = name.split('/')
    
    132
    +        return '/'.join(names[:-1]) if len(names) > 1 else ''
    
    133
    +
    
    134
    +    def _parse_operation_name(self, name):
    
    135
    +        names = name.split('/')
    
    136
    +        return names[-1] if len(names) > 1 else name
    
    124 137
     
    
    125 138
         def _get_instance(self, name):
    
    126 139
             try:
    
    127
    -            names = name.split("/")
    
    128
    -
    
    129
    -            # Operation name should be in format:
    
    130
    -            # {instance/name}/{operation_id}
    
    131
    -            instance_name = ''.join(names[0:-1])
    
    132
    -            if not instance_name:
    
    133
    -                return self._instances[name]
    
    134
    -
    
    135
    -            return self._instances[instance_name]
    
    140
    +            return self._instances[name]
    
    136 141
     
    
    137 142
             except KeyError:
    
    138 143
                 raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))

  • buildgrid/server/scheduler.py
    ... ... @@ -94,9 +94,12 @@ class Scheduler:
    94 94
             # For now, one lease at a time:
    
    95 95
             lease = job.create_lease()
    
    96 96
     
    
    97
    -        return [lease]
    
    97
    +        if lease:
    
    98
    +            return [lease]
    
    98 99
     
    
    99
    -    def update_job_lease_state(self, job_name, lease_state, lease_status=None, lease_result=None):
    
    100
    +        return None
    
    101
    +
    
    102
    +    def update_job_lease(self, lease):
    
    100 103
             """Requests a state transition for a job's current :class:Lease.
    
    101 104
     
    
    102 105
             Args:
    
    ... ... @@ -107,7 +110,9 @@ class Scheduler:
    107 110
                 lease_result (google.protobuf.Any): the lease execution result, only
    
    108 111
                     required if `lease_state` is `COMPLETED`.
    
    109 112
             """
    
    110
    -        job = self.jobs[job_name]
    
    113
    +
    
    114
    +        job = self.jobs[lease.id]
    
    115
    +        lease_state = LeaseState(lease.state)
    
    111 116
     
    
    112 117
             if lease_state == LeaseState.PENDING:
    
    113 118
                 job.update_lease_state(LeaseState.PENDING)
    
    ... ... @@ -119,7 +124,7 @@ class Scheduler:
    119 124
     
    
    120 125
             elif lease_state == LeaseState.COMPLETED:
    
    121 126
                 job.update_lease_state(LeaseState.COMPLETED,
    
    122
    -                                   status=lease_status, result=lease_result)
    
    127
    +                                   status=lease.status, result=lease.result)
    
    123 128
     
    
    124 129
                 if self._action_cache is not None and not job.do_not_cache:
    
    125 130
                     self._action_cache.update_action_result(job.action_digest, job.action_result)
    
    ... ... @@ -130,6 +135,20 @@ class Scheduler:
    130 135
             """Returns the lease associated to job, if any have been emitted yet."""
    
    131 136
             return self.jobs[job_name].lease
    
    132 137
     
    
    138
    +    def get_job_lease_cancelled(self, job_name):
    
    139
    +        """Returns true if the lease is cancelled"""
    
    140
    +        return self.jobs[job_name].lease_cancelled
    
    141
    +
    
    133 142
         def get_job_operation(self, job_name):
    
    134 143
             """Returns the operation associated to job."""
    
    135 144
             return self.jobs[job_name].operation
    
    145
    +
    
    146
    +    def cancel_job_operation(self, job_name):
    
    147
    +        """Cancels the underlying operation of a given job.
    
    148
    +
    
    149
    +        This will also cancel any job's lease that may have been issued.
    
    150
    +
    
    151
    +        Args:
    
    152
    +            job_name (str): name of the job holding the operation to cancel.
    
    153
    +        """
    
    154
    +        self.jobs[job_name].cancel_operation()

  • docs/source/using_internal.rst
    ... ... @@ -92,7 +92,7 @@ Upload the directory containing the C file:
    92 92
     
    
    93 93
        bgd cas upload-dir /path/to/test-buildgrid
    
    94 94
     
    
    95
    -Now we send an execution request to the bot with the name of the epxected
    
    95
    +Now we send an execution request to the bot with the name of the expected
    
    96 96
     ``output-file``, a boolean describing if it is executable, the path to the
    
    97 97
     directory we uploaded in order to calculate the digest and finally the command
    
    98 98
     to run on the bot:
    
    ... ... @@ -102,4 +102,4 @@ to run on the bot:
    102 102
        bgd execute command --output-file hello True /path/to/test-buildgrid -- gcc -Wall hello.c -o hello
    
    103 103
     
    
    104 104
     The resulting executable should have returned to a new directory called
    
    105
    -``testing``.
    \ No newline at end of file
    105
    +``testing``.

  • setup.py
    ... ... @@ -116,7 +116,7 @@ setup(
    116 116
             'protobuf',
    
    117 117
             'grpcio',
    
    118 118
             'Click',
    
    119
    -        'pyaml',
    
    119
    +        'PyYAML',
    
    120 120
             'boto3 < 1.8.0',
    
    121 121
             'botocore < 1.11.0',
    
    122 122
         ],
    

  • tests/integration/execution_service.py
    ... ... @@ -83,7 +83,8 @@ def test_execute(skip_cache_lookup, instance, context):
    83 83
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    84 84
         result.metadata.Unpack(metadata)
    
    85 85
         assert metadata.stage == job.OperationStage.QUEUED.value
    
    86
    -    assert uuid.UUID(result.name, version=4)
    
    86
    +    operation_uuid = result.name.split('/')[-1]
    
    87
    +    assert uuid.UUID(operation_uuid, version=4)
    
    87 88
         assert result.done is False
    
    88 89
     
    
    89 90
     
    
    ... ... @@ -108,7 +109,7 @@ def test_wait_execution(instance, controller, context):
    108 109
         j = job.Job(action, action_digest)
    
    109 110
         j._operation.done = True
    
    110 111
     
    
    111
    -    request = remote_execution_pb2.WaitExecutionRequest(name="{}/{}".format('', j.name))
    
    112
    +    request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    112 113
     
    
    113 114
         controller.execution_instance._scheduler.jobs[j.name] = j
    
    114 115
     
    

  • tests/integration/operations_service.py
    ... ... @@ -75,6 +75,16 @@ def instance(controller):
    75 75
             yield operation_service
    
    76 76
     
    
    77 77
     
    
    78
    +# Blank instance
    
    79
    +@pytest.fixture
    
    80
    +def blank_instance(controller):
    
    81
    +    with mock.patch.object(service, 'operations_pb2_grpc'):
    
    82
    +        operation_service = OperationsService(server)
    
    83
    +        operation_service.add_instance('', controller.operations_instance)
    
    84
    +
    
    85
    +        yield operation_service
    
    86
    +
    
    87
    +
    
    78 88
     # Queue an execution, get operation corresponding to that request
    
    79 89
     def test_get_operation(instance, controller, execute_request, context):
    
    80 90
         response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    ... ... @@ -82,14 +92,34 @@ def test_get_operation(instance, controller, execute_request, context):
    82 92
     
    
    83 93
         request = operations_pb2.GetOperationRequest()
    
    84 94
     
    
    95
    +    # The execution instance name is normally set in add_instance, but since
    
    96
    +    # we're manually creating the instance here, it doesn't get a name.
    
    97
    +    # Therefore we need to manually add the instance name to the operation
    
    98
    +    # name in the GetOperation request.
    
    85 99
         request.name = "{}/{}".format(instance_name, response_execute.name)
    
    86 100
     
    
    87 101
         response = instance.GetOperation(request, context)
    
    88
    -    assert response is response_execute
    
    102
    +    assert response.name == "{}/{}".format(instance_name, response_execute.name)
    
    103
    +    assert response.done == response_execute.done
    
    104
    +
    
    105
    +
    
    106
    +# Queue an execution, get operation corresponding to that request
    
    107
    +def test_get_operation_blank(blank_instance, controller, execute_request, context):
    
    108
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    109
    +                                                             execute_request.skip_cache_lookup)
    
    110
    +
    
    111
    +    request = operations_pb2.GetOperationRequest()
    
    112
    +
    
    113
    +    request.name = response_execute.name
    
    114
    +
    
    115
    +    response = blank_instance.GetOperation(request, context)
    
    116
    +    assert response.name == response_execute.name
    
    117
    +    assert response.done == response_execute.done
    
    89 118
     
    
    90 119
     
    
    91 120
     def test_get_operation_fail(instance, context):
    
    92 121
         request = operations_pb2.GetOperationRequest()
    
    122
    +
    
    93 123
         request.name = "{}/{}".format(instance_name, "runner")
    
    94 124
         instance.GetOperation(request, context)
    
    95 125
     
    
    ... ... @@ -110,6 +140,18 @@ def test_list_operations(instance, controller, execute_request, context):
    110 140
         request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    111 141
         response = instance.ListOperations(request, context)
    
    112 142
     
    
    143
    +    names = response.operations[0].name.split('/')
    
    144
    +    assert names[0] == instance_name
    
    145
    +    assert names[1] == response_execute.name
    
    146
    +
    
    147
    +
    
    148
    +def test_list_operations_blank(blank_instance, controller, execute_request, context):
    
    149
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    150
    +                                                             execute_request.skip_cache_lookup)
    
    151
    +
    
    152
    +    request = operations_pb2.ListOperationsRequest(name='')
    
    153
    +    response = blank_instance.ListOperations(request, context)
    
    154
    +
    
    113 155
         assert response.operations[0].name.split('/')[-1] == response_execute.name
    
    114 156
     
    
    115 157
     
    
    ... ... @@ -160,15 +202,30 @@ def test_list_operations_empty(instance, context):
    160 202
     def test_delete_operation(instance, controller, execute_request, context):
    
    161 203
         response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    162 204
                                                                  execute_request.skip_cache_lookup)
    
    205
    +
    
    163 206
         request = operations_pb2.DeleteOperationRequest()
    
    164
    -    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    207
    +    request.name = response_execute.name
    
    165 208
         instance.DeleteOperation(request, context)
    
    166 209
     
    
    167
    -    request = operations_pb2.GetOperationRequest()
    
    168
    -    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    210
    +    request_name = "{}/{}".format(instance_name, response_execute.name)
    
    169 211
     
    
    170 212
         with pytest.raises(InvalidArgumentError):
    
    171
    -        controller.operations_instance.get_operation(response_execute.name)
    
    213
    +        controller.operations_instance.get_operation(request_name)
    
    214
    +
    
    215
    +
    
    216
    +# Send execution off, delete, try to find operation should fail
    
    217
    +def test_delete_operation_blank(blank_instance, controller, execute_request, context):
    
    218
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    219
    +                                                             execute_request.skip_cache_lookup)
    
    220
    +
    
    221
    +    request = operations_pb2.DeleteOperationRequest()
    
    222
    +    request.name = response_execute.name
    
    223
    +    blank_instance.DeleteOperation(request, context)
    
    224
    +
    
    225
    +    request_name = response_execute.name
    
    226
    +
    
    227
    +    with pytest.raises(InvalidArgumentError):
    
    228
    +        controller.operations_instance.get_operation(request_name)
    
    172 229
     
    
    173 230
     
    
    174 231
     def test_delete_operation_fail(instance, context):
    
    ... ... @@ -187,6 +244,14 @@ def test_cancel_operation(instance, context):
    187 244
         context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    188 245
     
    
    189 246
     
    
    247
    +def test_cancel_operation_blank(blank_instance, context):
    
    248
    +    request = operations_pb2.CancelOperationRequest()
    
    249
    +    request.name = "runner"
    
    250
    +    blank_instance.CancelOperation(request, context)
    
    251
    +
    
    252
    +    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    253
    +
    
    254
    +
    
    190 255
     def test_cancel_operation_instance_fail(instance, context):
    
    191 256
         request = operations_pb2.CancelOperationRequest()
    
    192 257
         instance.CancelOperation(request, context)
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]