[Notes] [Git][BuildGrid/buildgrid][finn/bot-refactor] 2 commits: Changes to lease creation. Edge detection state machine.



Title: GitLab

finnball pushed to branch finn/bot-refactor at BuildGrid / buildgrid

Commits:

5 changed files:

Changes:

  • buildgrid/server/scheduler.py
    ... ... @@ -50,24 +50,17 @@ class Scheduler():
    50 50
             self.queue.append(job)
    
    51 51
     
    
    52 52
         def retry_job(self, name):
    
    53
    -        job = self.jobs[name]
    
    54
    -
    
    55
    -        if job.n_tries >= self.MAX_N_TRIES:
    
    56
    -            # TODO: Decide what to do with these jobs
    
    57
    -            job.update_execute_stage(ExecuteStage.COMPLETED)
    
    58
    -        else:
    
    59
    -            job.update_execute_stage(ExecuteStage.QUEUED)
    
    60
    -            job.n_tries += 1
    
    61
    -            self.queue.appendleft(job)
    
    62
    -
    
    63
    -        self.jobs[name] = job
    
    53
    +        if job in self.jobs[name]:
    
    54
    +            if job.n_tries >= self.MAX_N_TRIES:
    
    55
    +                # TODO: Decide what to do with these jobs
    
    56
    +                job.update_execute_stage(ExecuteStage.COMPLETED)
    
    57
    +                # TODO: Mark these jobs as done
    
    58
    +            else:
    
    59
    +                job.update_execute_stage(ExecuteStage.QUEUED)
    
    60
    +                job.n_tries += 1
    
    61
    +                self.queue.appendleft(job)
    
    64 62
     
    
    65
    -    def create_job(self):
    
    66
    -        if len(self.queue) > 0:
    
    67
    -            job = self.queue.popleft()
    
    68
    -            job.update_execute_stage(ExecuteStage.EXECUTING)
    
    69
    -            self.jobs[job.name] = job
    
    70
    -            return job
    
    63
    +            self.jobs[name] = job
    
    71 64
     
    
    72 65
         def job_complete(self, name, result):
    
    73 66
             job = self.jobs[name]
    
    ... ... @@ -81,48 +74,13 @@ class Scheduler():
    81 74
                 response.operations.extend([v.get_operation()])
    
    82 75
             return response
    
    83 76
     
    
    84
    -    def update_lease(self, lease):
    
    85
    -        name = lease.id
    
    77
    +    def update_job_lease_state(self, name, state):
    
    86 78
             job = self.jobs.get(name)
    
    87
    -        state = lease.state
    
    88
    -
    
    89
    -        if state   == LeaseState.LEASE_STATE_UNSPECIFIED.value:
    
    90
    -            create_job = self.create_job()
    
    91
    -            if create_job is None:
    
    92
    -                # No job? Return lease.
    
    93
    -                return lease
    
    94
    -            else:
    
    95
    -                job = create_job
    
    96
    -                job.create_lease()
    
    97
    -
    
    98
    -        elif state == LeaseState.PENDING.value:
    
    99
    -            job.lease = lease
    
    100
    -
    
    101
    -        elif state == LeaseState.ACTIVE.value:
    
    102
    -            job.lease = lease
    
    103
    -
    
    104
    -        elif state == LeaseState.COMPLETED.value:
    
    105
    -            self.job_complete(job.name, lease.result)
    
    106
    -
    
    107
    -            create_job = self.create_job()
    
    108
    -            if create_job is None:
    
    109
    -                # Docs say not to use this state though if job has
    
    110
    -                # completed and no more jobs, then use this state to stop
    
    111
    -                # job being processed again
    
    112
    -                job.lease = lease
    
    113
    -                job.lease.state = LeaseState.LEASE_STATE_UNSPECIFIED.value
    
    114
    -            else:
    
    115
    -                job = create_job
    
    116
    -                job.lease = job.create_lease()
    
    117
    -
    
    118
    -        elif state == LeaseState.CANCELLED.value:
    
    119
    -            job.lease = lease
    
    120
    -
    
    121
    -        else:
    
    122
    -            raise Exception("Unknown state: {}".format(state))
    
    123
    -
    
    79
    +        job.lease.state = state
    
    124 80
             self.jobs[name] = job
    
    125
    -        return job.lease
    
    81
    +
    
    82
    +    def get_job_lease(self, name):
    
    83
    +        return self.jobs[name].lease
    
    126 84
     
    
    127 85
         def cancel_session(self, name):
    
    128 86
             job = self.jobs[name]
    
    ... ... @@ -130,3 +88,12 @@ class Scheduler():
    130 88
             if state == LeaseState.PENDING.value or \
    
    131 89
                state == LeaseState.ACTIVE.value:
    
    132 90
                 self.retry_job(name)
    
    91
    +
    
    92
    +    def create_leases(self):
    
    93
    +        while len(self.queue) > 0:
    
    94
    +            job = self.queue.popleft()
    
    95
    +            job.update_execute_stage(ExecuteStage.EXECUTING)
    
    96
    +            job.lease = job.create_lease()
    
    97
    +            job.lease.state = LeaseState.PENDING.value
    
    98
    +            self.jobs[job.name] = job
    
    99
    +            yield job.lease

  • buildgrid/server/worker/bots_interface.py
    ... ... @@ -35,6 +35,7 @@ class BotsInterface():
    35 35
             self.logger = logging.getLogger(__name__)
    
    36 36
     
    
    37 37
             self._bot_ids = {}
    
    38
    +        self._bot_sessions = {}
    
    38 39
             self._scheduler = scheduler
    
    39 40
     
    
    40 41
         def create_bot_session(self, parent, bot_session):
    
    ... ... @@ -59,6 +60,7 @@ class BotsInterface():
    59 60
             bot_session.name = name
    
    60 61
     
    
    61 62
             self._bot_ids[name] = bot_id
    
    63
    +        self._bot_sessions[name] = bot_session
    
    62 64
             self.logger.info("Created bot session name={} with bot_id={}".format(name, bot_id))
    
    63 65
             return bot_session
    
    64 66
     
    
    ... ... @@ -69,13 +71,63 @@ class BotsInterface():
    69 71
             self.logger.debug("Updating bot session name={}".format(name))
    
    70 72
             self._check_bot_ids(bot_session.bot_id, name)
    
    71 73
     
    
    72
    -        leases = [self._scheduler.update_lease(lease) for lease in bot_session.leases]
    
    74
    +        server_session = self._bot_sessions[name]
    
    75
    +
    
    76
    +        leases = filter(None, [self.check_states(lease) for lease in bot_session.leases])
    
    73 77
     
    
    74 78
             del bot_session.leases[:]
    
    75 79
             bot_session.leases.extend(leases)
    
    76 80
     
    
    81
    +        for lease in self._scheduler.create_leases():
    
    82
    +            bot_session.leases.extend([lease])
    
    83
    +
    
    84
    +        self._bot_sessions[name] = bot_session
    
    77 85
             return bot_session
    
    78 86
     
    
    87
    +    def check_states(self, client_lease):
    
    88
    +        """ Edge detector for states
    
    89
    +        """
    
    90
    +        ## TODO: Handle cancelled states
    
    91
    +        server_lease = self._scheduler.get_job_lease(client_lease.id)
    
    92
    +        server_state = LeaseState(server_lease.state)
    
    93
    +        client_state = LeaseState(client_lease.state)
    
    94
    +
    
    95
    +        if server_state == LeaseState.PENDING:
    
    96
    +
    
    97
    +            if client_state == LeaseState.ACTIVE:
    
    98
    +                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
    
    99
    +            elif client_state == LeaseState.COMPLETED:
    
    100
    +                # TODO: Lease was rejected
    
    101
    +                raise NotImplementedError("'Not Accepted' is unsupported")
    
    102
    +            else:
    
    103
    +                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    104
    +
    
    105
    +        elif server_state == LeaseState.ACTIVE:
    
    106
    +
    
    107
    +            if client_state == LeaseState.ACTIVE:
    
    108
    +                pass
    
    109
    +
    
    110
    +            elif client_state == LeaseState.COMPLETED:
    
    111
    +                self._scheduler.job_complete(client_lease.id, client_lease.result)
    
    112
    +                self._scheduler.update_job_lease_state(client_lease.id, client_lease.state)
    
    113
    +                return None
    
    114
    +
    
    115
    +            else:
    
    116
    +                raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    117
    +
    
    118
    +        elif server_state == LeaseState.COMPLETED:
    
    119
    +            raise OutofSyncError("Server lease: {}. Client lease: {}".format(server_lease, client_lease))
    
    120
    +
    
    121
    +        elif server_state == LeaseState.CANCELLED:
    
    122
    +            raise NotImplementedError("Cancelled states not supported yet")
    
    123
    +
    
    124
    +        else:
    
    125
    +            # Sould never get here
    
    126
    +            raise OutofSyncError("State now allowed: {}".format(server_state))
    
    127
    +
    
    128
    +        return client_lease
    
    129
    +
    
    130
    +
    
    79 131
         def _check_bot_ids(self, bot_id, name = None):
    
    80 132
             """ Checks the ID and the name of the bot.
    
    81 133
             """
    
    ... ... @@ -103,7 +155,11 @@ class BotsInterface():
    103 155
                 raise InvalidArgumentError("Bot id does not exist: {}".format(name))
    
    104 156
     
    
    105 157
             self.logger.debug("Attempting to close {} with name: {}".format(bot_id, name))
    
    106
    -        self._scheduler.retry_job(name)
    
    158
    +        for lease in self._bot_sessions[name].leases:
    
    159
    +            if lease.state != LeaseState.COMPLETED.value:
    
    160
    +                # TODO: Be wary here, may need to handle rejected leases in future
    
    161
    +                self._scheduler.retry_job(lease.id)
    
    162
    +
    
    107 163
             self.logger.debug("Closing bot session: {}".format(name))
    
    108 164
             self._bot_ids.pop(name)
    
    109 165
             self.logger.info("Closed bot {} with name: {}".format(bot_id, name))

  • buildgrid/server/worker/bots_service.py
    ... ... @@ -43,7 +43,6 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    43 43
                 self.logger.error(e)
    
    44 44
                 context.set_details(str(e))
    
    45 45
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    46
    -            return bots_pb2.BotSession()
    
    47 46
     
    
    48 47
         def UpdateBotSession(self, request, context):
    
    49 48
             try:
    
    ... ... @@ -53,13 +52,16 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    53 52
                 self.logger.error(e)
    
    54 53
                 context.set_details(str(e))
    
    55 54
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    56
    -            return bots_pb2.BotSession()
    
    57 55
     
    
    58 56
             except OutofSyncError as e:
    
    59 57
                 self.logger.error(e)
    
    60 58
                 context.set_details(str(e))
    
    61 59
                 context.set_code(grpc.StatusCode.DATA_LOSS)
    
    62
    -            return bots_pb2.BotSession()
    
    60
    +
    
    61
    +        except NotImplementedError as e:
    
    62
    +            self.logger.error(e)
    
    63
    +            context.set_details(str(e))
    
    64
    +            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    63 65
     
    
    64 66
         def PostBotEventTemp(self, request, context):
    
    65 67
             context.set_code(grpc.StatusCode.UNIMPLEMENTED)

  • tests/integration/bot_session.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +import grpc
    
    16
    +import pytest
    
    17
    +import uuid
    
    18
    +
    
    19
    +from unittest import mock
    
    20
    +
    
    21
    +from grpc._server import _Context
    
    22
    +from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, worker_pb2
    
    23
    +from google.protobuf import any_pb2
    
    24
    +
    
    25
    +from buildgrid.bot import bot_session, bot_interface
    
    26
    +
    
    27
    +# GRPC context
    
    28
    +@pytest.fixture
    
    29
    +def channel():
    
    30
    +    yield mock.MagicMock(spec = grpc.insecure_channel)
    
    31
    +
    
    32
    +@pytest.fixture
    
    33
    +@mock.patch.object(bot_interface.bots_pb2_grpc, 'BotsStub', autospec = True)
    
    34
    +def interface(*args):
    
    35
    +    yield bot_interface.BotInterface(None)
    
    36
    +
    
    37
    +# Instance to test
    
    38
    +@pytest.fixture
    
    39
    +def instance(interface):
    
    40
    +    yield bot_session.BotSession('deckard', interface)
    
    41
    +
    
    42
    +@pytest.mark.parametrize("docker_value", ["True", "False"])
    
    43
    +@pytest.mark.parametrize("os_value", ["nexus7", "nexus8"])
    
    44
    +def test_create_device(docker_value, os_value):
    
    45
    +    properties = {'docker' : docker_value, 'os' : os_value}
    
    46
    +    device = bot_session.Device(properties)
    
    47
    +
    
    48
    +    assert uuid.UUID(device.name, version=4)
    
    49
    +    assert properties == device.properties
    
    50
    +
    
    51
    +def test_create_device_key_fail():
    
    52
    +    properties = {'voight' : 'kampff'}
    
    53
    +
    
    54
    +    with pytest.raises(KeyError):
    
    55
    +        device = bot_session.Device(properties)
    
    56
    +
    
    57
    +def test_create_device_value_fail():
    
    58
    +    properties = {'docker' :  True}
    
    59
    +
    
    60
    +    with pytest.raises(ValueError):
    
    61
    +        device = bot_session.Device(properties)
    
    62
    +
    
    63
    +def test_create_worker():
    
    64
    +    properties = {'pool' : 'swim'}
    
    65
    +    configs = {'DockerImage' : 'Windows'}
    
    66
    +    worker = bot_session.Worker(properties, configs)
    
    67
    +
    
    68
    +    assert properties == worker.properties
    
    69
    +    assert configs == worker.configs
    
    70
    +
    
    71
    +    device = bot_session.Device()
    
    72
    +    worker.add_device(device)
    
    73
    +
    
    74
    +    assert worker._devices[0] == device
    
    75
    +
    
    76
    +def test_create_worker_key_fail():
    
    77
    +    properties = {'voight' : 'kampff'}
    
    78
    +    configs = {'voight' : 'kampff'}
    
    79
    +
    
    80
    +    with pytest.raises(KeyError):
    
    81
    +        bot_session.Worker(properties)
    
    82
    +    with pytest.raises(KeyError):
    
    83
    +        bot_session.Worker(configs)
    
    84
    +
    
    85
    +def test_add_worker(instance):
    
    86
    +    worker = bot_session.Worker()
    
    87
    +    instance.add_worker(worker)
    
    88
    +
    
    89
    +    assert instance._worker == worker

  • tests/integration/bots_service.py
    ... ... @@ -36,6 +36,12 @@ from buildgrid.server.worker import bots_interface, bots_service
    36 36
     def context():
    
    37 37
         yield mock.MagicMock(spec = _Context)
    
    38 38
     
    
    39
    +@pytest.fixture
    
    40
    +def action_job():
    
    41
    +    action_digest = remote_execution_pb2.Digest()
    
    42
    +    j = job.Job(action_digest, None)
    
    43
    +    yield j
    
    44
    +
    
    39 45
     @pytest.fixture
    
    40 46
     def bot_session():
    
    41 47
         bot = bots_pb2.BotSession()
    
    ... ... @@ -101,7 +107,6 @@ def test_update_bot_session_zombie(bot_session, context, instance):
    101 107
     
    
    102 108
         response = instance.UpdateBotSession(request, context)
    
    103 109
     
    
    104
    -    assert isinstance(response, bots_pb2.BotSession)
    
    105 110
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    106 111
     
    
    107 112
     def test_update_bot_session_bot_id_fail(bot_session, context, instance):
    
    ... ... @@ -113,35 +118,33 @@ def test_update_bot_session_bot_id_fail(bot_session, context, instance):
    113 118
     
    
    114 119
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    115 120
     
    
    116
    -@pytest.mark.parametrize("number_of_leases", [1, 3, 500])
    
    117
    -def test_update_leases(number_of_leases, bot_session, context, instance):
    
    118
    -    leases = [bots_pb2.Lease() for x in range(number_of_leases)]
    
    119
    -    bot_session.leases.extend(leases)
    
    121
    +@pytest.mark.parametrize("number_of_jobs", [0, 1, 3, 500])
    
    122
    +def test_number_of_leases(number_of_jobs, bot_session, context, instance):
    
    120 123
         request = bots_pb2.CreateBotSessionRequest(parent='',
    
    121 124
                                                    bot_session=bot_session)
    
    125
    +    # Inject work
    
    126
    +    for n in range(0, number_of_jobs):
    
    127
    +        action_digest = remote_execution_pb2.Digest()
    
    128
    +        instance._instance._scheduler.append_job(job.Job(action_digest))
    
    122 129
         # Simulated the severed binding between client and server
    
    123 130
         bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    124 131
     
    
    132
    +    # Creation of bot session should not create leases
    
    133
    +    assert len(bot.leases) == 0
    
    134
    +
    
    125 135
         request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    126 136
                                                    bot_session=bot)
    
    127 137
     
    
    128 138
         response = instance.UpdateBotSession(request, context)
    
    129 139
     
    
    130
    -    assert isinstance(response, bots_pb2.BotSession)
    
    131
    -    assert len(response.leases) == len(bot.leases)
    
    132
    -    assert bot == response
    
    140
    +    assert len(response.leases) == number_of_jobs
    
    133 141
     
    
    134 142
     def test_update_leases_with_work(bot_session, context, instance):
    
    135
    -    leases = [bots_pb2.Lease() for x in range(2)]
    
    136
    -    bot_session.leases.extend(leases)
    
    137
    -
    
    138
    -    # Inject some work to be done
    
    139
    -    action = remote_execution_pb2.Action()
    
    140
    -    action.command_digest.hash = 'rick'
    
    141
    -    instance._instance._scheduler.append_job(job.Job(action))
    
    142
    -
    
    143 143
         request = bots_pb2.CreateBotSessionRequest(parent='',
    
    144 144
                                                    bot_session=bot_session)
    
    145
    +    # Inject work
    
    146
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    147
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    145 148
         # Simulated the severed binding between client and server
    
    146 149
         bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    147 150
     
    
    ... ... @@ -149,26 +152,50 @@ def test_update_leases_with_work(bot_session, context, instance):
    149 152
                                                    bot_session=bot)
    
    150 153
     
    
    151 154
         response = instance.UpdateBotSession(request, context)
    
    152
    -    response_action = remote_execution_pb2.Action()
    
    153
    -    _unpack_any(response.leases[0].payload, response_action)
    
    154 155
     
    
    156
    +    response_action = remote_execution_pb2.Digest()
    
    157
    +    response.leases[0].payload.Unpack(response_action)
    
    155 158
         assert isinstance(response, bots_pb2.BotSession)
    
    156 159
         assert response.leases[0].state == LeaseState.PENDING.value
    
    157
    -    assert response.leases[1].state == LeaseState.LEASE_STATE_UNSPECIFIED.value
    
    158 160
         assert uuid.UUID(response.leases[0].id, version=4)
    
    159
    -    assert response_action == action
    
    161
    +    assert response_action == action_digest
    
    160 162
     
    
    161 163
     def test_update_leases_work_complete(bot_session, context, instance):
    
    162
    -    leases = [bots_pb2.Lease() for x in range(2)]
    
    163
    -    bot_session.leases.extend(leases)
    
    164
    +    request = bots_pb2.CreateBotSessionRequest(parent='',
    
    165
    +                                               bot_session=bot_session)
    
    166
    +    # Inject work
    
    167
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    168
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    169
    +    # Simulated the severed binding between client and server
    
    170
    +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    171
    +
    
    172
    +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    173
    +                                               bot_session=bot)
    
    174
    +
    
    175
    +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    176
    +
    
    177
    +    response.leases[0].state = LeaseState.ACTIVE.value
    
    178
    +
    
    179
    +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    180
    +                                               bot_session=response)
    
    181
    +
    
    182
    +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    183
    +
    
    184
    +    response.leases[0].state = LeaseState.COMPLETED.value
    
    185
    +
    
    186
    +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    187
    +                                               bot_session=response)
    
    164 188
     
    
    165
    -    # Inject some work to be done
    
    166
    -    action = remote_execution_pb2.Action()
    
    167
    -    action.command_digest.hash = 'rick'
    
    168
    -    instance._instance._scheduler.append_job(job.Job(action))
    
    189
    +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    190
    +
    
    191
    +    assert len(response.leases) == 0
    
    169 192
     
    
    193
    +def test_work_rejected_by_bot(bot_session, context, instance):
    
    170 194
         request = bots_pb2.CreateBotSessionRequest(parent='',
    
    171 195
                                                    bot_session=bot_session)
    
    196
    +    # Inject work
    
    197
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    198
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    172 199
         # Simulated the severed binding between client and server
    
    173 200
         bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    174 201
     
    
    ... ... @@ -177,26 +204,125 @@ def test_update_leases_work_complete(bot_session, context, instance):
    177 204
     
    
    178 205
         response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    179 206
     
    
    180
    -    operation_name = response.leases[0].id
    
    207
    +    response.leases[0].state = LeaseState.COMPLETED.value
    
    208
    +
    
    209
    +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    210
    +                                               bot_session=response)
    
    211
    +
    
    212
    +    response = instance.UpdateBotSession(request, context)
    
    213
    +
    
    214
    +    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    215
    +
    
    216
    +def test_work_rejected_by_bot(bot_session, context, instance):
    
    217
    +    request = bots_pb2.CreateBotSessionRequest(parent='',
    
    218
    +                                               bot_session=bot_session)
    
    219
    +    # Inject work
    
    220
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    221
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    222
    +    # Simulated the severed binding between client and server
    
    223
    +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    224
    +
    
    225
    +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    226
    +                                               bot_session=bot)
    
    227
    +
    
    228
    +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    181 229
     
    
    182
    -    assert response.leases[0].state == LeaseState.PENDING.value
    
    183 230
         response.leases[0].state = LeaseState.COMPLETED.value
    
    184 231
     
    
    185 232
         request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    186 233
                                                    bot_session=response)
    
    234
    +
    
    235
    +    response = instance.UpdateBotSession(request, context)
    
    236
    +
    
    237
    +    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    238
    +
    
    239
    +@pytest.mark.parametrize("state", [ LeaseState.LEASE_STATE_UNSPECIFIED, LeaseState.PENDING])
    
    240
    +def test_work_out_of_sync_from_pending(state, bot_session, context, instance):
    
    241
    +    request = bots_pb2.CreateBotSessionRequest(parent='',
    
    242
    +                                               bot_session=bot_session)
    
    243
    +    # Inject work
    
    244
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    245
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    246
    +    # Simulated the severed binding between client and server
    
    247
    +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    248
    +
    
    249
    +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    250
    +                                               bot_session=bot)
    
    251
    +
    
    252
    +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    253
    +
    
    254
    +    response.leases[0].state = state.value
    
    255
    +
    
    256
    +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    257
    +                                               bot_session=response)
    
    258
    +
    
    259
    +    response = instance.UpdateBotSession(request, context)
    
    260
    +
    
    261
    +    context.set_code.assert_called_once_with(grpc.StatusCode.DATA_LOSS)
    
    262
    +
    
    263
    +@pytest.mark.parametrize("state", [ LeaseState.LEASE_STATE_UNSPECIFIED, LeaseState.PENDING])
    
    264
    +def test_work_out_of_sync_from_active(state, bot_session, context, instance):
    
    265
    +    request = bots_pb2.CreateBotSessionRequest(parent='',
    
    266
    +                                               bot_session=bot_session)
    
    267
    +    # Inject work
    
    268
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    269
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    187 270
         # Simulated the severed binding between client and server
    
    271
    +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    272
    +
    
    273
    +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    274
    +                                               bot_session=bot)
    
    275
    +
    
    188 276
         response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    189
    -    assert isinstance(response, bots_pb2.BotSession)
    
    190
    -    assert instance._instance._scheduler.jobs[operation_name]._execute_stage == ExecuteStage.COMPLETED
    
    277
    +
    
    278
    +    response.leases[0].state = LeaseState.ACTIVE.value
    
    279
    +
    
    280
    +    request = copy.deepcopy(bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    281
    +                                                             bot_session=response))
    
    282
    +
    
    283
    +    response = instance.UpdateBotSession(request, context)
    
    284
    +
    
    285
    +    response.leases[0].state = state.value
    
    286
    +
    
    287
    +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    288
    +                                               bot_session=response)
    
    289
    +
    
    290
    +    response = instance.UpdateBotSession(request, context)
    
    291
    +
    
    292
    +    context.set_code.assert_called_once_with(grpc.StatusCode.DATA_LOSS)
    
    293
    +
    
    294
    +def test_work_active_to_active(bot_session, context, instance):
    
    295
    +    request = bots_pb2.CreateBotSessionRequest(parent='',
    
    296
    +                                               bot_session=bot_session)
    
    297
    +    # Inject work
    
    298
    +    action_digest = remote_execution_pb2.Digest(hash = 'gaff')
    
    299
    +    instance._instance._scheduler.append_job(job.Job(action_digest))
    
    300
    +    # Simulated the severed binding between client and server
    
    301
    +    bot = copy.deepcopy(instance.CreateBotSession(request, context))
    
    302
    +
    
    303
    +    request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
    
    304
    +                                               bot_session=bot)
    
    305
    +
    
    306
    +    response = copy.deepcopy(instance.UpdateBotSession(request, context))
    
    307
    +
    
    308
    +    response.leases[0].state = LeaseState.ACTIVE.value
    
    309
    +
    
    310
    +    request = copy.deepcopy(bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    311
    +                                                             bot_session=response))
    
    312
    +
    
    313
    +    response = instance.UpdateBotSession(request, context)
    
    314
    +
    
    315
    +    response.leases[0].state = LeaseState.ACTIVE.value
    
    316
    +
    
    317
    +    request = bots_pb2.UpdateBotSessionRequest(name=response.name,
    
    318
    +                                               bot_session=response)
    
    319
    +
    
    320
    +    response = instance.UpdateBotSession(request, context)
    
    321
    +
    
    322
    +    assert response.leases[0].state == LeaseState.ACTIVE.value
    
    191 323
     
    
    192 324
     def test_post_bot_event_temp(context, instance):
    
    193 325
         request = bots_pb2.PostBotEventTempRequest()
    
    194 326
         instance.PostBotEventTemp(request, context)
    
    195 327
     
    
    196 328
         context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    197
    -
    
    198
    -def _unpack_any(unpack_from, to):
    
    199
    -    any = any_pb2.Any()
    
    200
    -    any.CopyFrom(unpack_from)
    
    201
    -    any.Unpack(to)
    
    202
    -    return to



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]