[Notes] [Git][BuildGrid/buildgrid][mablanch/75-requests-multiplexing] 4 commits: setup.py: Unpin the coverage dependency



Title: GitLab

Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid

Commits:

6 changed files:

Changes:

  • buildgrid/server/execution/instance.py
    ... ... @@ -21,11 +21,9 @@ An instance of the Remote Execution Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    24
    +from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    26 26
     
    
    27
    -from ..job import Job
    
    28
    -
    
    29 27
     
    
    30 28
     class ExecutionInstance:
    
    31 29
     
    
    ... ... @@ -37,7 +35,7 @@ class ExecutionInstance:
    37 35
         def register_instance_with_server(self, instance_name, server):
    
    38 36
             server.add_execution_instance(self, instance_name)
    
    39 37
     
    
    40
    -    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    38
    +    def execute(self, action_digest, skip_cache_lookup, peer=None, message_queue=None):
    
    41 39
             """ Sends a job for execution.
    
    42 40
             Queues an action and creates an Operation instance to be associated with
    
    43 41
             this action.
    
    ... ... @@ -48,28 +46,27 @@ class ExecutionInstance:
    48 46
             if not action:
    
    49 47
                 raise FailedPreconditionError("Could not get action from storage.")
    
    50 48
     
    
    51
    -        job = Job(action, action_digest)
    
    52
    -        if message_queue is not None:
    
    53
    -            job.register_client(message_queue)
    
    49
    +        job = self._scheduler.queue_job(action, action_digest, skip_cache_lookup)
    
    54 50
     
    
    55 51
             self.logger.info("Operation name: [{}]".format(job.name))
    
    56 52
     
    
    57
    -        self._scheduler.queue_job(job, skip_cache_lookup)
    
    53
    +        if peer is not None and message_queue is not None:
    
    54
    +            job.register_client(peer, message_queue)
    
    58 55
     
    
    59 56
             return job.operation
    
    60 57
     
    
    61
    -    def register_message_client(self, name, queue):
    
    58
    +    def register_message_client(self, job_name, peer, message_queue):
    
    62 59
             try:
    
    63
    -            self._scheduler.register_client(name, queue)
    
    60
    +            self._scheduler.register_client(job_name, peer, message_queue)
    
    64 61
     
    
    65
    -        except KeyError:
    
    62
    +        except NotFoundError:
    
    66 63
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    67 64
     
    
    68
    -    def unregister_message_client(self, name, queue):
    
    65
    +    def unregister_message_client(self, job_name, peer):
    
    69 66
             try:
    
    70
    -            self._scheduler.unregister_client(name, queue)
    
    67
    +            self._scheduler.unregister_client(job_name, peer)
    
    71 68
     
    
    72
    -        except KeyError:
    
    69
    +        except NotFoundError:
    
    73 70
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    74 71
     
    
    75 72
         def stream_operation_updates(self, message_queue, operation_name):
    

  • buildgrid/server/execution/service.py
    ... ... @@ -47,10 +47,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    47 47
                 instance = self._get_instance(request.instance_name)
    
    48 48
                 operation = instance.execute(request.action_digest,
    
    49 49
                                              request.skip_cache_lookup,
    
    50
    -                                         message_queue)
    
    50
    +                                         peer=context.peer(),
    
    51
    +                                         message_queue=message_queue)
    
    51 52
     
    
    52 53
                 context.add_callback(partial(instance.unregister_message_client,
    
    53
    -                                         operation.name, message_queue))
    
    54
    +                                         operation.name, context.peer()))
    
    54 55
     
    
    55 56
                 yield from instance.stream_operation_updates(message_queue,
    
    56 57
                                                              operation.name)
    

  • buildgrid/server/job.py
    ... ... @@ -26,10 +26,11 @@ from buildgrid._protos.google.longrunning import operations_pb2
    26 26
     
    
    27 27
     class Job:
    
    28 28
     
    
    29
    -    def __init__(self, action, action_digest):
    
    29
    +    def __init__(self, action, action_digest, priority=0):
    
    30 30
             self.logger = logging.getLogger(__name__)
    
    31 31
     
    
    32 32
             self._name = str(uuid.uuid4())
    
    33
    +        self._priority = priority
    
    33 34
             self._action = remote_execution_pb2.Action()
    
    34 35
             self._operation = operations_pb2.Operation()
    
    35 36
             self._lease = None
    
    ... ... @@ -45,7 +46,7 @@ class Job:
    45 46
     
    
    46 47
             self._action.CopyFrom(action)
    
    47 48
             self._do_not_cache = self._action.do_not_cache
    
    48
    -        self._operation_update_queues = []
    
    49
    +        self._operation_update_queues = {}
    
    49 50
             self._operation.name = self._name
    
    50 51
             self._operation.done = False
    
    51 52
             self._n_tries = 0
    
    ... ... @@ -54,6 +55,10 @@ class Job:
    54 55
         def name(self):
    
    55 56
             return self._name
    
    56 57
     
    
    58
    +    @property
    
    59
    +    def priority(self):
    
    60
    +        return self._priority
    
    61
    +
    
    57 62
         @property
    
    58 63
         def do_not_cache(self):
    
    59 64
             return self._do_not_cache
    
    ... ... @@ -100,22 +105,26 @@ class Job:
    100 105
         def n_clients(self):
    
    101 106
             return len(self._operation_update_queues)
    
    102 107
     
    
    103
    -    def register_client(self, queue):
    
    104
    -        """Subscribes to the job's :class:`Operation` stage change events.
    
    108
    +    def register_client(self, peer, message_queue):
    
    109
    +        """Subscribes to the job's :class:`Operation` stage changes.
    
    105 110
     
    
    106 111
             Args:
    
    107
    -            queue (queue.Queue): the event queue to register.
    
    112
    +            peer (str): a unique string identifying the client.
    
    113
    +            message_queue (queue.Queue): the event queue to register.
    
    108 114
             """
    
    109
    -        self._operation_update_queues.append(queue)
    
    110
    -        queue.put(self._operation)
    
    115
    +        if peer not in self._operation_update_queues:
    
    116
    +            self._operation_update_queues[peer] = message_queue
    
    117
    +
    
    118
    +        message_queue.put(self._operation)
    
    111 119
     
    
    112
    -    def unregister_client(self, queue):
    
    113
    -        """Unsubscribes to the job's :class:`Operation` stage change events.
    
    120
    +    def unregister_client(self, peer):
    
    121
    +        """Unsubscribes to the job's :class:`Operation` stage change.
    
    114 122
     
    
    115 123
             Args:
    
    116
    -            queue (queue.Queue): the event queue to unregister.
    
    124
    +            peer (str): a unique string identifying the client.
    
    117 125
             """
    
    118
    -        self._operation_update_queues.remove(queue)
    
    126
    +        if peer not in self._operation_update_queues:
    
    127
    +            del self._operation_update_queues[peer]
    
    119 128
     
    
    120 129
         def set_cached_result(self, action_result):
    
    121 130
             """Allows specifying an action result from the action cache for the job.
    
    ... ... @@ -211,5 +220,5 @@ class Job:
    211 220
     
    
    212 221
             self._operation.metadata.Pack(self.__operation_metadata)
    
    213 222
     
    
    214
    -        for queue in self._operation_update_queues:
    
    223
    +        for queue in self._operation_update_queues.values():
    
    215 224
                 queue.put(self._operation)

  • buildgrid/server/operations/instance.py
    ... ... @@ -58,18 +58,18 @@ class OperationsInstance:
    58 58
             except KeyError:
    
    59 59
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    60 60
     
    
    61
    -    def register_message_client(self, name, queue):
    
    61
    +    def register_message_client(self, job_name, peer, message_queue):
    
    62 62
             try:
    
    63
    -            self._scheduler.register_client(name, queue)
    
    63
    +            self._scheduler.register_client(job_name, peer, message_queue)
    
    64 64
     
    
    65
    -        except KeyError:
    
    65
    +        except NotFoundError:
    
    66 66
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    67 67
     
    
    68
    -    def unregister_message_client(self, name, queue):
    
    68
    +    def unregister_message_client(self, job_name, peer):
    
    69 69
             try:
    
    70
    -            self._scheduler.unregister_client(name, queue)
    
    70
    +            self._scheduler.unregister_client(job_name, peer)
    
    71 71
     
    
    72
    -        except KeyError:
    
    72
    +        except NotFoundError:
    
    73 73
                 raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    74 74
     
    
    75 75
         def stream_operation_updates(self, message_queue, operation_name):
    

  • buildgrid/server/scheduler.py
    ... ... @@ -23,7 +23,7 @@ from collections import deque
    23 23
     
    
    24 24
     from buildgrid._exceptions import NotFoundError
    
    25 25
     
    
    26
    -from .job import OperationStage, LeaseState
    
    26
    +from .job import Job, OperationStage, LeaseState
    
    27 27
     
    
    28 28
     
    
    29 29
     class Scheduler:
    
    ... ... @@ -32,25 +32,76 @@ class Scheduler:
    32 32
     
    
    33 33
         def __init__(self, action_cache=None):
    
    34 34
             self._action_cache = action_cache
    
    35
    -        self.jobs = {}
    
    35
    +        self.__jobs_by_action = {}
    
    36
    +        self.__jobs_by_name = {}
    
    36 37
             self.queue = deque()
    
    37 38
     
    
    38
    -    def register_client(self, job_name, queue):
    
    39
    -        self.jobs[job_name].register_client(queue)
    
    39
    +    def register_client(self, job_name, peer, message_queue):
    
    40
    +        """Subscribes to one of the job's :class:`Operation` stage changes.
    
    40 41
     
    
    41
    -    def unregister_client(self, job_name, queue):
    
    42
    -        self.jobs[job_name].unregister_client(queue)
    
    42
    +        Args:
    
    43
    +            job_name (str): name of the job to subscribe to.
    
    44
    +            peer (str): a unique string identifying the client.
    
    45
    +            message_queue (queue.Queue): the event queue to register.
    
    46
    +
    
    47
    +        Raises:
    
    48
    +            NotFoundError: If no job with `job_name` exists.
    
    49
    +        """
    
    50
    +        try:
    
    51
    +            self.__jobs_by_name[job_name].register_client(peer, message_queue)
    
    52
    +
    
    53
    +        except KeyError:
    
    54
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    55
    +
    
    56
    +    def unregister_client(self, job_name, peer):
    
    57
    +        """Unsubscribes to one of the job's :class:`Operation` stage changes.
    
    58
    +
    
    59
    +        Args:
    
    60
    +            job_name (str): name of the job to unsubscribe from.
    
    61
    +            peer (str): a unique string identifying the client.
    
    62
    +
    
    63
    +        Raises:
    
    64
    +            NotFoundError: If no job with `job_name` exists.
    
    65
    +        """
    
    66
    +        try:
    
    67
    +            self.__jobs_by_name[job_name].unregister_client(peer)
    
    68
    +
    
    69
    +        except KeyError:
    
    70
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    43 71
     
    
    44
    -        if not self.jobs[job_name].n_clients and self.jobs[job_name].operation.done:
    
    45
    -            del self.jobs[job_name]
    
    72
    +        if (self.__jobs_by_name[job_name].n_clients == 0 and
    
    73
    +            self.__jobs_by_name[job_name].operation.done):
    
    74
    +            del self.__jobs_by_name[job_name]
    
    46 75
     
    
    47
    -    def queue_job(self, job, skip_cache_lookup=False):
    
    48
    -        self.jobs[job.name] = job
    
    76
    +    def queue_job(self, action, action_digest, priority=0, skip_cache_lookup=False):
    
    77
    +        """Inserts a newly created job into the execution queue.
    
    78
    +
    
    79
    +        Args:
    
    80
    +            action (Action): the given action to queue for execution.
    
    81
    +            action_digest (Digest): the digest of the given action.
    
    82
    +            priority (int): the execution job's priority.
    
    83
    +            skip_cache_lookup (bool): whether or not to look for pre-computed
    
    84
    +                result for the given action.
    
    85
    +        """
    
    86
    +        if action_digest.hash in self.__jobs_by_action:
    
    87
    +            job = self.__jobs_by_action[action_digest.hash]
    
    88
    +
    
    89
    +            if priority < job.priority:
    
    90
    +                #TODO: We need to requeue here
    
    91
    +                job.priority = priority
    
    92
    +
    
    93
    +            return job
    
    94
    +
    
    95
    +        job = Job(action, action_digest, priority=priority)
    
    96
    +
    
    97
    +        self.__jobs_by_action[job.action_digest.hash] = job
    
    98
    +        self.__jobs_by_name[job.name] = job
    
    49 99
     
    
    50 100
             operation_stage = None
    
    51 101
             if self._action_cache is not None and not skip_cache_lookup:
    
    52 102
                 try:
    
    53 103
                     action_result = self._action_cache.get_action_result(job.action_digest)
    
    104
    +
    
    54 105
                 except NotFoundError:
    
    55 106
                     operation_stage = OperationStage.QUEUED
    
    56 107
                     self.queue.append(job)
    
    ... ... @@ -65,9 +116,11 @@ class Scheduler:
    65 116
     
    
    66 117
             job.update_operation_stage(operation_stage)
    
    67 118
     
    
    119
    +        return job
    
    120
    +
    
    68 121
         def retry_job(self, job_name):
    
    69
    -        if job_name in self.jobs:
    
    70
    -            job = self.jobs[job_name]
    
    122
    +        if job_name in self.__jobs_by_name:
    
    123
    +            job = self.__jobs_by_name[job_name]
    
    71 124
                 if job.n_tries >= self.MAX_N_TRIES:
    
    72 125
                     # TODO: Decide what to do with these jobs
    
    73 126
                     job.update_operation_stage(OperationStage.COMPLETED)
    
    ... ... @@ -77,7 +130,7 @@ class Scheduler:
    77 130
                     self.queue.appendleft(job)
    
    78 131
     
    
    79 132
         def list_jobs(self):
    
    80
    -        return self.jobs.values()
    
    133
    +        return self.__jobs_by_name.values()
    
    81 134
     
    
    82 135
         def request_job_leases(self, worker_capabilities):
    
    83 136
             """Generates a list of the highest priority leases to be run.
    
    ... ... @@ -107,7 +160,7 @@ class Scheduler:
    107 160
                 lease_result (google.protobuf.Any): the lease execution result, only
    
    108 161
                     required if `lease_state` is `COMPLETED`.
    
    109 162
             """
    
    110
    -        job = self.jobs[job_name]
    
    163
    +        job = self.__jobs_by_name[job_name]
    
    111 164
     
    
    112 165
             if lease_state == LeaseState.PENDING:
    
    113 166
                 job.update_lease_state(LeaseState.PENDING)
    
    ... ... @@ -128,8 +181,8 @@ class Scheduler:
    128 181
     
    
    129 182
         def get_job_lease(self, job_name):
    
    130 183
             """Returns the lease associated to job, if any have been emitted yet."""
    
    131
    -        return self.jobs[job_name].lease
    
    184
    +        return self.__jobs_by_name[job_name].lease
    
    132 185
     
    
    133 186
         def get_job_operation(self, job_name):
    
    134 187
             """Returns the operation associated to job."""
    
    135
    -        return self.jobs[job_name].operation
    188
    +        return self.__jobs_by_name[job_name].operation

  • setup.py
    ... ... @@ -86,7 +86,7 @@ def get_cmdclass():
    86 86
         return cmdclass
    
    87 87
     
    
    88 88
     tests_require = [
    
    89
    -    'coverage == 4.4.0',
    
    89
    +    'coverage >= 4.5.0',
    
    90 90
         'moto',
    
    91 91
         'pep8',
    
    92 92
         'psutil',
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]