5. Example

We use four AI4DB algorithms as examples, covering the Knob Tuning, Index Recommendation, Cardinality Estimation, and Query Optimizer tasks, to demonstrate how to use PilotScope.

You can find the complete code of these examples in the test_example_algorithms directory. Before running them, load the test dataset (e.g., the stats_tiny or imdb_tiny database) into PostgreSQL by referring to the Dataset section.

5.1. Knob Tuning Task Example

We provide a Bayesian optimization method called SMAC as our example.

In the following example, we create a subclass KnobPeriodicModelUpdateEvent by inheriting from PeriodicModelUpdateEvent and implementing the custom_model_update method, which allows us to update the model periodically.

The llamatune library provides multiple knob tuning algorithms. When KnobPeriodicModelUpdateEvent is triggered, we rerun the SMAC algorithm in llamatune to obtain new knob values, which are saved in the exp_state variable.

Next, we use the db_controller.write_knob_to_file function to write the new knob values to the database configuration file. Finally, we restart the database to apply them.

class KnobPeriodicModelUpdateEvent(PeriodicModelUpdateEvent):
    def __init__(self, config, per_query_count, llamatune_config_file, execute_on_init=True,
                 optimizer_type="smac"):
        super().__init__(config, per_query_count, execute_on_init=execute_on_init)
        self.optimizer_type = optimizer_type
        self.llamatune_config_file = llamatune_config_file

    def custom_model_update(self, pilot_model: PilotModel, db_controller: BaseDBController,
                            data_manager: DataManager):
        # restore the original configuration and restart before searching
        db_controller.recover_config()
        db_controller.restart()

        conf = {
            "conf_filepath": self.llamatune_config_file,
            "seed": int(time.time()),
            "optimizer": self.optimizer_type
        }

        # run llamatune (SMAC) and apply the best configuration it finds
        exp_state = llamatune(conf)
        db_controller.write_knob_to_file(dict(exp_state.best_conf))
        db_controller.restart()

To update the SMAC algorithm in llamatune, we need the execution times of some queries. The following code, used inside the SMAC algorithm, shows how to utilize PilotDataInteractor to facilitate data collection. First, we use the data_interactor.push_knob function to push the current knob values to the database. Then, we use the data_interactor.pull_execution_time function to register execution time collection. Finally, we use the data_interactor.execute function to execute each query and obtain its execution time. By evaluating the average execution time under different knob configurations, we can collect enough data to update the SMAC algorithm.

def evaluate_configuration(self, dbms_info, benchmark_info):
    training_sqls = load_sql()
    current_knob_config = smac_algorithm.get_current_knob_config()

    data_interactor = PilotDataInteractor(self.config)
    data_interactor.push_knob(current_knob_config)
    data_interactor.pull_execution_time()
    execution_times = []

    for i, sql in enumerate(training_sqls):
        # reset the pushed knobs after executing the last query
        data = data_interactor.execute(sql, is_reset=(i == len(training_sqls) - 1))
        execution_times.append(data.execution_time)

    # Return the metric for updating smac_algorithm. We omit the detailed
    # update code because it depends on the specific algorithm.
    return execution_times

In the following code, we provide a complete deployment example of knob tuning using PilotScope. Here’s a breakdown of the process:

  1. First, an instance of the PilotScheduler class is created using the SchedulerFactory.create_scheduler method with the necessary configuration config.

  2. Then, the KnobPeriodicModelUpdateEvent event is registered with the scheduler to periodically update the knob values. The number 200 indicates that the knobs will be updated every 200 queries. The execute_on_init parameter is set to True to ensure that the knob values are also updated once when executing scheduler.init(). The llamatune_config_file and optimizer_type are used by the SMAC algorithm.

  3. The scheduler is configured to collect the execution time of each query by calling the register_required_data method. These data will be stored in the llamatune_data table.

  4. The scheduler.init() method is called to initialize PilotScope scheduler.

  5. Finally, the SMAC algorithm is evaluated by executing the SQL queries in the test set.

class KnobTest(unittest.TestCase):
    def setUp(self):
        self.config: PostgreSQLConfig = PostgreSQLConfig()
        self.config.enable_deep_control_local(example_pg_bin, example_pgdata)
        self.config.db = "stats_tiny"
        self.algo = "smac"

    def test_knob(self):
        try:
            scheduler: PilotScheduler = SchedulerFactory.create_scheduler(self.config)
            scheduler.db_controller.backup_config()

            # update the knobs periodically, and once on init
            periodic_model_update_event = KnobPeriodicModelUpdateEvent(self.config, 200,
                                                                       execute_on_init=True,
                                                                       llamatune_config_file="../algorithm_examples/KnobTuning/llamatune/configs/llama_config.ini",
                                                                       optimizer_type="smac")
            scheduler.register_events([periodic_model_update_event])
            scheduler.register_required_data("llamatune_data", pull_execution_time=True)
            scheduler.init()

            # evaluating test set
            sqls = load_test_sql(self.config.db)
            for i, sql in enumerate(sqls):
                scheduler.execute(sql)
        finally:
            pilotscope_exit()

We thank the authors of LlamaTune for their implementation and for permitting, in its documentation, the code to be incorporated into the tools of interested users.

5.2. Index Recommendation Task Example

The index recommendation task is similar to knob tuning in that an algorithm interacts with a database to complete the task before actual execution. The difference is that this task involves selecting indexes. To improve efficiency during index recommendation, we do not actually build indexes but use hypothetical indexes instead. To use hypothetical indexes, the connected database must have the HypoPG extension installed. Fortunately, PilotScope has already installed the HypoPG extension for you and provides an easy way to use it, as sketched below.
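
Below is a minimal sketch of probing a hypothetical index through PilotScope; the import paths and the users/UpVotes identifiers are assumptions for illustration, while the PilotDataInteractor and PilotIndex calls mirror the usage shown later in this section.

# A minimal sketch of probing a hypothetical (HypoPG) index through PilotScope.
# The import paths and the users/UpVotes identifiers are assumptions for
# illustration; adjust them to your installation and schema.
from pilotscope.PilotConfig import PostgreSQLConfig  # assumed path
from pilotscope.DBInteractor.PilotDataInteractor import PilotDataInteractor  # assumed path
from pilotscope.DBController.BaseDBController import PilotIndex  # assumed path

config = PostgreSQLConfig()
config.db = "stats_tiny"

# enable_simulate_index=True makes push_index create hypothetical indexes via
# HypoPG instead of physically building them.
data_interactor = PilotDataInteractor(config, enable_simulate_index=True)
data_interactor.push_index([PilotIndex(["UpVotes"], "users", "hypo_users_upvotes")])
data_interactor.pull_estimated_cost()
data = data_interactor.execute("select count(*) from users where UpVotes > 100")
print(data.estimated_cost)  # estimated cost with the hypothetical index in place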

In this example, we use the Extend algorithm, but you can replace ExtendAlgorithm with other algorithms, such as AnytimeAlgorithm, AutoAdminAlgorithm, DB2AdvisAlgorithm, and RelaxationAlgorithm, adjusting the parameters accordingly.

Similarly, we make IndexPeriodicModelUpdateEvent inherit from PeriodicModelUpdateEvent and implement custom_model_update to select indexes with the algorithm. When the event is triggered, we call the algo.calculate_best_indexes(workload) function to obtain the best indexes and use the db_controller.create_index function to create them.

class IndexPeriodicModelUpdateEvent(PeriodicModelUpdateEvent):
    def _load_sql(self):
        sqls: list = load_test_sql(self.config.db)
        random.shuffle(sqls)
        if "imdb" in self.config.db:
            sqls = sqls[0:len(sqls) // 2]
        return sqls

    def custom_model_update(self, pilot_model: PilotModel, db_controller: BaseDBController,
                            data_manager: DataManager):
        db_controller.drop_all_indexes()
        sqls = self._load_sql()
        workload = to_workload(sqls)
        parameters = {
            "benchmark_name": self.config.db, "budget_MB": 250, "max_index_width": 2
        }
        # This only adapts the original ML code with minimal changes; the user need not provide a DbConnector.
        connector = DbConnector(PilotDataInteractor(self.config, enable_simulate_index=True))
        algo = ExtendAlgorithm(connector, parameters=parameters)
        indexes = algo.calculate_best_indexes(workload)
        for index in indexes:
            columns = [c.name for c in index.columns]
            db_controller.create_index(PilotIndex(columns, index.table().name, index.index_idx()))
            print("create index {}".format(index))

To train the Extend algorithm, we first need to collect the estimated costs of queries under different indexes, which PilotScope makes easy. In the following code, we push all indexes to the database with the data_interactor.push_index(pilot_indexes) function. Then, we use the data_interactor.pull_estimated_cost() function to register collection of the estimated cost of a SQL query. Finally, we use the data_interactor.execute(query) function to obtain the cost of each query.

def calculate_cost_indexes(self, workload, indexes):
    # ... some omitted code (pilot_indexes is built from indexes here)
    self.current_indexes = set(indexes)
    data_interactor = self.db_connector.data_interactor
    data_interactor.push_index(pilot_indexes)
    data_interactor.pull_estimated_cost()
    total_cost = 0
    for query in workload.queries:
        self.cost_requests += 1
        total_cost += data_interactor.execute(query).estimated_cost
    return total_cost

In the following code, we provide a complete deployment example of index recommendation using PilotScope. Here’s a breakdown of the process:

  1. First, an instance of the PilotScheduler class is created using the SchedulerFactory.create_scheduler method with the necessary configuration config.

  2. Then, the IndexPeriodicModelUpdateEvent event is registered with the scheduler to periodically update the indexes. The number 200 indicates that the indexes will be updated every 200 queries. The execute_on_init parameter is set to False here, so no index update occurs when executing scheduler.init().

  3. The scheduler is configured to collect the execution time of each query by calling the register_required_data method. These data will be stored in the self.test_data_table table.

  4. The scheduler.init() method is called to initialize PilotScope scheduler.

  5. Finally, the Extend algorithm is evaluated by executing the SQL queries in the test set.

class IndexTest(unittest.TestCase):
    def setUp(self):
        self.config: PilotConfig = PostgreSQLConfig()
        self.config.db = "stats_tiny"
        self.algo = "extend"
        self.test_data_table = "{}_{}_test_data_table".format(self.algo, self.config.db)

    def test_index(self):
        try:
            # core
            scheduler: PilotScheduler = SchedulerFactory.create_scheduler(self.config)

            # update the indexes periodically (no update on init)
            periodic_model_update_event = IndexPeriodicModelUpdateEvent(self.config, 200, execute_on_init=False)
            scheduler.register_events([periodic_model_update_event])
            scheduler.register_required_data(self.test_data_table, pull_execution_time=True)

            # start
            scheduler.init()
            print("start to test sql")
            sqls = load_test_sql(self.config.db)
            for i, sql in enumerate(sqls):
                print("current is the {}-th sql, and it is {}".format(i, sql))
                scheduler.execute(sql)

        finally:
            pilotscope_exit()

5.3. Cardinality Estimation Task Example

Here, we show how to utilize PilotScope to deploy a cardinality estimation algorithm called MSCN into the database. Unlike the knob tuning and index recommendation tasks, the cardinality estimation algorithm is triggered by each SQL query.

To achieve this goal, we have created a subclass of CardPushHandler called MscnCardPushHandler. Within this subclass, we have implemented the acquire_injected_data function, which returns a new cardinality for each sub-query. When the input SQL query is executed, PilotScope will replace the original cardinalities with these new cardinalities, and it will apply this pipeline to each incoming SQL query.

In the following code, we first use the self.data_interactor.pull_subquery_card() function to collect the sub-queries of the input SQL query. Then, we utilize the self.mscn_model.model.predict function to estimate the cardinality of each sub-query. Finally, we return these new cardinalities.

class MscnCardPushHandler(CardPushHandler):
    def __init__(self, model: PilotModel, config: PilotConfig) -> None:
        super().__init__(config)
        self.mscn_model = model
        self.config = config
        self.data_interactor = PilotDataInteractor(config)

    def acquire_injected_data(self, sql):
        self.data_interactor.pull_subquery_card()
        data: PilotTransData = self.data_interactor.execute(sql)
        subquery_2_card = data.subquery_2_card

        # get new cardinalities for each sub-query, subtracting the 1 that is
        # added to all cards during training (see custom_model_training below)
        subquery = subquery_2_card.keys()
        _, preds_unnorm, t_total = self.mscn_model.model.predict(subquery)
        new_subquery_2_card = {sq: str(max(0.0, pred - 1)) for sq, pred in zip(subquery, preds_unnorm)}

        # return new cardinalities for each sub-query
        return new_subquery_2_card

To pretrain the MSCN model before executing SQL queries, we create MscnPretrainingModelEvent by inheriting from PretrainingModelEvent and implement iterative_data_collection and custom_model_training to customize the data collection and model training process.

In the iterative_data_collection function, we first use the self.pilot_data_interactor.pull_subquery_card() function to collect the sub-queries of each input SQL query. Then, we utilize the self.pilot_data_interactor.pull_record function to collect the true cardinality of each sub-query. Finally, we return column_2_value_list, a list of dicts in which each dict contains a sub-query and its true cardinality.
These data will be used to train the MSCN algorithm. PilotScope will store them in the table named by self.data_saving_table.

After collecting all data, the custom_model_training function will be called to train the MSCN model. In this function, we first read all collected data using data_manager.read_all(self.data_saving_table). Then, we train the MSCN model using the model.fit function with these collected data.

class MscnPretrainingModelEvent(PretrainingModelEvent):

    def __init__(self, config: PilotConfig, bind_model: PilotModel, data_saving_table, enable_collection=True,
                 enable_training=True):
        super().__init__(config, bind_model, data_saving_table, enable_collection, enable_training)
        self.sqls = []
        self.pilot_data_interactor = PilotDataInteractor(self.config)

    def iterative_data_collection(self, db_controller: BaseDBController, train_data_manager: DataManager):
        self.sqls = load_training_sql(self.config.db)
        column_2_value_list = []
        for sql in self.sqls:
            self.pilot_data_interactor.pull_subquery_card()
            data: PilotTransData = self.pilot_data_interactor.execute(sql)
            for sub_sql in data.subquery_2_card.keys():
                # collect the true cardinality of each sub-query
                self.pilot_data_interactor.pull_record()
                sub_data: PilotTransData = self.pilot_data_interactor.execute(sub_sql)
                if sub_data.records is not None:
                    column_2_value = {"query": sub_sql, "card": sub_data.records.values[0][0]}
                    column_2_value_list.append(column_2_value)
        return column_2_value_list, True

    def custom_model_training(self, bind_model, db_controller: BaseDBController,
                              data_manager: DataManager):
        data: DataFrame = data_manager.read_all(self.data_saving_table)
        tables, joins, predicates = parse_queries(data["query"].values)
        schema = load_schema(self.pilot_data_interactor.db_controller)
        model = MscnModel()
        # MSCN can only handle cards larger than 0, so we add 1 to all cards and subtract 1 in prediction.
        model.fit((tables, joins, predicates), data["card"].values + 1, schema)
        return model

In the following code, we provide a complete deployment example of cardinality estimation using PilotScope. Here’s a breakdown of the process:

  1. First, we instantiate a mscn_pilot_model using the MscnPilotModel class. This class is a subclass of PilotModel and implements methods to save and load the MSCN model.

  2. Then, an instance of the PilotScheduler class is created using the SchedulerFactory.create_scheduler method with the necessary configuration config.

  3. Then, we create a MscnCardPushHandler object and register it with the PilotScheduler, which helps us to replace the original cardinalities with new cardinalities generated by MSCN algorithm for each SQL query.

  4. Then, the MscnPretrainingModelEvent event is registered with the scheduler to pretrain the MSCN model when executing scheduler.init(). The enable_collection and enable_training parameters are set to True to enable data collection and model training. All data collected in the event will be stored in the mscn_pretrain_data_table table.

  5. The scheduler is configured to collect the execution time of each query by calling the register_required_data method. These data will be stored in the mscn_data_table table.

  6. The scheduler.init() method is called to initialize PilotScope deployment.

  7. Finally, the MSCN algorithm is evaluated by executing the SQL queries in the test set.

class MscnTest(unittest.TestCase):
    def setUp(self):
        self.config: PilotConfig = PostgreSQLConfig()
        self.config.db = "stats_tiny"

    def test_mscn(self):
        try:
            model_name = "mscn"
            mscn_pilot_model: PilotModel = MscnPilotModel(model_name)
            mscn_pilot_model.load_model()

            scheduler: PilotScheduler = SchedulerFactory.create_scheduler(self.config)

            # register MSCN algorithm for each SQL query.
            mscn_handler = MscnCardPushHandler(mscn_pilot_model, self.config)
            scheduler.register_custom_handlers([mscn_handler])

            pretraining_event = MscnPretrainingModelEvent(self.config, mscn_pilot_model, "mscn_pretrain_data_table",
                                                          enable_collection=True, enable_training=True)
            # register required data
            scheduler.register_required_data("mscn_data_table", pull_execution_time=True)

            scheduler.register_events([pretraining_event])
            scheduler.init()

            # evaluating MSCN algorithm using test set.
            sqls = load_test_sql(self.config.db)
            for i, sql in enumerate(sqls):
                scheduler.execute(sql)
        finally:
            pilotscope_exit()

Although we used the MSCN algorithm as an example, any cardinality estimation algorithm can be deployed with minimal modification, as sketched below.
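
For instance, here is a minimal sketch of such a replacement, using a hypothetical MyCardPushHandler whose "model" simply halves the optimizer's estimates as a stand-in for a real predictor:

class MyCardPushHandler(CardPushHandler):
    def __init__(self, config: PilotConfig) -> None:
        super().__init__(config)
        self.data_interactor = PilotDataInteractor(config)

    def acquire_injected_data(self, sql):
        # collect the sub-queries and the optimizer's original estimates
        self.data_interactor.pull_subquery_card()
        data: PilotTransData = self.data_interactor.execute(sql)
        # inject your model's predictions here; halving is only a placeholder
        return {sq: str(max(0.0, float(card) / 2))
                for sq, card in data.subquery_2_card.items()}

Registering this handler with the scheduler, exactly as MscnCardPushHandler is registered above, is all that is required to deploy it.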

5.4. Query Optimizer Task Example

Like the cardinality estimation task, the query optimizer task is triggered by each SQL query; it selects an efficient execution plan for the user's query. Here, we show how to utilize PilotScope to deploy a query optimization algorithm called Lero into the database.

Specifically, the core idea of Lero is to generate different candidate plans by scaling the cardinalities and then select the most efficient plan among the candidates using a trained learning-to-rank model. To simplify the description, we denote the best plan generated and selected by Lero as \(P^*\).
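
As a rough illustration of the scaling step (the actual CardsPickerModel used below is more involved), candidate cardinality sets could be produced from the original estimates as follows, where subquery_2_card is the dict of sub-queries and their original cardinalities:

# Illustrative only: scale every sub-query cardinality by a fixed set of
# factors; each scaled set, once pushed, yields one candidate plan.
factors = [0.1, 1.0, 10.0]
candidate_cards = [
    {sq: str(float(card) * f) for sq, card in subquery_2_card.items()}
    for f in factors
]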

To deploy Lero into the database, we convert the action of injecting \(P^*\) into the database into the action of injecting the scaled cardinalities that generate \(P^*\). The two actions are equivalent in effect, but the latter is more convenient to implement.

Therefore, we have created a subclass of CardPushHandler called LeroCardPushHandler. Within this subclass, we have implemented the acquire_injected_data method, which returns the scaled cardinalities that generate the plan \(P^*\) for the input SQL query.

In the acquire_injected_data method, we first use the self.pilot_data_interactor.pull_subquery_card() function to collect the sub-queries of the input SQL query and their original cardinalities. Then we instantiate an object of the CardsPickerModel class, which scales the cardinalities according to the level of the sub-queries, following the Lero algorithm. After initialization, the method enters a core loop, where it repeatedly updates the cardinalities, pushes the scaled cardinalities to the database with the self.pilot_data_interactor.push_card() function, and pulls the plan generated under the scaled cardinalities with the self.pilot_data_interactor.pull_physical_plan() function. Once the loop finishes, we have collected a list of candidate plans for the input SQL query. We then utilize the trained LeroModelPairWise model to predict the best plan among the candidates and return the scaled cardinalities of that plan. When the input SQL query is executed, PilotScope will replace the original cardinalities with these scaled cardinalities, applying this pipeline to each incoming SQL query.

class LeroCardPushHandler(CardPushHandler):

    def __init__(self, model: PilotModel, config: PilotConfig) -> None:
        super().__init__(config)
        self.model = model
        self.config = config
        self.db_controller = DBControllerFactory.get_db_controller(config)
        self.pilot_data_interactor = PilotDataInteractor(config)

    def predict(self, plans):
        leroModel: LeroModelPairWise = self.model.model
        feature_generator = leroModel._feature_generator
        x, _ = feature_generator.transform(plans)
        scores = leroModel.predict(x)
        best_idx = np.argmin(scores)
        return best_idx

    def acquire_injected_data(self, sql):

        # Pull subquery and its cardinality
        self.pilot_data_interactor.pull_subquery_card()
        data: PilotTransData = self.pilot_data_interactor.execute(sql)
        assert data is not None
        subquery_2_card = data.subquery_2_card

        # Initialize CardsPickerModel and other variables
        cards_picker = CardsPickerModel(subquery_2_card.keys(), subquery_2_card.values())
        scale_subquery_2_card = subquery_2_card
        new_cardss = []
        plans = []
        finish = False

        # Core
        while not finish:
            self.pilot_data_interactor.push_card(scale_subquery_2_card)
            self.pilot_data_interactor.pull_physical_plan()
            data: PilotTransData = self.pilot_data_interactor.execute(sql)
            if data is None:
                continue
            plan = data.physical_plan
            cards_picker.replace(plan)
            # append together so new_cardss and plans stay aligned for best_idx
            new_cardss.append(scale_subquery_2_card)
            plans.append(plan)
            finish, new_cards = cards_picker.get_cards()
            scale_subquery_2_card = {sq: new_card for sq, new_card in zip(subquery_2_card.keys(), new_cards)}
        best_idx = self.predict(plans)
        selected_card = new_cardss[best_idx]
        print(f"The best plan is {best_idx}/{len(new_cardss)}")

        # Return the selected cardinality
        return selected_card

To pretrain the Lero model before executing SQL queries, we create LeroPretrainingModelEvent by inheriting from PretrainingModelEvent and implement iterative_data_collection and custom_model_training to customize the data collection and model training process.

In the iterative_data_collection function, for each SQL query in the training set, we collect the execution times of the different plans generated from differently scaled cardinalities, for model training. In the following code, for each training query, we first use the self.pilot_data_interactor.pull_subquery_card() function to collect the sub-queries and their original cardinalities. Then we instantiate an object of the CardsPickerModel class, which scales the cardinalities according to the level of the sub-queries, following the Lero algorithm. After initialization, the code enters a core loop, where it repeatedly updates the cardinalities, pushes the scaled cardinalities to the database with the self.pilot_data_interactor.push_card() function, pulls the plan and its execution time with the self.pilot_data_interactor.pull_physical_plan() and self.pilot_data_interactor.pull_execution_time() functions, and collects the SQL query, the physical plan, and its execution time into a dictionary. Once the loop finishes, we have collected a list of dictionaries, called column_2_value_list, containing the training data. The function returns column_2_value_list, and PilotScope stores these data in the table named by self.data_saving_table.

After collecting all data, the custom_model_training function will be called to train the Lero model. In this function, we first read all collected data from the database using data_manager.read_all(self.data_saving_table). Then, we extract plan pairs for training the pair-wise Lero model with the extract_plan_pairs() function. Lastly, we train the Lero model using the training_pairwise_pilot_score() function on these collected data.

class LeroPretrainingModelEvent(PretrainingModelEvent):

    def __init__(self, config: PilotConfig, bind_model: PilotModel, data_saving_table, enable_collection=True,
                 enable_training=True):
        super().__init__(config, bind_model, data_saving_table, enable_collection, enable_training)
        self.sqls = []
        self.pilot_data_interactor = PilotDataInteractor(self.config)

    def load_sql(self):
        self.sqls = load_training_sql(self.config.db)[0:100]

    def iterative_data_collection(self, db_controller: BaseDBController, train_data_manager: DataManager):
        print("start to collect data fro pretraining")
        self.load_sql()

        column_2_value_list = []

        for i, sql in enumerate(self.sqls):
            print("current  is {}-th sql, and total sqls is {}".format(i, len(self.sqls)))
            self.pilot_data_interactor.pull_subquery_card()
            data: PilotTransData = self.pilot_data_interactor.execute(sql)
            if data is None:
                continue
            subquery_2_card = data.subquery_2_card
            cards_picker = CardsPickerModel(subquery_2_card.keys(), subquery_2_card.values())
            scale_subquery_2_card = subquery_2_card
            finish = False
            while (not finish):
                column_2_value = {}
                self.pilot_data_interactor.push_card(scale_subquery_2_card)
                self.pilot_data_interactor.pull_physical_plan()
                self.pilot_data_interactor.pull_execution_time()
                data: PilotTransData = self.pilot_data_interactor.execute(sql)
                if data is None:
                    continue
                plan = data.physical_plan
                cards_picker.replace(plan)
                column_2_value["sql"] = sql
                column_2_value["plan"] = plan
                column_2_value["time"] = data.execution_time
                finish, new_cards = cards_picker.get_cards()
                scale_subquery_2_card = {sq: new_card for sq, new_card in zip(subquery_2_card.keys(), new_cards)}
                column_2_value_list.append(column_2_value)
        return column_2_value_list, True

    def custom_model_training(self, bind_model, db_controller: BaseDBController,
                              data_manager: DataManager):
        data: DataFrame = data_manager.read_all(self.data_saving_table)
        plans1, plans2 = extract_plan_pairs(data)
        lero_model = training_pairwise_pilot_score(bind_model, plans1, plans2)
        return lero_model

In the following code, we provide a complete deployment example of Lero algorithm using PilotScope. Here’s a breakdown of the process:

  1. First, we instantiate lero_pilot_model, an instance of the LeroPilotModel class. This class is a subclass of PilotModel and implements methods to save and load the model.

  2. Then, an instance of the PilotScheduler class is created using the SchedulerFactory.create_scheduler() method with the necessary configuration config.

  3. Then, we create a LeroCardPushHandler object, which inherits from the CardPushHandler class and replaces the original cardinalities with new cardinalities generated by the Lero algorithm for each SQL query. We register it with the scheduler via the scheduler.register_custom_handlers() function.

  4. Then, the LeroPretrainingModelEvent event pretrains the Lero model when PilotScope is initialized. The enable_collection and enable_training parameters are set to True to enable data collection and model training. All data collected in the event will be stored in the lero_pretraining_collect_data table. The event is registered with the scheduler via the scheduler.register_events() function.

  5. The scheduler is configured to collect the execution time and the physical plan of each query by calling the register_required_data() method. These data will be stored in the lero_pair_test_data_table table.

  6. The scheduler.init() method is called to initialize PilotScope.

  7. Lastly, we evaluate the Lero algorithm by executing the SQL queries in the test set.

class LeroTest(unittest.TestCase):
    def setUp(self):
        self.config: PilotConfig = PostgreSQLConfig()
        self.config.db = "stats_tiny"

    def test_lero(self):
        try:
            config = self.config

            model_name = "lero_pair"
            test_data_table = "{}_test_data_table".format(model_name)
            pretraining_data_table = "lero_pretraining_collect_data"

            lero_pilot_model: PilotModel = LeroPilotModel(model_name)
            lero_pilot_model.load()
            lero_handler = LeroCardPushHandler(lero_pilot_model, config)

            # core
            scheduler: PilotScheduler = SchedulerFactory.create_scheduler(config)
            scheduler.register_custom_handlers([lero_handler])
            scheduler.register_required_data(test_data_table, pull_execution_time=True, pull_physical_plan=True)

            # allow to pretrain model
            pretraining_event = LeroPretrainingModelEvent(config, lero_pilot_model, pretraining_data_table,
                                                          enable_collection=True,
                                                          enable_training=True)
            scheduler.register_events([pretraining_event])

            # start
            scheduler.init()

            print("start to test sql")
            sqls = load_test_sql(config.db)
            for i, sql in enumerate(sqls):
                print("current is the {}-th sql, and it is {}".format(i, sql))
                scheduler.execute(sql)
        finally:
            pilotscope_exit()

Although we used the Lero algorithm as an example, other learned query optimization algorithms can be deployed with minimal modification.