[Notes] [Git][BuildGrid/buildgrid][finn/async] Adding messaging service



Title: GitLab

finnball pushed to branch finn/async at BuildGrid / buildgrid

Commits:

4 changed files:

Changes:

  • buildgrid/server/execution/execution_instance.py
    ... ... @@ -34,12 +34,12 @@ class ExecutionInstance():
    34 34
             self.logger = logging.getLogger(__name__)
    
    35 35
             self._scheduler = scheduler
    
    36 36
     
    
    37
    -    def execute(self, action_digest, skip_cache_lookup):
    
    37
    +    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    38 38
             """ Sends a job for execution.
    
    39 39
             Queues an action and creates an Operation instance to be associated with
    
    40 40
             this action.
    
    41 41
             """
    
    42
    -        job = Job(action_digest)
    
    42
    +        job = Job(action_digest, message_queue)
    
    43 43
             self.logger.info("Operation name: {}".format(job.name))
    
    44 44
     
    
    45 45
             if not skip_cache_lookup:
    
    ... ... @@ -70,3 +70,15 @@ class ExecutionInstance():
    70 70
         def cancel_operation(self, name):
    
    71 71
             # TODO: Cancel leases
    
    72 72
             raise NotImplementedError("Cancelled operations not supported")
    
    73
    +
    
    74
    +    def register_message_client(self, name, queue):
    
    75
    +        try:
    
    76
    +            self._scheduler.register_client(name, queue)
    
    77
    +        except KeyError:
    
    78
    +            raise InvalidArgumentError("Operation name does not exist: {}".format(name))
    
    79
    +
    
    80
    +    def unregister_message_client(self, name, queue):
    
    81
    +        try:
    
    82
    +            self._scheduler.unregister_client(name, queue)
    
    83
    +        except KeyError:
    
    84
    +            raise InvalidArgumentError("Operation name does not exist: {}".format(name))

  • buildgrid/server/execution/execution_service.py
    ... ... @@ -25,6 +25,7 @@ Serves remote execution requests.
    25 25
     import copy
    
    26 26
     import grpc
    
    27 27
     import logging
    
    28
    +import queue
    
    28 29
     import time
    
    29 30
     
    
    30 31
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    ... ... @@ -35,21 +36,27 @@ from ._exceptions import InvalidArgumentError
    35 36
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    36 37
     
    
    37 38
         def __init__(self, instance):
    
    38
    -        self._instance = instance
    
    39 39
             self.logger = logging.getLogger(__name__)
    
    40
    +        self._instance = instance
    
    40 41
     
    
    41 42
         def Execute(self, request, context):
    
    42 43
             # Ignore request.instance_name for now
    
    43 44
             # Have only one instance
    
    44 45
             try:
    
    46
    +            message_queue = queue.Queue()
    
    45 47
                 operation = self._instance.execute(request.action_digest,
    
    46
    -                                               request.skip_cache_lookup)
    
    48
    +                                               request.skip_cache_lookup,
    
    49
    +                                               message_queue)
    
    47 50
     
    
    48
    -            yield from self._stream_operation_updates(operation.name)
    
    51
    +            remove_client = lambda : self._remove_client(operation.name, message_queue)
    
    52
    +            context.add_callback(remove_client)
    
    53
    +
    
    54
    +            yield from self._stream_operation_updates(message_queue,
    
    55
    +                                                      operation.name)
    
    49 56
     
    
    50 57
             except InvalidArgumentError as e:
    
    51 58
                 self.logger.error(e)
    
    52
    -            context.set_details(str(e))
    
    59
    +            context.set_details(str(e))
    
    53 60
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    54 61
     
    
    55 62
             except NotImplementedError as e:
    
    ... ... @@ -59,19 +66,28 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    59 66
     
    
    60 67
         def WaitExecution(self, request, context):
    
    61 68
             try:
    
    62
    -            yield from self._stream_operation_updates(request.name)
    
    69
    +            message_queue = queue.Queue()
    
    70
    +            operation_name = request.name
    
    71
    +
    
    72
    +            self._instance.register_message_client(operation_name, message_queue)
    
    73
    +
    
    74
    +            remove_client = lambda : self._remove_client(operation_name, message_queue)
    
    75
    +            context.add_callback(remove_client)
    
    76
    +
    
    77
    +            yield from self._stream_operation_updates(message_queue,
    
    78
    +                                                      operation_name)
    
    63 79
     
    
    64 80
             except InvalidArgumentError as e:
    
    65 81
                 self.logger.error(e)
    
    66 82
                 context.set_details(str(e))
    
    67 83
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    68 84
     
    
    69
    -    def _stream_operation_updates(self, name):
    
    70
    -        stream_previous = None
    
    71
    -        while True:
    
    72
    -            stream = self._instance.get_operation(name)
    
    73
    -            if stream != stream_previous:
    
    74
    -                yield stream
    
    75
    -                if stream.done == True: break
    
    76
    -                stream_previous = copy.deepcopy(stream)
    
    77
    -            time.sleep(1)
    85
    +    def _remove_client(self, operation_name, message_queue):
    
    86
    +        self._instance.unregister_message_client(operation_name, message_queue)
    
    87
    +
    
    88
    +    def _stream_operation_updates(self, message_queue, operation_name):
    
    89
    +        operation = message_queue.get()
    
    90
    +        while not operation.done:
    
    91
    +            yield operation
    
    92
    +            operation = message_queue.get()
    
    93
    +        yield operation

  • buildgrid/server/job.py
    ... ... @@ -51,9 +51,8 @@ class LeaseState(Enum):
    51 51
     
    
    52 52
     class Job():
    
    53 53
     
    
    54
    -    def __init__(self, action):
    
    55
    -        self.action = action
    
    56
    -        self.bot_status = BotStatus.BOT_STATUS_UNSPECIFIED
    
    54
    +    def __init__(self, action_digest, message_queue=None):
    
    55
    +        self.action_digest = action_digest
    
    57 56
             self.execute_stage = ExecuteStage.UNKNOWN
    
    58 57
             self.lease = None
    
    59 58
             self.logger = logging.getLogger(__name__)
    
    ... ... @@ -62,10 +61,19 @@ class Job():
    62 61
     
    
    63 62
             self._n_tries = 0
    
    64 63
             self._operation = operations_pb2.Operation(name = self.name)
    
    64
    +        self._operation_update_queues = []
    
    65
    +
    
    66
    +        if message_queue is not None:
    
    67
    +            self.register_client(message_queue)
    
    68
    +
    
    69
    +    def register_client(self, queue):
    
    70
    +        self._operation_update_queues.append(queue)
    
    71
    +
    
    72
    +    def unregister_client(self, queue):
    
    73
    +        self._operation_update_queues.remove(queue)
    
    65 74
     
    
    66 75
         def get_operation(self):
    
    67 76
             self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
    
    68
    -
    
    69 77
             if self.result is not None:
    
    70 78
                 self._operation.done = True
    
    71 79
                 response = ExecuteResponse()
    
    ... ... @@ -81,10 +89,10 @@ class Job():
    81 89
             return meta
    
    82 90
     
    
    83 91
         def create_lease(self):
    
    84
    -        action = self._pack_any(self.action)
    
    92
    +        action_digest = self._pack_any(self.action_digest)
    
    85 93
     
    
    86 94
             lease = bots_pb2.Lease(id = self.name,
    
    87
    -                               payload = action,
    
    95
    +                               payload = action_digest,
    
    88 96
                                    state = LeaseState.PENDING.value)
    
    89 97
             self.lease = lease
    
    90 98
             return lease
    
    ... ... @@ -92,6 +100,11 @@ class Job():
    92 100
         def get_operations(self):
    
    93 101
             return operations_pb2.ListOperationsResponse(operations = [self.get_operation()])
    
    94 102
     
    
    103
    +    def update_execute_stage(self, stage):
    
    104
    +        self.execute_stage = stage
    
    105
    +        for queue in self._operation_update_queues:
    
    106
    +            queue.put(self.get_operation())
    
    107
    +
    
    95 108
         def _pack_any(self, pack):
    
    96 109
             any = any_pb2.Any()
    
    97 110
             any.Pack(pack)
    

  • buildgrid/server/scheduler.py
    ... ... @@ -35,8 +35,14 @@ class Scheduler():
    35 35
             self.jobs = {}
    
    36 36
             self.queue = deque()
    
    37 37
     
    
    38
    +    def register_client(self, name, queue):
    
    39
    +        self.jobs[name].register_client(queue)
    
    40
    +
    
    41
    +    def unregister_client(self, name, queue):
    
    42
    +        self.jobs[name].unregister_client(queue)
    
    43
    +
    
    38 44
         def append_job(self, job):
    
    39
    -        job.execute_stage = ExecuteStage.QUEUED
    
    45
    +        job.update_execute_stage(ExecuteStage.QUEUED)
    
    40 46
             self.jobs[job.name] = job
    
    41 47
             self.queue.append(job)
    
    42 48
     
    
    ... ... @@ -45,9 +51,9 @@ class Scheduler():
    45 51
     
    
    46 52
             if job.n_tries >= self.MAX_N_TRIES:
    
    47 53
                 # TODO: Decide what to do with these jobs
    
    48
    -            job.execute_stage = ExecuteStage.COMPLETED
    
    54
    +            job.update_execute_stage(ExecuteStage.COMPLETED)
    
    49 55
             else:
    
    50
    -            job.execute_stage = ExecuteStage.QUEUED
    
    56
    +            job.update_execute_stage(ExecuteStage.QUEUED)
    
    51 57
                 job.n_tries += 1
    
    52 58
                 self.queue.appendleft(job)
    
    53 59
     
    
    ... ... @@ -56,15 +62,14 @@ class Scheduler():
    56 62
         def create_job(self):
    
    57 63
             if len(self.queue) > 0:
    
    58 64
                 job = self.queue.popleft()
    
    59
    -            job.execute_stage = ExecuteStage.EXECUTING
    
    65
    +            job.update_execute_stage(ExecuteStage.EXECUTING)
    
    60 66
                 self.jobs[job.name] = job
    
    61 67
                 return job
    
    62
    -        return None
    
    63 68
     
    
    64 69
         def job_complete(self, name, result):
    
    65 70
             job = self.jobs[name]
    
    66
    -        job.execute_stage = ExecuteStage.COMPLETED
    
    67 71
             job.result = result
    
    72
    +        job.update_execute_stage(ExecuteStage.COMPLETED)
    
    68 73
             self.jobs[name] = job
    
    69 74
     
    
    70 75
         def get_operations(self):
    
    ... ... @@ -122,3 +127,7 @@ class Scheduler():
    122 127
             if state == LeaseState.PENDING.value or \
    
    123 128
                state == LeaseState.ACTIVE.value:
    
    124 129
                 self.retry_job(name)
    
    130
    +
    
    131
    +    def _update_execute_stage(self, job, stage):
    
    132
    +        job.update_execute_stage(stage)
    
    133
    +        return job



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]