[Notes] [Git][BuildGrid/buildgrid][finn/async] Adding messaging service



Title: GitLab

finnball pushed to branch finn/async at BuildGrid / buildgrid

Commits:

4 changed files:

Changes:

  • buildgrid/server/execution/execution_instance.py
    ... ... @@ -34,12 +34,12 @@ class ExecutionInstance():
    34 34
             self.logger = logging.getLogger(__name__)
    
    35 35
             self._scheduler = scheduler
    
    36 36
     
    
    37
    -    def execute(self, action_digest, skip_cache_lookup):
    
    37
    +    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    38 38
             """ Sends a job for execution.
    
    39 39
             Queues an action and creates an Operation instance to be associated with
    
    40 40
             this action.
    
    41 41
             """
    
    42
    -        job = Job(action_digest)
    
    42
    +        job = Job(action_digest, message_queue)
    
    43 43
             self.logger.info("Operation name: {}".format(job.name))
    
    44 44
     
    
    45 45
             if not skip_cache_lookup:
    
    ... ... @@ -70,3 +70,9 @@ class ExecutionInstance():
    70 70
         def cancel_operation(self, name):
    
    71 71
             # TODO: Cancel leases
    
    72 72
             raise NotImplementedError("Cancelled operations not supported")
    
    73
    +
    
    74
    +    def register_message_client(self, name, queue):
    
    75
    +        try:
    
    76
    +            self._scheduler.register_client(name, queue)
    
    77
    +        except KeyError:
    
    78
    +            raise InvalidArgumentError("Operation name does not exist: {}".format(name))

  • buildgrid/server/execution/execution_service.py
    ... ... @@ -25,6 +25,7 @@ Serves remote execution requests.
    25 25
     import copy
    
    26 26
     import grpc
    
    27 27
     import logging
    
    28
    +import queue
    
    28 29
     import time
    
    29 30
     
    
    30 31
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    ... ... @@ -42,10 +43,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    42 43
             # Ignore request.instance_name for now
    
    43 44
             # Have only one instance
    
    44 45
             try:
    
    45
    -            operation = self._instance.execute(request.action_digest,
    
    46
    -                                               request.skip_cache_lookup)
    
    47
    -
    
    48
    -            yield from self._stream_operation_updates(operation.name)
    
    46
    +            message_queue = queue.Queue()
    
    47
    +            self._instance.execute(request.action_digest,
    
    48
    +                                   request.skip_cache_lookup,
    
    49
    +                                   message_queue)
    
    50
    +            yield from self._stream_operation_updates(message_queue)
    
    49 51
     
    
    50 52
             except InvalidArgumentError as e:
    
    51 53
                 self.logger.error(e)
    
    ... ... @@ -59,19 +61,18 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    59 61
     
    
    60 62
         def WaitExecution(self, request, context):
    
    61 63
             try:
    
    62
    -            yield from self._stream_operation_updates(request.name)
    
    64
    +            message_queue = queue.Queue()
    
    65
    +            self._instance.register_message_client(request.name, message_queue)
    
    66
    +            yield from self._stream_operation_updates(message_queue)
    
    63 67
     
    
    64 68
             except InvalidArgumentError as e:
    
    65 69
                 self.logger.error(e)
    
    66 70
                 context.set_details(str(e))
    
    67 71
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    68 72
     
    
    69
    -    def _stream_operation_updates(self, name):
    
    70
    -        stream_previous = None
    
    71
    -        while True:
    
    72
    -            stream = self._instance.get_operation(name)
    
    73
    -            if stream != stream_previous:
    
    74
    -                yield stream
    
    75
    -                if stream.done == True: break
    
    76
    -                stream_previous = copy.deepcopy(stream)
    
    77
    -            time.sleep(1)
    
    73
    +    def _stream_operation_updates(self, queue):
    
    74
    +        operation = queue.get()
    
    75
    +        while not operation.done:
    
    76
    +            yield operation
    
    77
    +            operation = queue.get()
    
    78
    +        yield operation

  • buildgrid/server/job.py
    ... ... @@ -15,6 +15,7 @@
    15 15
     # Authors:
    
    16 16
     #        Finn Ball <finn ball codethink co uk>
    
    17 17
     
    
    18
    +import asyncio
    
    18 19
     import logging
    
    19 20
     import uuid
    
    20 21
     
    
    ... ... @@ -51,9 +52,8 @@ class LeaseState(Enum):
    51 52
     
    
    52 53
     class Job():
    
    53 54
     
    
    54
    -    def __init__(self, action):
    
    55
    -        self.action = action
    
    56
    -        self.bot_status = BotStatus.BOT_STATUS_UNSPECIFIED
    
    55
    +    def __init__(self, action_digest, message_queue=None):
    
    56
    +        self.action_digest = action_digest
    
    57 57
             self.execute_stage = ExecuteStage.UNKNOWN
    
    58 58
             self.lease = None
    
    59 59
             self.logger = logging.getLogger(__name__)
    
    ... ... @@ -62,10 +62,16 @@ class Job():
    62 62
     
    
    63 63
             self._n_tries = 0
    
    64 64
             self._operation = operations_pb2.Operation(name = self.name)
    
    65
    +        self._operation_update_queues = []
    
    66
    +
    
    67
    +        if message_queue is not None:
    
    68
    +            self.register_client(message_queue)
    
    69
    +
    
    70
    +    def register_client(self, queue):
    
    71
    +        self._operation_update_queues.append(queue)
    
    65 72
     
    
    66 73
         def get_operation(self):
    
    67 74
             self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
    
    68
    -
    
    69 75
             if self.result is not None:
    
    70 76
                 self._operation.done = True
    
    71 77
                 response = ExecuteResponse()
    
    ... ... @@ -81,10 +87,10 @@ class Job():
    81 87
             return meta
    
    82 88
     
    
    83 89
         def create_lease(self):
    
    84
    -        action = self._pack_any(self.action)
    
    90
    +        action_digest = self._pack_any(self.action_digest)
    
    85 91
     
    
    86 92
             lease = bots_pb2.Lease(id = self.name,
    
    87
    -                               payload = action,
    
    93
    +                               payload = action_digest,
    
    88 94
                                    state = LeaseState.PENDING.value)
    
    89 95
             self.lease = lease
    
    90 96
             return lease
    
    ... ... @@ -92,6 +98,11 @@ class Job():
    92 98
         def get_operations(self):
    
    93 99
             return operations_pb2.ListOperationsResponse(operations = [self.get_operation()])
    
    94 100
     
    
    101
    +    def update_execute_stage(self, stage):
    
    102
    +        self.execute_stage = stage
    
    103
    +        for queue in self._operation_update_queues:
    
    104
    +            queue.put(self.get_operation())
    
    105
    +
    
    95 106
         def _pack_any(self, pack):
    
    96 107
             any = any_pb2.Any()
    
    97 108
             any.Pack(pack)
    

  • buildgrid/server/scheduler.py
    ... ... @@ -35,8 +35,11 @@ class Scheduler():
    35 35
             self.jobs = {}
    
    36 36
             self.queue = deque()
    
    37 37
     
    
    38
    +    def register_client(self, name, queue):
    
    39
    +        self.jobs[name].register_client(queue)
    
    40
    +
    
    38 41
         def append_job(self, job):
    
    39
    -        job.execute_stage = ExecuteStage.QUEUED
    
    42
    +        job.update_execute_stage(ExecuteStage.QUEUED)
    
    40 43
             self.jobs[job.name] = job
    
    41 44
             self.queue.append(job)
    
    42 45
     
    
    ... ... @@ -45,9 +48,9 @@ class Scheduler():
    45 48
     
    
    46 49
             if job.n_tries >= self.MAX_N_TRIES:
    
    47 50
                 # TODO: Decide what to do with these jobs
    
    48
    -            job.execute_stage = ExecuteStage.COMPLETED
    
    51
    +            job.update_execute_stage(ExecuteStage.COMPLETED)
    
    49 52
             else:
    
    50
    -            job.execute_stage = ExecuteStage.QUEUED
    
    53
    +            job.update_execute_stage(ExecuteStage.QUEUED)
    
    51 54
                 job.n_tries += 1
    
    52 55
                 self.queue.appendleft(job)
    
    53 56
     
    
    ... ... @@ -56,15 +59,14 @@ class Scheduler():
    56 59
         def create_job(self):
    
    57 60
             if len(self.queue) > 0:
    
    58 61
                 job = self.queue.popleft()
    
    59
    -            job.execute_stage = ExecuteStage.EXECUTING
    
    62
    +            job.update_execute_stage(ExecuteStage.EXECUTING)
    
    60 63
                 self.jobs[job.name] = job
    
    61 64
                 return job
    
    62
    -        return None
    
    63 65
     
    
    64 66
         def job_complete(self, name, result):
    
    65 67
             job = self.jobs[name]
    
    66
    -        job.execute_stage = ExecuteStage.COMPLETED
    
    67 68
             job.result = result
    
    69
    +        job.update_execute_stage(ExecuteStage.COMPLETED)
    
    68 70
             self.jobs[name] = job
    
    69 71
     
    
    70 72
         def get_operations(self):
    
    ... ... @@ -122,3 +124,7 @@ class Scheduler():
    122 124
             if state == LeaseState.PENDING.value or \
    
    123 125
                state == LeaseState.ACTIVE.value:
    
    124 126
                 self.retry_job(name)
    
    127
    +
    
    128
    +    def _update_execute_stage(self, job, stage):
    
    129
    +        job.update_execute_stage(stage)
    
    130
    +        return job



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]