Source code for circuit_knitting.cutting.cutqc.wire_cutting_evaluation

# This code is a Qiskit project.

# (C) Copyright IBM 2022.

# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Contains functions for executing subcircuits."""

from __future__ import annotations

import itertools
import copy
from typing import Sequence, Any
from multiprocessing.pool import ThreadPool

import numpy as np

from qiskit import QuantumCircuit
from qiskit.utils.deprecation import deprecate_func
from qiskit.converters import circuit_to_dag, dag_to_circuit
from qiskit.circuit.library.standard_gates import HGate, SGate, SdgGate, XGate
from qiskit.primitives import BaseSampler, Sampler as TestSampler
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService, Sampler, Session, Options


[docs] @deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def run_subcircuit_instances( subcircuits: Sequence[QuantumCircuit], subcircuit_instances: dict[int, dict[tuple[tuple[str, ...], tuple[Any, ...]], int]], service: QiskitRuntimeService | None = None, backend_names: Sequence[str] | None = None, options: Sequence[Options] | None = None, ) -> dict[int, dict[int, np.ndarray]]: """ Execute all provided subcircuits. Using the backend(s) provided, this executes all the subcircuits to generate the resultant probability vectors. subcircuit_instance_probs[subcircuit_idx][subcircuit_instance_idx] = measured probability Args: subcircuits: The list of subcircuits to execute subcircuit_instances: Dictionary containing information about each of the subcircuit instances service: The runtime service backend_names: The backend(s) used to execute the subcircuits options: Options for the runtime execution of subcircuits Returns: The probability vectors from each of the subcircuit instances """ if backend_names and options: if len(backend_names) != len(options): raise AttributeError( f"The list of backend names is length ({len(backend_names)}), " f"but the list of options is length ({len(options)}). It is ambiguous " "how these options should be applied." ) if service: if backend_names: backend_names_repeated: list[str | None] = [ backend_names[i % len(backend_names)] for i, _ in enumerate(subcircuits) ] if options is None: options_repeated: list[Options | None] = [None] * len( backend_names_repeated ) else: options_repeated = [ options[i % len(options)] for i, _ in enumerate(subcircuits) ] else: backend_names_repeated = ["ibmq_qasm_simulator"] * len(subcircuits) if options: options_repeated = [options[0]] * len(subcircuits) else: options_repeated = [None] * len(subcircuits) else: backend_names_repeated = [None] * len(subcircuits) options_repeated = [None] * len(subcircuits) subcircuit_instance_probs: dict[int, dict[int, np.ndarray]] = {} with ThreadPool() as pool: args = [ [ subcircuit_instances[subcircuit_idx], subcircuit, service, backend_names_repeated[subcircuit_idx], options_repeated[subcircuit_idx], ] for subcircuit_idx, subcircuit in enumerate(subcircuits) ] subcircuit_instance_probs_list = pool.starmap(_run_subcircuit_batch, args) for i, partition_batch in enumerate(subcircuit_instance_probs_list): subcircuit_instance_probs[i] = partition_batch return subcircuit_instance_probs
@deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def mutate_measurement_basis(meas: tuple[str, ...]) -> list[tuple[Any, ...]]: """ Change of basis for all identity measurements. For every identity measurement, it is split into an I and Z measurement. I and Z measurement basis correspond to the same logical circuit. Args: meas: The current measurement bases Returns: The update measurement bases """ if all(x != "I" for x in meas): return [meas] else: mutated_meas = [] for x in meas: if x != "I": mutated_meas.append([x]) else: mutated_meas.append(["I", "Z"]) mutated_meas_out = list(itertools.product(*mutated_meas)) return mutated_meas_out @deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def modify_subcircuit_instance( subcircuit: QuantumCircuit, init: tuple[str, ...], meas: tuple[str, ...] ) -> QuantumCircuit: """ Modify the initialization and measurement bases for a given subcircuit. Args: subcircuit: The subcircuit to be modified init: The current initializations meas: The current measement bases Returns: The updated circuit, modified so the initialziation and measurement operators are all in the standard computational basis Raises: Exeption: One of the inits or meas's are not an acceptable string """ subcircuit_dag = circuit_to_dag(subcircuit) subcircuit_instance_dag = copy.deepcopy(subcircuit_dag) for i, x in enumerate(init): q = subcircuit.qubits[i] if x == "zero": continue elif x == "one": subcircuit_instance_dag.apply_operation_front( op=XGate(), qargs=[q], cargs=[] ) elif x == "plus": subcircuit_instance_dag.apply_operation_front( op=HGate(), qargs=[q], cargs=[] ) elif x == "minus": subcircuit_instance_dag.apply_operation_front( op=HGate(), qargs=[q], cargs=[] ) subcircuit_instance_dag.apply_operation_front( op=XGate(), qargs=[q], cargs=[] ) elif x == "plusI": subcircuit_instance_dag.apply_operation_front( op=SGate(), qargs=[q], cargs=[] ) subcircuit_instance_dag.apply_operation_front( op=HGate(), qargs=[q], cargs=[] ) elif x == "minusI": subcircuit_instance_dag.apply_operation_front( op=SGate(), qargs=[q], cargs=[] ) subcircuit_instance_dag.apply_operation_front( op=HGate(), qargs=[q], cargs=[] ) subcircuit_instance_dag.apply_operation_front( op=XGate(), qargs=[q], cargs=[] ) else: raise Exception("Illegal initialization :", x) for i, x in enumerate(meas): q = subcircuit.qubits[i] if x == "I" or x == "comp": continue elif x == "X": subcircuit_instance_dag.apply_operation_back( op=HGate(), qargs=[q], cargs=[] ) elif x == "Y": subcircuit_instance_dag.apply_operation_back( op=SdgGate(), qargs=[q], cargs=[] ) subcircuit_instance_dag.apply_operation_back( op=HGate(), qargs=[q], cargs=[] ) else: raise Exception("Illegal measurement basis:", x) subcircuit_instance_circuit = dag_to_circuit(subcircuit_instance_dag) return subcircuit_instance_circuit @deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def run_subcircuits_using_sampler( subcircuits: Sequence[QuantumCircuit], sampler: BaseSampler, ) -> list[np.ndarray]: """ Execute the subcircuit(s). Args: subcircuit: The subcircuits to be executed sampler: The Sampler to use for executions Returns: The probability distributions """ for subcircuit in subcircuits: if subcircuit.num_clbits == 0: subcircuit.measure_all() quasi_dists = sampler.run(circuits=subcircuits).result().quasi_dists all_probabilities_out = [] for i, qd in enumerate(quasi_dists): probabilities = qd.nearest_probability_distribution() probabilities_out = np.zeros(2 ** subcircuits[i].num_qubits, dtype=float) for state in probabilities: probabilities_out[state] = probabilities[state] all_probabilities_out.append(probabilities_out) return all_probabilities_out @deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def run_subcircuits( subcircuits: Sequence[QuantumCircuit], service: QiskitRuntimeService | None = None, backend_name: str | None = None, options: Options | None = None, ) -> list[np.ndarray]: """ Execute the subcircuit(s). Args: subcircuit: The subcircuits to be executed service: The runtime service backend_name: The backend used to execute the subcircuits options: Options for the runtime execution of subcircuits Returns: The probability distributions """ if service is not None: session = Session(service=service, backend=backend_name) sampler = Sampler(session=session, options=options) # Transpile circuits backend = service.backend(session.backend()) pass_manager = generate_preset_pass_manager( optimization_level=1, backend=backend ) subcircuits = pass_manager.run(subcircuits) else: sampler = TestSampler(options=options) return run_subcircuits_using_sampler(subcircuits, sampler) @deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def measure_prob(unmeasured_prob: np.ndarray, meas: tuple[Any, ...]) -> np.ndarray: """ Compute the effective probability distribution from the subcircuit distribution. Args: unmeasured_prob: The outputs of the subcircuit execution meas: The measurement bases Returns: The updated measured probability distribution """ if meas.count("comp") == len(meas): return np.array(unmeasured_prob) else: measured_prob = np.zeros(int(2 ** meas.count("comp"))) for full_state, p in enumerate(unmeasured_prob): sigma, effective_state = measure_state(full_state=full_state, meas=meas) # TODO: Add states merging here. Change effective_state to merged_bin measured_prob[effective_state] += sigma * p return measured_prob @deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def measure_state(full_state: int, meas: tuple[Any, ...]) -> tuple[int, int]: """ Compute the corresponding effective_state for the given full_state. Measured in basis `meas`. Returns sigma (int), effective_state (int) where sigma = +-1 Args: full_state: The current state (in decimal form) meas: The measurement bases Returns: Sigma (defined by the parity of non computational basis 1 measurements) and the effective state (defined by the measurements in the computational basis) """ bin_full_state = bin(full_state)[2:].zfill(len(meas)) sigma = 1 bin_effective_state = "" for meas_bit, meas_basis in zip(bin_full_state, meas[::-1]): if meas_bit == "1" and meas_basis != "I" and meas_basis != "comp": sigma *= -1 if meas_basis == "comp": bin_effective_state += meas_bit effective_state = int(bin_effective_state, 2) if bin_effective_state != "" else 0 return sigma, effective_state def _run_subcircuit_batch( subcircuit_instance: dict[tuple[tuple[str, ...], tuple[Any, ...]], int], subcircuit: QuantumCircuit, service: QiskitRuntimeService | None = None, backend_name: str | None = None, options: Options | None = None, ): """ Execute a circuit using qiskit runtime. Args: subcircuit_instances: Dictionary containing information about each of the subcircuit instances subcircuit: The subcircuit to execute service: The runtime service backend_name: The backends used to execute the subcircuit options: Options for the runtime execution of subcircuit Returns: The measurement probabilities for the subcircuit batch, as calculated from the runtime execution """ subcircuit_instance_probs = {} circuits_to_run = [] # For each circuit associated with a given subcircuit for init_meas in subcircuit_instance: subcircuit_instance_idx = subcircuit_instance[init_meas] # Collect all of the circuits we need to evaluate, ensuring we don't have duplicates if subcircuit_instance_idx not in subcircuit_instance_probs: modified_subcircuit_instance = modify_subcircuit_instance( subcircuit=subcircuit, init=init_meas[0], meas=tuple(init_meas[1]), ) circuits_to_run.append(modified_subcircuit_instance) mutated_meas = mutate_measurement_basis(meas=tuple(init_meas[1])) for meas in mutated_meas: mutated_subcircuit_instance_idx = subcircuit_instance[ (init_meas[0], meas) ] # Set a placeholder in the probability dict to prevent duplicate circuits to the Sampler subcircuit_instance_probs[mutated_subcircuit_instance_idx] = np.array( [0.0] ) # Run all of our circuits in one batch subcircuit_inst_probs = run_subcircuits( circuits_to_run, service=service, backend_name=backend_name, options=options, ) # Calculate the measured probabilities unique_subcircuit_check = {} i = 0 for init_meas in subcircuit_instance: subcircuit_instance_idx = subcircuit_instance[init_meas] if subcircuit_instance_idx not in unique_subcircuit_check: subcircuit_inst_prob = subcircuit_inst_probs[i] i = i + 1 mutated_meas = mutate_measurement_basis(meas=tuple(init_meas[1])) for meas in mutated_meas: measured_prob = measure_prob( unmeasured_prob=subcircuit_inst_prob, meas=meas ) mutated_subcircuit_instance_idx = subcircuit_instance[ (init_meas[0], meas) ] subcircuit_instance_probs[mutated_subcircuit_instance_idx] = ( measured_prob ) unique_subcircuit_check[mutated_subcircuit_instance_idx] = True return subcircuit_instance_probs