[Notes] [Git][BuildGrid/buildgrid][mablanch/75-requests-multiplexing] Keep track of peers requesting job execution



Title: GitLab

Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid

Commits:

6 changed files:

Changes:

  • buildgrid/server/execution/instance.py
    ... ... @@ -21,7 +21,7 @@ An instance of the Remote Execution Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    24
    +from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    26 26
     
    
    27 27
     
    
    ... ... @@ -35,7 +35,7 @@ class ExecutionInstance:
    35 35
         def register_instance_with_server(self, instance_name, server):
    
    36 36
             server.add_execution_instance(self, instance_name)
    
    37 37
     
    
    38
    -    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    38
    +    def execute(self, action_digest, skip_cache_lookup, peer=None, message_queue=None):
    
    39 39
             """ Sends a job for execution.
    
    40 40
             Queues an action and creates an Operation instance to be associated with
    
    41 41
             this action.
    
    ... ... @@ -48,24 +48,24 @@ class ExecutionInstance:
    48 48
     
    
    49 49
             job = self._scheduler.queue_job(action, action_digest, skip_cache_lookup)
    
    50 50
     
    
    51
    -        if message_queue is not None:
    
    52
    -            job.register_client(message_queue)
    
    51
    +        if peer is not None and message_queue is not None:
    
    52
    +            job.register_client(peer, message_queue)
    
    53 53
     
    
    54 54
             return job.operation
    
    55 55
     
    
    56
    -    def register_message_client(self, name, queue):
    
    56
    +    def register_message_client(self, job_name, peer, message_queue):
    
    57 57
             try:
    
    58
    -            self._scheduler.register_client(name, queue)
    
    58
    +            self._scheduler.register_client(job_name, peer, message_queue)
    
    59 59
     
    
    60
    -        except KeyError:
    
    61
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    60
    +        except NotFoundError:
    
    61
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    62 62
     
    
    63
    -    def unregister_message_client(self, name, queue):
    
    63
    +    def unregister_message_client(self, job_name, peer):
    
    64 64
             try:
    
    65
    -            self._scheduler.unregister_client(name, queue)
    
    65
    +            self._scheduler.unregister_client(job_name, peer)
    
    66 66
     
    
    67
    -        except KeyError:
    
    68
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    67
    +        except NotFoundError:
    
    68
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    69 69
     
    
    70 70
         def stream_operation_updates(self, message_queue, operation_name):
    
    71 71
             operation = message_queue.get()
    

  • buildgrid/server/execution/service.py
    ... ... @@ -47,10 +47,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    47 47
                 instance = self._get_instance(request.instance_name)
    
    48 48
                 operation = instance.execute(request.action_digest,
    
    49 49
                                              request.skip_cache_lookup,
    
    50
    -                                         message_queue)
    
    50
    +                                         peer=context.peer(),
    
    51
    +                                         message_queue=message_queue)
    
    51 52
     
    
    52 53
                 context.add_callback(partial(instance.unregister_message_client,
    
    53
    -                                         operation.name, message_queue))
    
    54
    +                                         operation.name, context.peer()))
    
    54 55
     
    
    55 56
                 instanced_op_name = "{}/{}".format(request.instance_name,
    
    56 57
                                                    operation.name)
    
    ... ... @@ -88,10 +89,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    88 89
                 operation_name = names[-1]
    
    89 90
                 instance = self._get_instance(instance_name)
    
    90 91
     
    
    91
    -            instance.register_message_client(operation_name, message_queue)
    
    92
    +            instance.register_message_client(operation_name,
    
    93
    +                                             context.peer(), message_queue)
    
    92 94
     
    
    93 95
                 context.add_callback(partial(instance.unregister_message_client,
    
    94
    -                                         operation_name, message_queue))
    
    96
    +                                         operation_name, context.peer()))
    
    95 97
     
    
    96 98
                 for operation in instance.stream_operation_updates(message_queue,
    
    97 99
                                                                    operation_name):
    

  • buildgrid/server/job.py
    ... ... @@ -46,7 +46,7 @@ class Job:
    46 46
     
    
    47 47
             self._action.CopyFrom(action)
    
    48 48
             self._do_not_cache = self._action.do_not_cache
    
    49
    -        self._operation_update_queues = []
    
    49
    +        self._operation_update_queues = {}
    
    50 50
             self._operation.name = self._name
    
    51 51
             self._operation.done = False
    
    52 52
             self._n_tries = 0
    
    ... ... @@ -113,22 +113,26 @@ class Job:
    113 113
         def __ne__(self, other):
    
    114 114
             return not self.__eq__(other)
    
    115 115
     
    
    116
    -    def register_client(self, queue):
    
    117
    -        """Subscribes to the job's :class:`Operation` stage change events.
    
    116
    +    def register_client(self, peer, message_queue):
    
    117
    +        """Subscribes to the job's :class:`Operation` stage changes.
    
    118 118
     
    
    119 119
             Args:
    
    120
    -            queue (queue.Queue): the event queue to register.
    
    120
    +            peer (str): a unique string identifying the client.
    
    121
    +            message_queue (queue.Queue): the event queue to register.
    
    121 122
             """
    
    122
    -        self._operation_update_queues.append(queue)
    
    123
    -        queue.put(self._operation)
    
    123
    +        if peer not in self._operation_update_queues:
    
    124
    +            self._operation_update_queues[peer] = message_queue
    
    124 125
     
    
    125
    -    def unregister_client(self, queue):
    
    126
    -        """Unsubscribes to the job's :class:`Operation` stage change events.
    
    126
    +        message_queue.put(self._operation)
    
    127
    +
    
    128
    +    def unregister_client(self, peer):
    
    129
    +        """Unsubscribes from the job's :class:`Operation` stage changes.
    
    127 130
     
    
    128 131
             Args:
    
    129
    -            queue (queue.Queue): the event queue to unregister.
    
    132
    +            peer (str): a unique string identifying the client.
    
    130 133
             """
    
    131
    -        self._operation_update_queues.remove(queue)
    
    134
    +        if peer not in self._operation_update_queues:
    
    135
    +            del self._operation_update_queues[peer]
    
    132 136
     
    
    133 137
         def set_cached_result(self, action_result):
    
    134 138
             """Allows specifying an action result from the action cache for the job.
    
    ... ... @@ -224,5 +228,5 @@ class Job:
    224 228
     
    
    225 229
             self._operation.metadata.Pack(self.__operation_metadata)
    
    226 230
     
    
    227
    -        for queue in self._operation_update_queues:
    
    231
    +        for queue in self._operation_update_queues.values():
    
    228 232
                 queue.put(self._operation)

  • buildgrid/server/operations/instance.py
    ... ... @@ -57,13 +57,12 @@ class OperationsInstance:
    57 57
     
    
    58 58
             return response
    
    59 59
     
    
    60
    -    def delete_operation(self, name):
    
    60
    +    def delete_operation(self, job_name, peer):
    
    61 61
             try:
    
    62
    -            # TODO: Unregister the caller client
    
    63
    -            pass
    
    62
    +            self._scheduler.unregister_client(job_name, peer)
    
    64 63
     
    
    65 64
             except NotFoundError:
    
    66
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    65
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    67 66
     
    
    68 67
         def register_message_client(self, job_name, peer, message_queue):
    
    69 68
             try:
    

  • buildgrid/server/operations/service.py
    ... ... @@ -93,7 +93,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    93 93
                 instance = self._get_instance(instance_name)
    
    94 94
     
    
    95 95
                 operation_name = self._parse_operation_name(name)
    
    96
    -            instance.delete_operation(operation_name)
    
    96
    +            instance.delete_operation(operation_name, context.peer())
    
    97 97
     
    
    98 98
             except InvalidArgumentError as e:
    
    99 99
                 self.logger.error(e)
    

  • buildgrid/server/scheduler.py
    ... ... @@ -37,12 +37,40 @@ class Scheduler:
    37 37
             self.__jobs_by_name = {}
    
    38 38
             self.__queue = deque()
    
    39 39
     
    
    40
    -    def register_client(self, job_name, queue):
    
    41
    -        self.__jobs_by_name[job_name].register_client(queue)
    
    40
    +    def register_client(self, job_name, peer, message_queue):
    
    41
    +        """Subscribes to one of the job's :class:`Operation` stage changes.
    
    42 42
     
    
    43
    -    def unregister_client(self, job_name, queue):
    
    44
    -        job = self.__jobs_by_name[job_name]
    
    45
    -        job.unregister_client(queue)
    
    43
    +        Args:
    
    44
    +            job_name (str): name of the job to subscribe to.
    
    45
    +            peer (str): a unique string identifying the client.
    
    46
    +            message_queue (queue.Queue): the event queue to register.
    
    47
    +
    
    48
    +        Raises:
    
    49
    +            NotFoundError: If no job with `job_name` exists.
    
    50
    +        """
    
    51
    +        try:
    
    52
    +            job = self.__jobs_by_name[job_name]
    
    53
    +        except KeyError:
    
    54
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    55
    +
    
    56
    +        job.register_client(peer, message_queue)
    
    57
    +
    
    58
    +    def unregister_client(self, job_name, peer):
    
    59
    +        """Unsubscribes from one of the job's :class:`Operation` stage changes.
    
    60
    +
    
    61
    +        Args:
    
    62
    +            job_name (str): name of the job to unsubscribe from.
    
    63
    +            peer (str): a unique string identifying the client.
    
    64
    +
    
    65
    +        Raises:
    
    66
    +            NotFoundError: If no job with `job_name` exists.
    
    67
    +        """
    
    68
    +        try:
    
    69
    +            job = self.__jobs_by_name[job_name]
    
    70
    +        except KeyError:
    
    71
    +            raise NotFoundError('No job named {} found.'.format(job_name))
    
    72
    +
    
    73
    +        job.unregister_client(peer)
    
    46 74
     
    
    47 75
             if job.n_clients == 0 and job.operation.done:
    
    48 76
                 del self.__jobs_by_action[job.action_digest]
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]