[Notes] [Git][BuildGrid/buildgrid][mablanch/75-requests-multiplexing] 6 commits: Add instances to service-level operation names



Title: GitLab

Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid

Commits:

10 changed files:

Changes:

  • buildgrid/server/execution/instance.py
    ... ... @@ -21,11 +21,9 @@ An instance of the Remote Execution Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    24
    +from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    26 26
     
    
    27
    -from ..job import Job
    
    28
    -
    
    29 27
     
    
    30 28
     class ExecutionInstance:
    
    31 29
     
    
    ... ... @@ -37,7 +35,7 @@ class ExecutionInstance:
    37 35
         def register_instance_with_server(self, instance_name, server):
    
    38 36
             server.add_execution_instance(self, instance_name)
    
    39 37
     
    
    40
    -    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    38
    +    def execute(self, action_digest, skip_cache_lookup, peer=None, message_queue=None):
    
    41 39
             """ Sends a job for execution.
    
    42 40
             Queues an action and creates an Operation instance to be associated with
    
    43 41
             this action.
    
    ... ... @@ -48,29 +46,26 @@ class ExecutionInstance:
    48 46
             if not action:
    
    49 47
                 raise FailedPreconditionError("Could not get action from storage.")
    
    50 48
     
    
    51
    -        job = Job(action, action_digest)
    
    52
    -        if message_queue is not None:
    
    53
    -            job.register_client(message_queue)
    
    54
    -
    
    55
    -        self.logger.info("Operation name: [{}]".format(job.name))
    
    49
    +        job = self._scheduler.queue_job(action, action_digest, skip_cache_lookup)
    
    56 50
     
    
    57
    -        self._scheduler.queue_job(job, skip_cache_lookup)
    
    51
    +        if peer is not None and message_queue is not None:
    
    52
    +            job.register_client(peer, message_queue)
    
    58 53
     
    
    59 54
             return job.operation
    
    60 55
     
    
    61
    -    def register_message_client(self, name, queue):
    
    56
    +    def register_message_client(self, job_name, peer, message_queue):
    
    62 57
             try:
    
    63
    -            self._scheduler.register_client(name, queue)
    
    58
    +            self._scheduler.register_client(job_name, peer, message_queue)
    
    64 59
     
    
    65
    -        except KeyError:
    
    66
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    60
    +        except NotFoundError:
    
    61
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    67 62
     
    
    68
    -    def unregister_message_client(self, name, queue):
    
    63
    +    def unregister_message_client(self, job_name, peer):
    
    69 64
             try:
    
    70
    -            self._scheduler.unregister_client(name, queue)
    
    65
    +            self._scheduler.unregister_client(job_name, peer)
    
    71 66
     
    
    72
    -        except KeyError:
    
    73
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    67
    +        except NotFoundError:
    
    68
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    74 69
     
    
    75 70
         def stream_operation_updates(self, message_queue, operation_name):
    
    76 71
             operation = message_queue.get()
    

  • buildgrid/server/execution/service.py
    ... ... @@ -47,13 +47,23 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    47 47
                 instance = self._get_instance(request.instance_name)
    
    48 48
                 operation = instance.execute(request.action_digest,
    
    49 49
                                              request.skip_cache_lookup,
    
    50
    -                                         message_queue)
    
    50
    +                                         peer=context.peer(),
    
    51
    +                                         message_queue=message_queue)
    
    51 52
     
    
    52 53
                 context.add_callback(partial(instance.unregister_message_client,
    
    53
    -                                         operation.name, message_queue))
    
    54
    +                                         operation.name, context.peer()))
    
    54 55
     
    
    55
    -            yield from instance.stream_operation_updates(message_queue,
    
    56
    -                                                         operation.name)
    
    56
    +            instanced_op_name = "{}/{}".format(request.instance_name,
    
    57
    +                                               operation.name)
    
    58
    +
    
    59
    +            self.logger.info("Operation name: [{}]".format(instanced_op_name))
    
    60
    +
    
    61
    +            for operation in instance.stream_operation_updates(message_queue,
    
    62
    +                                                               operation.name):
    
    63
    +                op = operations_pb2.Operation()
    
    64
    +                op.CopyFrom(operation)
    
    65
    +                op.name = instanced_op_name
    
    66
    +                yield op
    
    57 67
     
    
    58 68
             except InvalidArgumentError as e:
    
    59 69
                 self.logger.error(e)
    
    ... ... @@ -79,13 +89,18 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    79 89
                 operation_name = names[-1]
    
    80 90
                 instance = self._get_instance(instance_name)
    
    81 91
     
    
    82
    -            instance.register_message_client(operation_name, message_queue)
    
    92
    +            instance.register_message_client(operation_name,
    
    93
    +                                             context.peer(), message_queue)
    
    83 94
     
    
    84 95
                 context.add_callback(partial(instance.unregister_message_client,
    
    85
    -                                         operation_name, message_queue))
    
    86
    -
    
    87
    -            yield from instance.stream_operation_updates(message_queue,
    
    88
    -                                                         operation_name)
    
    96
    +                                         operation_name, context.peer()))
    
    97
    +
    
    98
    +            for operation in instance.stream_operation_updates(message_queue,
    
    99
    +                                                               operation_name):
    
    100
    +                op = operations_pb2.Operation()
    
    101
    +                op.CopyFrom(operation)
    
    102
    +                op.name = request.name
    
    103
    +                yield op
    
    89 104
     
    
    90 105
             except InvalidArgumentError as e:
    
    91 106
                 self.logger.error(e)
    

  • buildgrid/server/job.py
    ... ... @@ -26,10 +26,11 @@ from buildgrid._protos.google.longrunning import operations_pb2
    26 26
     
    
    27 27
     class Job:
    
    28 28
     
    
    29
    -    def __init__(self, action, action_digest):
    
    29
    +    def __init__(self, action, action_digest, priority=0):
    
    30 30
             self.logger = logging.getLogger(__name__)
    
    31 31
     
    
    32 32
             self._name = str(uuid.uuid4())
    
    33
    +        self._priority = priority
    
    33 34
             self._action = remote_execution_pb2.Action()
    
    34 35
             self._operation = operations_pb2.Operation()
    
    35 36
             self._lease = None
    
    ... ... @@ -45,7 +46,7 @@ class Job:
    45 46
     
    
    46 47
             self._action.CopyFrom(action)
    
    47 48
             self._do_not_cache = self._action.do_not_cache
    
    48
    -        self._operation_update_queues = []
    
    49
    +        self._operation_update_queues = {}
    
    49 50
             self._operation.name = self._name
    
    50 51
             self._operation.done = False
    
    51 52
             self._n_tries = 0
    
    ... ... @@ -54,6 +55,10 @@ class Job:
    54 55
         def name(self):
    
    55 56
             return self._name
    
    56 57
     
    
    58
    +    @property
    
    59
    +    def priority(self):
    
    60
    +        return self._priority
    
    61
    +
    
    57 62
         @property
    
    58 63
         def do_not_cache(self):
    
    59 64
             return self._do_not_cache
    
    ... ... @@ -100,22 +105,34 @@ class Job:
    100 105
         def n_clients(self):
    
    101 106
             return len(self._operation_update_queues)
    
    102 107
     
    
    103
    -    def register_client(self, queue):
    
    104
    -        """Subscribes to the job's :class:`Operation` stage change events.
    
    108
    +    def __eq__(self, other):
    
    109
    +        if isinstance(other, Job):
    
    110
    +            return self.name == other.name
    
    111
    +        return False
    
    112
    +
    
    113
    +    def __ne__(self, other):
    
    114
    +        return not self.__eq__(other)
    
    115
    +
    
    116
    +    def register_client(self, peer, message_queue):
    
    117
    +        """Subscribes to the job's :class:`Operation` stage changes.
    
    105 118
     
    
    106 119
             Args:
    
    107
    -            queue (queue.Queue): the event queue to register.
    
    120
    +            peer (str): a unique string identifying the client.
    
    121
    +            message_queue (queue.Queue): the event queue to register.
    
    108 122
             """
    
    109
    -        self._operation_update_queues.append(queue)
    
    110
    -        queue.put(self._operation)
    
    123
    +        if peer not in self._operation_update_queues:
    
    124
    +            self._operation_update_queues[peer] = message_queue
    
    125
    +
    
    126
    +        message_queue.put(self._operation)
    
    111 127
     
    
    112
    -    def unregister_client(self, queue):
    
    113
    -        """Unsubscribes to the job's :class:`Operation` stage change events.
    
    128
    +    def unregister_client(self, peer):
    
    129
    +        """Unsubscribes to the job's :class:`Operation` stage change.
    
    114 130
     
    
    115 131
             Args:
    
    116
    -            queue (queue.Queue): the event queue to unregister.
    
    132
    +            peer (str): a unique string identifying the client.
    
    117 133
             """
    
    118
    -        self._operation_update_queues.remove(queue)
    
    134
    +        if peer not in self._operation_update_queues:
    
    135
    +            del self._operation_update_queues[peer]
    
    119 136
     
    
    120 137
         def set_cached_result(self, action_result):
    
    121 138
             """Allows specifying an action result form the action cache for the job.
    
    ... ... @@ -211,5 +228,5 @@ class Job:
    211 228
     
    
    212 229
             self._operation.metadata.Pack(self.__operation_metadata)
    
    213 230
     
    
    214
    -        for queue in self._operation_update_queues:
    
    231
    +        for queue in self._operation_update_queues.values():
    
    215 232
                 queue.put(self._operation)

  • buildgrid/server/operations/instance.py
    ... ... @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import InvalidArgumentError
    
    24
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.google.longrunning import operations_pb2
    
    26 26
     
    
    27 27
     
    
    ... ... @@ -34,43 +34,49 @@ class OperationsInstance:
    34 34
         def register_instance_with_server(self, instance_name, server):
    
    35 35
             server.add_operations_instance(self, instance_name)
    
    36 36
     
    
    37
    -    def get_operation(self, name):
    
    38
    -        job = self._scheduler.jobs.get(name)
    
    37
    +    def get_operation(self, job_name):
    
    38
    +        try:
    
    39
    +            operation = self._scheduler.get_job_operation(job_name)
    
    39 40
     
    
    40
    -        if job is None:
    
    41
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    41
    +        except NotFoundError:
    
    42
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    42 43
     
    
    43
    -        else:
    
    44
    -            return job.operation
    
    44
    +        return operation
    
    45 45
     
    
    46 46
         def list_operations(self, list_filter, page_size, page_token):
    
    47 47
             # TODO: Pages
    
    48 48
             # Spec says number of pages and length of a page are optional
    
    49 49
             response = operations_pb2.ListOperationsResponse()
    
    50
    -        response.operations.extend([job.operation for job in self._scheduler.list_jobs()])
    
    50
    +        operations = []
    
    51
    +        for job in self._scheduler.list_jobs():
    
    52
    +            op = operations_pb2.Operation()
    
    53
    +            op.CopyFrom(job.operation)
    
    54
    +            operations.append(op)
    
    55
    +
    
    56
    +        response.operations.extend(operations)
    
    51 57
     
    
    52 58
             return response
    
    53 59
     
    
    54
    -    def delete_operation(self, name):
    
    60
    +    def delete_operation(self, job_name, peer):
    
    55 61
             try:
    
    56
    -            self._scheduler.jobs.pop(name)
    
    62
    +            self._scheduler.unregister_client(job_name, peer)
    
    57 63
     
    
    58
    -        except KeyError:
    
    59
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    64
    +        except NotFoundError:
    
    65
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    60 66
     
    
    61
    -    def register_message_client(self, name, queue):
    
    67
    +    def register_message_client(self, job_name, peer, message_queue):
    
    62 68
             try:
    
    63
    -            self._scheduler.register_client(name, queue)
    
    69
    +            self._scheduler.register_client(job_name, peer, message_queue)
    
    64 70
     
    
    65
    -        except KeyError:
    
    66
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    71
    +        except NotFoundError:
    
    72
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    67 73
     
    
    68
    -    def unregister_message_client(self, name, queue):
    
    74
    +    def unregister_message_client(self, job_name, peer):
    
    69 75
             try:
    
    70
    -            self._scheduler.unregister_client(name, queue)
    
    76
    +            self._scheduler.unregister_client(job_name, peer)
    
    71 77
     
    
    72
    -        except KeyError:
    
    73
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    78
    +        except NotFoundError:
    
    79
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    74 80
     
    
    75 81
         def stream_operation_updates(self, message_queue, operation_name):
    
    76 82
             operation = message_queue.get()
    

  • buildgrid/server/operations/service.py
    ... ... @@ -44,13 +44,16 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    44 44
         def GetOperation(self, request, context):
    
    45 45
             try:
    
    46 46
                 name = request.name
    
    47
    -            operation_name = self._get_operation_name(name)
    
    48 47
     
    
    49
    -            instance = self._get_instance(name)
    
    48
    +            instance_name = self._parse_instance_name(name)
    
    49
    +            instance = self._get_instance(instance_name)
    
    50 50
     
    
    51
    +            operation_name = self._parse_operation_name(name)
    
    51 52
                 operation = instance.get_operation(operation_name)
    
    52
    -            operation.name = name
    
    53
    -            return operation
    
    53
    +            op = operations_pb2.Operation()
    
    54
    +            op.CopyFrom(operation)
    
    55
    +            op.name = name
    
    56
    +            return op
    
    54 57
     
    
    55 58
             except InvalidArgumentError as e:
    
    56 59
                 self.logger.error(e)
    
    ... ... @@ -61,17 +64,17 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    61 64
     
    
    62 65
         def ListOperations(self, request, context):
    
    63 66
             try:
    
    64
    -            # Name should be the collection name
    
    65
    -            # Or in this case, the instance_name
    
    66
    -            name = request.name
    
    67
    -            instance = self._get_instance(name)
    
    67
    +            # The request name should be the collection name
    
    68
    +            # In our case, this is just the instance_name
    
    69
    +            instance_name = request.name
    
    70
    +            instance = self._get_instance(instance_name)
    
    68 71
     
    
    69 72
                 result = instance.list_operations(request.filter,
    
    70 73
                                                   request.page_size,
    
    71 74
                                                   request.page_token)
    
    72 75
     
    
    73 76
                 for operation in result.operations:
    
    74
    -                operation.name = "{}/{}".format(name, operation.name)
    
    77
    +                operation.name = "{}/{}".format(instance_name, operation.name)
    
    75 78
     
    
    76 79
                 return result
    
    77 80
     
    
    ... ... @@ -85,11 +88,12 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    85 88
         def DeleteOperation(self, request, context):
    
    86 89
             try:
    
    87 90
                 name = request.name
    
    88
    -            operation_name = self._get_operation_name(name)
    
    89 91
     
    
    90
    -            instance = self._get_instance(name)
    
    92
    +            instance_name = self._parse_instance_name(name)
    
    93
    +            instance = self._get_instance(instance_name)
    
    91 94
     
    
    92
    -            instance.delete_operation(operation_name)
    
    95
    +            operation_name = self._parse_operation_name(name)
    
    96
    +            instance.delete_operation(operation_name, context.peer())
    
    93 97
     
    
    94 98
             except InvalidArgumentError as e:
    
    95 99
                 self.logger.error(e)
    
    ... ... @@ -101,10 +105,11 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    101 105
         def CancelOperation(self, request, context):
    
    102 106
             try:
    
    103 107
                 name = request.name
    
    104
    -            operation_name = self._get_operation_name(name)
    
    105 108
     
    
    106
    -            instance = self._get_instance(name)
    
    109
    +            instance_name = self._parse_instance_name(name)
    
    110
    +            instance = self._get_instance(instance_name)
    
    107 111
     
    
    112
    +            operation_name = self._parse_operation_name(name)
    
    108 113
                 instance.cancel_operation(operation_name)
    
    109 114
     
    
    110 115
             except NotImplementedError as e:
    
    ... ... @@ -119,20 +124,20 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    119 124
     
    
    120 125
             return Empty()
    
    121 126
     
    
    122
    -    def _get_operation_name(self, name):
    
    123
    -        return name.split("/")[-1]
    
    127
    +    def _parse_instance_name(self, name):
    
    128
    +        """ If the instance name is not blank, 'name' will have the form
    
    129
    +        {instance_name}/{operation_uuid}. Otherwise, it will just be
    
    130
    +        {operation_uuid} """
    
    131
    +        names = name.split('/')
    
    132
    +        return '/'.join(names[:-1]) if len(names) > 1 else ''
    
    133
    +
    
    134
    +    def _parse_operation_name(self, name):
    
    135
    +        names = name.split('/')
    
    136
    +        return names[-1] if len(names) > 1 else name
    
    124 137
     
    
    125 138
         def _get_instance(self, name):
    
    126 139
             try:
    
    127
    -            names = name.split("/")
    
    128
    -
    
    129
    -            # Operation name should be in format:
    
    130
    -            # {instance/name}/{operation_id}
    
    131
    -            instance_name = ''.join(names[0:-1])
    
    132
    -            if not instance_name:
    
    133
    -                return self._instances[name]
    
    134
    -
    
    135
    -            return self._instances[instance_name]
    
    140
    +            return self._instances[name]
    
    136 141
     
    
    137 142
             except KeyError:
    
    138 143
                 raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))

  • buildgrid/server/scheduler.py
    ... ... @@ -23,7 +23,7 @@ from collections import deque
    23 23
     
    
    24 24
     from buildgrid._exceptions import NotFoundError
    
    25 25
     
    
    26
    -from .job import OperationStage, LeaseState
    
    26
    +from .job import Job, OperationStage, LeaseState
    
    27 27
     
    
    28 28
     
    
    29 29
     class Scheduler:
    
    ... ... @@ -32,28 +32,76 @@ class Scheduler:
    32 32
     
    
    33 33
         def __init__(self, action_cache=None):
    
    34 34
             self._action_cache = action_cache
    
    35
    -        self.jobs = {}
    
    36
    -        self.queue = deque()
    
    35
    +
    
    36
    +        self.__jobs_by_action = {}
    
    37
    +        self.__jobs_by_name = {}
    
    38
    +        self.__queue = deque()
    
    37 39
     
    
    38 40
         def register_client(self, job_name, queue):
    
    39
    -        self.jobs[job_name].register_client(queue)
    
    41
    +        self.__jobs_by_name[job_name].register_client(queue)
    
    40 42
     
    
    41 43
         def unregister_client(self, job_name, queue):
    
    42
    -        self.jobs[job_name].unregister_client(queue)
    
    44
    +        job = self.__jobs_by_name[job_name]
    
    45
    +        job.unregister_client(queue)
    
    46
    +
    
    47
    +        if job.n_clients == 0 and job.operation.done:
    
    48
    +            del self.__jobs_by_action[job.action_digest]
    
    49
    +            del self.__jobs_by_name[job.name]
    
    50
    +
    
    51
    +    def queue_job(self, action, action_digest, priority=0, skip_cache_lookup=False):
    
    52
    +        """Inserts a newly created job into the execution queue.
    
    53
    +
    
    54
    +        Args:
    
    55
    +            action (Action): the given action to queue for execution.
    
    56
    +            action_digest (Digest): the digest of the given action.
    
    57
    +            priority (int): the execution job's priority.
    
    58
    +            skip_cache_lookup (bool): whether or not to look for pre-computed
    
    59
    +                result for the given action.
    
    60
    +        """
    
    61
    +        def __queue_job(queue, new_job):
    
    62
    +            index = 0
    
    63
    +            if new_job.priority >= 0:
    
    64
    +                for queued_job in reversed(queue):
    
    65
    +                    if new_job.priority < queued_job.priority:
    
    66
    +                        index += 1
    
    67
    +                    else:
    
    68
    +                        break
    
    69
    +                index = len(queue) - index
    
    70
    +
    
    71
    +            else:
    
    72
    +                for queued_job in queue:
    
    73
    +                    if new_job.priority >= queued_job.priority:
    
    74
    +                        index += 1
    
    75
    +                    else:
    
    76
    +                        break
    
    77
    +
    
    78
    +            queue.insert(index, new_job)
    
    79
    +
    
    80
    +        if action_digest.hash in self.__jobs_by_action:
    
    81
    +            job = self.__jobs_by_action[action_digest.hash]
    
    82
    +
    
    83
    +            if priority < job.priority:
    
    84
    +                job.priority = priority
    
    43 85
     
    
    44
    -        if not self.jobs[job_name].n_clients and self.jobs[job_name].operation.done:
    
    45
    -            del self.jobs[job_name]
    
    86
    +                if job in self.__queue:
    
    87
    +                    self.__queue.remove(job)
    
    88
    +                    __queue_job(self.__queue, job)
    
    46 89
     
    
    47
    -    def queue_job(self, job, skip_cache_lookup=False):
    
    48
    -        self.jobs[job.name] = job
    
    90
    +            return job
    
    91
    +
    
    92
    +        job = Job(action, action_digest, priority=priority)
    
    93
    +
    
    94
    +        self.__jobs_by_action[job.action_digest.hash] = job
    
    95
    +        self.__jobs_by_name[job.name] = job
    
    49 96
     
    
    50 97
             operation_stage = None
    
    51 98
             if self._action_cache is not None and not skip_cache_lookup:
    
    52 99
                 try:
    
    53 100
                     action_result = self._action_cache.get_action_result(job.action_digest)
    
    101
    +
    
    54 102
                 except NotFoundError:
    
    55 103
                     operation_stage = OperationStage.QUEUED
    
    56
    -                self.queue.append(job)
    
    104
    +                __queue_job(self.__queue, job)
    
    57 105
     
    
    58 106
                 else:
    
    59 107
                     job.set_cached_result(action_result)
    
    ... ... @@ -61,23 +109,25 @@ class Scheduler:
    61 109
     
    
    62 110
             else:
    
    63 111
                 operation_stage = OperationStage.QUEUED
    
    64
    -            self.queue.append(job)
    
    112
    +            __queue_job(self.__queue, job)
    
    65 113
     
    
    66 114
             job.update_operation_stage(operation_stage)
    
    67 115
     
    
    116
    +        return job
    
    117
    +
    
    68 118
         def retry_job(self, job_name):
    
    69
    -        if job_name in self.jobs:
    
    70
    -            job = self.jobs[job_name]
    
    119
    +        if job_name in self.__jobs_by_name:
    
    120
    +            job = self.__jobs_by_name[job_name]
    
    71 121
                 if job.n_tries >= self.MAX_N_TRIES:
    
    72 122
                     # TODO: Decide what to do with these jobs
    
    73 123
                     job.update_operation_stage(OperationStage.COMPLETED)
    
    74 124
                     # TODO: Mark these jobs as done
    
    75 125
                 else:
    
    76 126
                     job.update_operation_stage(OperationStage.QUEUED)
    
    77
    -                self.queue.appendleft(job)
    
    127
    +                self.__queue.appendleft(job)
    
    78 128
     
    
    79 129
         def list_jobs(self):
    
    80
    -        return self.jobs.values()
    
    130
    +        return self.__jobs_by_name.values()
    
    81 131
     
    
    82 132
         def request_job_leases(self, worker_capabilities):
    
    83 133
             """Generates a list of the highest priority leases to be run.
    
    ... ... @@ -87,10 +137,10 @@ class Scheduler:
    87 137
                     worker properties, configuration and state at the time of the
    
    88 138
                     request.
    
    89 139
             """
    
    90
    -        if not self.queue:
    
    140
    +        if not self.__queue:
    
    91 141
                 return []
    
    92 142
     
    
    93
    -        job = self.queue.popleft()
    
    143
    +        job = self.__queue.popleft()
    
    94 144
             # For now, one lease at a time:
    
    95 145
             lease = job.create_lease()
    
    96 146
     
    
    ... ... @@ -107,7 +157,7 @@ class Scheduler:
    107 157
                 lease_result (google.protobuf.Any): the lease execution result, only
    
    108 158
                     required if `lease_state` is `COMPLETED`.
    
    109 159
             """
    
    110
    -        job = self.jobs[job_name]
    
    160
    +        job = self.__jobs_by_name[job_name]
    
    111 161
     
    
    112 162
             if lease_state == LeaseState.PENDING:
    
    113 163
                 job.update_lease_state(LeaseState.PENDING)
    
    ... ... @@ -127,9 +177,33 @@ class Scheduler:
    127 177
                 job.update_operation_stage(OperationStage.COMPLETED)
    
    128 178
     
    
    129 179
         def get_job_lease(self, job_name):
    
    130
    -        """Returns the lease associated to job, if any have been emitted yet."""
    
    131
    -        return self.jobs[job_name].lease
    
    180
    +        """Returns the lease associated to job, if any have been emitted yet.
    
    181
    +
    
    182
    +        Args:
    
    183
    +            job_name (str): name of the job to query.
    
    184
    +
    
    185
    +        Raises:
    
    186
    +            NotFoundError: If no job with `job_name` exists.
    
    187
    +        """
    
    188
    +        try:
    
    189
    +            job = self.__jobs_by_name[job_name]
    
    190
    +        except KeyError:
    
    191
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    192
    +
    
    193
    +        return job.lease
    
    132 194
     
    
    133 195
         def get_job_operation(self, job_name):
    
    134
    -        """Returns the operation associated to job."""
    
    135
    -        return self.jobs[job_name].operation
    196
    +        """Returns the operation associated to job.
    
    197
    +
    
    198
    +        Args:
    
    199
    +            job_name (str): name of the job to query.
    
    200
    +
    
    201
    +        Raises:
    
    202
    +            NotFoundError: If no job with `job_name` exists.
    
    203
    +        """
    
    204
    +        try:
    
    205
    +            job = self.__jobs_by_name[job_name]
    
    206
    +        except KeyError:
    
    207
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    208
    +
    
    209
    +        return job.operation

  • docs/source/using_internal.rst
    ... ... @@ -92,7 +92,7 @@ Upload the directory containing the C file:
    92 92
     
    
    93 93
        bgd cas upload-dir /path/to/test-buildgrid
    
    94 94
     
    
    95
    -Now we send an execution request to the bot with the name of the epxected
    
    95
    +Now we send an execution request to the bot with the name of the expected
    
    96 96
     ``output-file``, a boolean describing if it is executable, the path to the
    
    97 97
     directory we uploaded in order to calculate the digest and finally the command
    
    98 98
     to run on the bot:
    
    ... ... @@ -102,4 +102,4 @@ to run on the bot:
    102 102
        bgd execute command --output-file hello True /path/to/test-buildgrid -- gcc -Wall hello.c -o hello
    
    103 103
     
    
    104 104
     The resulting executable should have returned to a new directory called
    
    105
    -``testing``.
    \ No newline at end of file
    105
    +``testing``.

  • tests/integration/bots_service.py
    ... ... @@ -26,7 +26,6 @@ import pytest
    26 26
     
    
    27 27
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28 28
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    29
    -from buildgrid.server import job
    
    30 29
     from buildgrid.server.controller import ExecutionController
    
    31 30
     from buildgrid.server.job import LeaseState
    
    32 31
     from buildgrid.server.bots import service
    
    ... ... @@ -283,7 +282,8 @@ def test_post_bot_event_temp(context, instance):
    283 282
     def _inject_work(scheduler, action=None, action_digest=None):
    
    284 283
         if not action:
    
    285 284
             action = remote_execution_pb2.Action()
    
    285
    +
    
    286 286
         if not action_digest:
    
    287 287
             action_digest = remote_execution_pb2.Digest()
    
    288
    -    j = job.Job(action, action_digest)
    
    289
    -    scheduler.queue_job(j, True)
    288
    +
    
    289
    +    scheduler.queue_job(action, action_digest, skip_cache_lookup=True)

  • tests/integration/execution_service.py
    ... ... @@ -83,7 +83,8 @@ def test_execute(skip_cache_lookup, instance, context):
    83 83
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    84 84
         result.metadata.Unpack(metadata)
    
    85 85
         assert metadata.stage == job.OperationStage.QUEUED.value
    
    86
    -    assert uuid.UUID(result.name, version=4)
    
    86
    +    operation_uuid = result.name.split('/')[-1]
    
    87
    +    assert uuid.UUID(operation_uuid, version=4)
    
    87 88
         assert result.done is False
    
    88 89
     
    
    89 90
     
    
    ... ... @@ -105,12 +106,12 @@ def test_no_action_digest_in_storage(instance, context):
    105 106
     
    
    106 107
     
    
    107 108
     def test_wait_execution(instance, controller, context):
    
    108
    -    j = job.Job(action, action_digest)
    
    109
    +    j = controller.execution_instance._scheduler.queue_job(action,
    
    110
    +                                                           action_digest,
    
    111
    +                                                           skip_cache_lookup=True)
    
    109 112
         j._operation.done = True
    
    110 113
     
    
    111
    -    request = remote_execution_pb2.WaitExecutionRequest(name="{}/{}".format('', j.name))
    
    112
    -
    
    113
    -    controller.execution_instance._scheduler.jobs[j.name] = j
    
    114
    +    request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    114 115
     
    
    115 116
         action_result_any = any_pb2.Any()
    
    116 117
         action_result = remote_execution_pb2.ActionResult()
    

  • tests/integration/operations_service.py
    ... ... @@ -75,6 +75,16 @@ def instance(controller):
    75 75
             yield operation_service
    
    76 76
     
    
    77 77
     
    
    78
    +# Blank instance
    
    79
    +@pytest.fixture
    
    80
    +def blank_instance(controller):
    
    81
    +    with mock.patch.object(service, 'operations_pb2_grpc'):
    
    82
    +        operation_service = OperationsService(server)
    
    83
    +        operation_service.add_instance('', controller.operations_instance)
    
    84
    +
    
    85
    +        yield operation_service
    
    86
    +
    
    87
    +
    
    78 88
     # Queue an execution, get operation corresponding to that request
    
    79 89
     def test_get_operation(instance, controller, execute_request, context):
    
    80 90
         response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    ... ... @@ -82,14 +92,34 @@ def test_get_operation(instance, controller, execute_request, context):
    82 92
     
    
    83 93
         request = operations_pb2.GetOperationRequest()
    
    84 94
     
    
    95
    +    # The execution instance name is normally set in add_instance, but since
    
    96
    +    # we're manually creating the instance here, it doesn't get a name.
    
    97
    +    # Therefore we need to manually add the instance name to the operation
    
    98
    +    # name in the GetOperation request.
    
    85 99
         request.name = "{}/{}".format(instance_name, response_execute.name)
    
    86 100
     
    
    87 101
         response = instance.GetOperation(request, context)
    
    88
    -    assert response is response_execute
    
    102
    +    assert response.name == "{}/{}".format(instance_name, response_execute.name)
    
    103
    +    assert response.done == response_execute.done
    
    104
    +
    
    105
    +
    
    106
    +# Queue an execution, get operation corresponding to that request
    
    107
    +def test_get_operation_blank(blank_instance, controller, execute_request, context):
    
    108
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    109
    +                                                             execute_request.skip_cache_lookup)
    
    110
    +
    
    111
    +    request = operations_pb2.GetOperationRequest()
    
    112
    +
    
    113
    +    request.name = response_execute.name
    
    114
    +
    
    115
    +    response = blank_instance.GetOperation(request, context)
    
    116
    +    assert response.name == response_execute.name
    
    117
    +    assert response.done == response_execute.done
    
    89 118
     
    
    90 119
     
    
    91 120
     def test_get_operation_fail(instance, context):
    
    92 121
         request = operations_pb2.GetOperationRequest()
    
    122
    +
    
    93 123
         request.name = "{}/{}".format(instance_name, "runner")
    
    94 124
         instance.GetOperation(request, context)
    
    95 125
     
    
    ... ... @@ -110,6 +140,18 @@ def test_list_operations(instance, controller, execute_request, context):
    110 140
         request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    111 141
         response = instance.ListOperations(request, context)
    
    112 142
     
    
    143
    +    names = response.operations[0].name.split('/')
    
    144
    +    assert names[0] == instance_name
    
    145
    +    assert names[1] == response_execute.name
    
    146
    +
    
    147
    +
    
    148
    +def test_list_operations_blank(blank_instance, controller, execute_request, context):
    
    149
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    150
    +                                                             execute_request.skip_cache_lookup)
    
    151
    +
    
    152
    +    request = operations_pb2.ListOperationsRequest(name='')
    
    153
    +    response = blank_instance.ListOperations(request, context)
    
    154
    +
    
    113 155
         assert response.operations[0].name.split('/')[-1] == response_execute.name
    
    114 156
     
    
    115 157
     
    
    ... ... @@ -131,7 +173,7 @@ def test_list_operations_with_result(instance, controller, execute_request, cont
    131 173
         output_file = remote_execution_pb2.OutputFile(path='unicorn')
    
    132 174
         action_result.output_files.extend([output_file])
    
    133 175
     
    
    134
    -    controller.operations_instance._scheduler.jobs[response_execute.name].create_lease()
    
    176
    +    controller.operations_instance._scheduler.request_job_leases({})
    
    135 177
         controller.operations_instance._scheduler.update_job_lease_state(response_execute.name,
    
    136 178
                                                                          LeaseState.COMPLETED,
    
    137 179
                                                                          lease_status=status_pb2.Status(),
    
    ... ... @@ -160,15 +202,30 @@ def test_list_operations_empty(instance, context):
    160 202
     def test_delete_operation(instance, controller, execute_request, context):
    
    161 203
         response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    162 204
                                                                  execute_request.skip_cache_lookup)
    
    205
    +
    
    163 206
         request = operations_pb2.DeleteOperationRequest()
    
    164
    -    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    207
    +    request.name = response_execute.name
    
    165 208
         instance.DeleteOperation(request, context)
    
    166 209
     
    
    167
    -    request = operations_pb2.GetOperationRequest()
    
    168
    -    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    210
    +    request_name = "{}/{}".format(instance_name, response_execute.name)
    
    169 211
     
    
    170 212
         with pytest.raises(InvalidArgumentError):
    
    171
    -        controller.operations_instance.get_operation(response_execute.name)
    
    213
    +        controller.operations_instance.get_operation(request_name)
    
    214
    +
    
    215
    +
    
    216
    +# Send execution off, delete, try to find operation should fail
    
    217
    +def test_delete_operation_blank(blank_instance, controller, execute_request, context):
    
    218
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    219
    +                                                             execute_request.skip_cache_lookup)
    
    220
    +
    
    221
    +    request = operations_pb2.DeleteOperationRequest()
    
    222
    +    request.name = response_execute.name
    
    223
    +    blank_instance.DeleteOperation(request, context)
    
    224
    +
    
    225
    +    request_name = response_execute.name
    
    226
    +
    
    227
    +    with pytest.raises(InvalidArgumentError):
    
    228
    +        controller.operations_instance.get_operation(request_name)
    
    172 229
     
    
    173 230
     
    
    174 231
     def test_delete_operation_fail(instance, context):
    
    ... ... @@ -187,6 +244,14 @@ def test_cancel_operation(instance, context):
    187 244
         context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    188 245
     
    
    189 246
     
    
    247
    +def test_cancel_operation_blank(blank_instance, context):
    
    248
    +    request = operations_pb2.CancelOperationRequest()
    
    249
    +    request.name = "runner"
    
    250
    +    blank_instance.CancelOperation(request, context)
    
    251
    +
    
    252
    +    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    253
    +
    
    254
    +
    
    190 255
     def test_cancel_operation_instance_fail(instance, context):
    
    191 256
         request = operations_pb2.CancelOperationRequest()
    
    192 257
         instance.CancelOperation(request, context)
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]