Source code for circuit_knitting.cutting.cutqc.wire_cutting

# This code is a Qiskit project.

# (C) Copyright IBM 2022.

# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Functions for conducting the wire cutting on quantum circuits."""

from __future__ import annotations

from typing import Sequence, Any, Dict, cast, no_type_check

import numpy as np
from qiskit import QuantumCircuit, QuantumRegister
from qiskit.circuit import Qubit
from qiskit.dagcircuit import DAGCircuit, DAGOpNode
from qiskit.converters import circuit_to_dag, dag_to_circuit
from qiskit_ibm_runtime import Options, QiskitRuntimeService
from qiskit.utils.deprecation import deprecate_func

from .wire_cutting_evaluation import run_subcircuit_instances
from .wire_cutting_post_processing import generate_summation_terms, build
from .wire_cutting_verification import generate_reconstructed_output
from .dynamic_definition import (
    dd_build,
    get_reconstruction_qubit_order,
    read_dd_bins,
)


[docs] @deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def cut_circuit_wires( circuit: QuantumCircuit, method: str, subcircuit_vertices: Sequence[Sequence[int]] | None = None, max_subcircuit_width: int | None = None, max_subcircuit_cuts: int | None = None, max_subcircuit_size: int | None = None, max_cuts: int | None = None, num_subcircuits: Sequence[int] | None = None, verbose: bool = True, ) -> dict[str, Any]: """ Decompose the circuit into a collection of subcircuits. Args: method: Whether to have the cuts be 'automatically' found, in a provably optimal way, or whether to 'manually' specify the cuts subcircuit_vertices: The vertices to be used in the subcircuits. Note that these are not the indices of the qubits, but the nodes in the circuit DAG max_subcircuit_width: Max number of qubits in each subcircuit max_cuts: Max total number of cuts allowed num_subcircuits: List of number of subcircuits to try max_subcircuit_cuts: Max number of cuts for a subcircuit max_subcircuit_size: Max number of gates in a subcircuit verbose: Flag for printing output of cutting Returns: A dictionary containing information on the cuts, including the subcircuits themselves (key: 'subcircuits') Raises: ValueError: The input method does not match the other provided arguments """ cuts = {} if method == "automatic": if max_subcircuit_width is None: raise ValueError( "The max_subcircuit_width argument must be set if using automatic cut-finding." ) cuts = find_wire_cuts( circuit=circuit, max_subcircuit_width=max_subcircuit_width, max_cuts=max_cuts, num_subcircuits=num_subcircuits, max_subcircuit_cuts=max_subcircuit_cuts, max_subcircuit_size=max_subcircuit_size, verbose=verbose, ) elif method == "manual": if subcircuit_vertices is None: raise ValueError( "The subcircuit_vertices argument must be set if manually specifying cuts." ) cuts = cut_circuit_wire( circuit=circuit, subcircuit_vertices=subcircuit_vertices, verbose=verbose ) else: ValueError( 'The method argument for the decompose method should be either "automatic" or "manual".' ) return cuts
[docs] @deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def evaluate_subcircuits( cuts: dict[str, Any], service: QiskitRuntimeService | None = None, backend_names: str | Sequence[str] | None = None, options: Options | Sequence[Options] | None = None, ) -> dict[int, dict[int, np.ndarray]]: """ Evaluate the subcircuits. Args: cuts: The results of cutting service: A service for connecting to Qiskit Runtime Service options: Options to use on each backend backend_names: The name(s) of the backend(s) to be used Returns: The dictionary containing the results from running each of the subcircuits """ # Put backend_names and options in lists to ensure it is unambiguous how to sync them backends_list: Sequence[str] = [] options_list: Sequence[Options] = [] if backend_names is None or isinstance(backend_names, str): if isinstance(options, Options): options_list = [options] elif isinstance(options, Sequence) and (len(options) != 1): options_list = [options[0]] if isinstance(backend_names, str): backends_list = [backend_names] else: backends_list = backend_names if isinstance(options, Options): options_list = [options] * len(backends_list) elif options is None: options_list = [None] * len(backends_list) else: options_list = options if backend_names: if len(backends_list) != len(options_list): raise AttributeError( f"The list of backend names is length ({len(backends_list)}), " f"but the list of options is length ({len(options_list)}). " "It is ambiguous how these options should be applied." ) _, _, subcircuit_instances = _generate_metadata(cuts) subcircuit_instance_probabilities = _run_subcircuits( cuts, subcircuit_instances, service=service, backend_names=backends_list, options=options_list, ) return subcircuit_instance_probabilities
[docs] @deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def reconstruct_full_distribution( circuit: QuantumCircuit, subcircuit_instance_probabilities: dict[int, dict[int, np.ndarray]], cuts: dict[str, Any], num_threads: int = 1, ) -> np.ndarray: """ Reconstruct the full probabilities from the subcircuit evaluations. Args: circuit: The original full circuit subcircuit_instance_probabilities: The probability vectors from each of the subcircuit instances, as output by the _run_subcircuits function num_threads: The number of threads to use to parallelize the recomposing Returns: The reconstructed probability vector """ summation_terms, subcircuit_entries, _ = _generate_metadata(cuts) subcircuit_entry_probabilities = _attribute_shots( subcircuit_entries, subcircuit_instance_probabilities ) unordered_probability, smart_order, overhead = build( summation_terms=summation_terms, subcircuit_entry_probs=subcircuit_entry_probabilities, num_cuts=cuts["num_cuts"], num_threads=num_threads, ) reconstructed_probability = generate_reconstructed_output( circuit, cuts["subcircuits"], unordered_probability, smart_order, cuts["complete_path_map"], ) return reconstructed_probability
@deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def create_dd_bin( subcircuit_instance_probabilities: dict[int, dict[int, np.ndarray]], cuts: dict[str, Any], mem_limit: int, recursion_depth: int, num_threads: int = 1, ) -> dict[int, Any]: """ Create a bin for Dynamic Definition. Args: subcircuit_instance_probabilities: The probability vectors from each of the subcircuit instances, as output by the _run_subcircuits function cuts: The results of cutting mem_limit: maximum system memory recursion_depth: the number of recursive call for Dynamic Definition num_threads: The number of threads to use to parallelize the recomposing Returns: The bin for dynamic definition """ summation_terms, subcircuit_entries, subcircuit_instances = _generate_metadata(cuts) subcircuit_entry_probabilities = _attribute_shots( subcircuit_entries, subcircuit_instance_probabilities ) return dd_build( summation_terms=summation_terms, subcircuit_entry_probs=subcircuit_entry_probabilities, num_cuts=cuts["num_cuts"], mem_limit=mem_limit, recursion_depth=recursion_depth, counter=cuts["counter"], subcircuit_instances=subcircuit_instances, num_threads=num_threads, ) @deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def reconstruct_dd_full_distribution( circuit: QuantumCircuit, cuts: dict[str, Any], dd_bins: dict[int, Any], ) -> np.ndarray: """ Reconstruct the full probabilities from bins of dynamic definition. Args: circuit: The original full circuit cuts: The results of cutting dd_bins: The bin for Dynamic Definition Returns: The reconstructed probability vector """ subcircuit_out_qubits = get_reconstruction_qubit_order( full_circuit=circuit, complete_path_map=cuts["complete_path_map"], subcircuits=cuts["subcircuits"], ) reconstructed_prob = read_dd_bins( subcircuit_out_qubits=subcircuit_out_qubits, dd_bins=dd_bins ) return reconstructed_prob def _generate_metadata(cuts: dict[str, Any]) -> tuple[ list[dict[int, int]], dict[int, dict[tuple[str, str], tuple[int, Sequence[tuple[int, int]]]]], dict[int, dict[tuple[tuple[str, ...], tuple[Any, ...]], int]], ]: """ Generate metadata used to execute subcircuits and reconstruct probabilities of original circuit. Args: cuts: Results from the cutting step Returns: Information about the 4^(num cuts) summation terms used to reconstruct original probabilities, a dictionary with information on each of the subcircuits, and a dictionary containing indexes for each of the subcircuits """ ( summation_terms, subcircuit_entries, subcircuit_instances, ) = generate_summation_terms( subcircuits=cuts["subcircuits"], complete_path_map=cuts["complete_path_map"], num_cuts=cuts["num_cuts"], ) return summation_terms, subcircuit_entries, subcircuit_instances def _run_subcircuits( cuts: dict[str, Any], subcircuit_instances: dict[int, dict[tuple[tuple[str, ...], tuple[Any, ...]], int]], service: QiskitRuntimeService | None = None, backend_names: Sequence[str] | None = None, options: Sequence[Options] | None = None, ) -> dict[int, dict[int, np.ndarray]]: """ Execute all the subcircuit instances. task['subcircuit_instance_probs'][subcircuit_idx][subcircuit_instance_idx] = measured prob Args: cuts: Results from the cutting step subcircuit_instances: The dictionary containing the index information for each of the subcircuit instances service: The arguments for the runtime service backend_names: The backend(s) used to run the subcircuits options: Options for the runtime execution of subcircuits Returns: The resulting probabilities from each of the subcircuit instances """ subcircuit_instance_probs = run_subcircuit_instances( subcircuits=cuts["subcircuits"], subcircuit_instances=subcircuit_instances, service=service, backend_names=backend_names, options=options, ) return subcircuit_instance_probs def _attribute_shots( subcircuit_entries: dict[ int, dict[tuple[str, str], tuple[int, Sequence[tuple[int, int]]]] ], subcircuit_instance_probs: dict[int, dict[int, np.ndarray]], ) -> dict[int, dict[int, np.ndarray]]: """ Attribute the shots into respective subcircuit entries. task['subcircuit_entry_probs'][subcircuit_idx][subcircuit_entry_idx] = prob Args: subcircuit_entries: Dictionary containing information about each of the subcircuit instances subcircuit_instance_probs: The probability vectors from each of the subcircuit instances, as output by the _run_subcircuits function Returns: A dictionary containing the probability results to each of the appropriate subcircuits Raises: ValueError: A kronecker term is not size two ValueError: There are no subcircuit probs provided """ subcircuit_entry_probs: dict[int, dict[int, np.ndarray]] = {} for subcircuit_idx in subcircuit_entries: subcircuit_entry_probs[subcircuit_idx] = {} for label in subcircuit_entries[subcircuit_idx]: subcircuit_entry_idx, kronecker_term = subcircuit_entries[subcircuit_idx][ label ] subcircuit_entry_prob: np.ndarray | None = None for coefficient, subcircuit_instance_idx in kronecker_term: if subcircuit_entry_prob is None: subcircuit_entry_prob = ( coefficient * subcircuit_instance_probs[subcircuit_idx][ subcircuit_instance_idx ] ) else: subcircuit_entry_prob += ( coefficient * subcircuit_instance_probs[subcircuit_idx][ subcircuit_instance_idx ] ) if subcircuit_entry_prob is None: raise ValueError( "Something unexpected happened during shot attribution." ) subcircuit_entry_probs[subcircuit_idx][ subcircuit_entry_idx ] = subcircuit_entry_prob return subcircuit_entry_probs @no_type_check @deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def find_wire_cuts( circuit: QuantumCircuit, max_subcircuit_width: int, max_cuts: int | None, num_subcircuits: Sequence[int] | None, max_subcircuit_cuts: int | None, max_subcircuit_size: int | None, verbose: bool, ) -> dict[str, Any]: """ Find optimal cuts for the wires. Will print if the model cannot find a solution at all, and will print whether the found solution is optimal or not. Args: circuit: Original quantum circuit to be cut into subcircuits max_subcircuit_width: Max number of qubits in each subcircuit max_cuts: Max total number of cuts allowed num_subcircuits: List of number of subcircuits to try max_subcircuit_cuts: Max number of cuts for a subcircuit max_subcircuit_size: The maximum number of two qubit gates in each subcircuit verbose: Whether to print information about the cut-finding or not Returns: The solution found for the cuts """ stripped_circ = _circuit_stripping(circuit=circuit) n_vertices, edges, vertex_ids, id_vertices = _read_circuit(circuit=stripped_circ) num_qubits = circuit.num_qubits cut_solution = {} min_cost = float("inf") best_mip_model = None for num_subcircuit in num_subcircuits: if ( num_subcircuit * max_subcircuit_width - (num_subcircuit - 1) < num_qubits or num_subcircuit > num_qubits or max_cuts + 1 < num_subcircuit ): if verbose: print("%d subcircuits : IMPOSSIBLE" % (num_subcircuit)) continue kwargs = dict( # pylint: disable=use-dict-literal n_vertices=n_vertices, edges=edges, vertex_ids=vertex_ids, id_vertices=id_vertices, num_subcircuit=num_subcircuit, max_subcircuit_width=max_subcircuit_width, max_subcircuit_cuts=max_subcircuit_cuts, max_subcircuit_size=max_subcircuit_size, num_qubits=num_qubits, max_cuts=max_cuts, ) from .mip_model import MIPModel mip_model = MIPModel(**kwargs) feasible = mip_model.solve(min_postprocessing_cost=min_cost) if not feasible: if verbose: print("%d subcircuits : NO SOLUTIONS" % (num_subcircuit)) continue else: positions = _cuts_parser(mip_model.cut_edges, circuit) subcircuits, complete_path_map = _subcircuits_parser( subcircuit_gates=mip_model.subcircuits, circuit=circuit ) O_rho_pairs = _get_pairs(complete_path_map=complete_path_map) counter = _get_counter(subcircuits=subcircuits, O_rho_pairs=O_rho_pairs) classical_cost = _cost_estimate(counter=counter) cost = classical_cost if cost < min_cost: min_cost = cost best_mip_model = mip_model cut_solution = { "max_subcircuit_width": max_subcircuit_width, "subcircuits": subcircuits, "complete_path_map": complete_path_map, "num_cuts": len(positions), "counter": counter, "classical_cost": classical_cost, } if verbose and len(cut_solution) > 0: print("-" * 20) classical_cost: float = float(cut_solution["classical_cost"]) # We can remove typing.Dict from this cast statement when py38 is deprecated. # https://bugs.python.org/issue45117 counter = cast(Dict[int, Dict[str, int]], cut_solution["counter"]) subcircuits: Sequence[Any] = cut_solution["subcircuits"] num_cuts: int = cut_solution["num_cuts"] _print_cutter_result( num_cuts=num_cuts, subcircuits=subcircuits, counter=counter, classical_cost=classical_cost, ) if best_mip_model is None: raise ValueError( "Something went wrong during cut-finding. The best MIP model object was never instantiated." ) print("Model objective value = %.2e" % (best_mip_model.objective), flush=True) # type: ignore print("MIP runtime:", best_mip_model.runtime, flush=True) if best_mip_model.optimal: print("OPTIMAL, MIP gap =", best_mip_model.mip_gap, flush=True) else: print("NOT OPTIMAL, MIP gap =", best_mip_model.mip_gap, flush=True) print("-" * 20, flush=True) return cut_solution @deprecate_func( removal_timeline="no sooner than CKT v0.8.0", since="0.7.0", package_name="circuit-knitting-toolbox", additional_msg="Use the wire cutting or automated cut-finding functionality in the ``circuit_knitting.cutting`` package. ", ) def cut_circuit_wire( circuit: QuantumCircuit, subcircuit_vertices: Sequence[Sequence[int]], verbose: bool ) -> dict[str, Any]: """ Perform the provided cuts. Used when cut locations are chosen manually. Args: circuit: Original quantum circuit to be cut into subcircuits subcircuit_vertices: The list of vertices to apply the cuts to verbose: Whether to print the details of cutting or not Returns: The solution calculated from the provided cuts """ stripped_circ = _circuit_stripping(circuit=circuit) n_vertices, edges, vertex_ids, id_vertices = _read_circuit(circuit=stripped_circ) subcircuit_list = [] for vertices in subcircuit_vertices: subcircuit = [] for vertex in vertices: subcircuit.append(id_vertices[vertex]) subcircuit_list.append(subcircuit) if sum([len(subcircuit) for subcircuit in subcircuit_list]) != n_vertices: raise ValueError("Not all gates are assigned into subcircuits") subcircuit_object = _subcircuits_parser( subcircuit_gates=subcircuit_list, circuit=circuit ) if len(subcircuit_object) != 2: raise ValueError("subcircuit_object should contain exactly two elements.") subcircuits = subcircuit_object[0] complete_path_map = subcircuit_object[-1] O_rho_pairs = _get_pairs(complete_path_map=complete_path_map) counter = _get_counter(subcircuits=subcircuits, O_rho_pairs=O_rho_pairs) classical_cost = _cost_estimate(counter=counter) max_subcircuit_width = max([subcirc.width() for subcirc in subcircuits]) # type: ignore cut_solution = { "max_subcircuit_width": max_subcircuit_width, "subcircuits": subcircuits, "complete_path_map": complete_path_map, "num_cuts": len(O_rho_pairs), "counter": counter, "classical_cost": classical_cost, } if verbose: print("-" * 20) _print_cutter_result( num_cuts=cut_solution["num_cuts"], subcircuits=cut_solution["subcircuits"], counter=cut_solution["counter"], classical_cost=cut_solution["classical_cost"], ) print("-" * 20) return cut_solution def _print_cutter_result( num_cuts: int, subcircuits: Sequence[QuantumCircuit], counter: dict[int, dict[str, int]], classical_cost: float, ) -> None: """ Pretty print the results. Args: num_cuts: The number of cuts subcircuits: The list of subcircuits counter: The dictionary containing all meta information regarding each of the subcircuits classical_cost: The estimated processing cost Returns: None """ print(f"num_cuts = {num_cuts}") for subcircuit_idx, subcircuit in enumerate(subcircuits): print("subcircuit %d" % subcircuit_idx) print( "\u03C1 qubits = %d, O qubits = %d, width = %d, effective = %d, depth = %d, size = %d" % ( counter[subcircuit_idx]["rho"], counter[subcircuit_idx]["O"], counter[subcircuit_idx]["d"], counter[subcircuit_idx]["effective"], counter[subcircuit_idx]["depth"], counter[subcircuit_idx]["size"], ) ) print(subcircuit) print("Estimated cost = %.3e" % classical_cost, flush=True) def _cuts_parser( cuts: Sequence[tuple[str]], circ: QuantumCircuit ) -> list[tuple[Qubit, int]]: """ Convert cuts to wires. Args: cuts: The cuts found by the model (or provided by the user) circ: The quantum circuit the cuts are from Returns: The list containing the wires that were cut and the gates that are affected by these cuts """ dag = circuit_to_dag(circ) positions: list[tuple[Qubit, int]] = [] for position in cuts: if len(position) != 2: raise ValueError( "position variable should be a length 2 sequence: {position}" ) source = position[0] dest = position[-1] source_qargs = [ (x.split("]")[0] + "]", int(x.split("]")[1])) for x in source.split(" ") ] dest_qargs = [ (x.split("]")[0] + "]", int(x.split("]")[1])) for x in dest.split(" ") ] qubit_cut = [] for source_qarg in source_qargs: source_qubit, source_multi_Q_gate_idx = source_qarg for dest_qarg in dest_qargs: dest_qubit, dest_multi_Q_gate_idx = dest_qarg if ( source_qubit == dest_qubit and dest_multi_Q_gate_idx == source_multi_Q_gate_idx + 1 ): qubit_cut.append(source_qubit) # if len(qubit_cut)>1: # raise Exception('one cut is cutting on multiple qubits') for x in source.split(" "): if x.split("]")[0] + "]" == qubit_cut[0]: source_idx = int(x.split("]")[1]) for x in dest.split(" "): if x.split("]")[0] + "]" == qubit_cut[0]: dest_idx = int(x.split("]")[1]) multi_Q_gate_idx = max(source_idx, dest_idx) wire = None for qubit in circ.qubits: if circ.find_bit(qubit).registers[0][0].name == qubit_cut[0].split("[")[ 0 ] and circ.find_bit(qubit).index == int( qubit_cut[0].split("[")[1].split("]")[0] ): wire = qubit tmp = 0 all_Q_gate_idx = None for gate_idx, gate in enumerate( list(dag.nodes_on_wire(wire=wire, only_ops=True)) ): if len(gate.qargs) > 1: tmp += 1 if tmp == multi_Q_gate_idx: all_Q_gate_idx = gate_idx if (wire is None) or (all_Q_gate_idx is None): raise ValueError("Something unexpected happened while parsing cuts.") positions.append((wire, all_Q_gate_idx)) positions = sorted(positions, reverse=True, key=lambda cut: cut[1]) return positions def _subcircuits_parser( subcircuit_gates: list[list[str]], circuit: QuantumCircuit ) -> tuple[Sequence[QuantumCircuit], dict[Qubit, list[dict[str, int | Qubit]]]]: """ Convert the subcircuit gates into quantum circuits and path out the DAGs to enable conversion. Args: subcircuit_gates: The gates in the subcircuits circuit: The original circuit Returns: A tuple containing he subcircuits and the paths in the quantum circuit DAGs """ """ Assign the single qubit gates to the closest two-qubit gates """ def calculate_distance_between_gate(gate_A, gate_B): if len(gate_A.split(" ")) >= len(gate_B.split(" ")): tmp_gate = gate_A gate_A = gate_B gate_B = tmp_gate distance = float("inf") for qarg_A in gate_A.split(" "): qubit_A = qarg_A.split("]")[0] + "]" qgate_A = int(qarg_A.split("]")[-1]) for qarg_B in gate_B.split(" "): qubit_B = qarg_B.split("]")[0] + "]" qgate_B = int(qarg_B.split("]")[-1]) # print('%s gate %d --> %s gate %d'%(qubit_A,qgate_A,qubit_B,qgate_B)) if qubit_A == qubit_B: distance = min(distance, abs(qgate_B - qgate_A)) # print('Distance from %s to %s = %f'%(gate_A,gate_B,distance)) return distance dag = circuit_to_dag(circuit) qubit_allGate_depths = {x: 0 for x in circuit.qubits} qubit_2qGate_depths = {x: 0 for x in circuit.qubits} gate_depth_encodings = {} # print('Before translation :',subcircuit_gates,flush=True) for op_node in dag.topological_op_nodes(): gate_depth_encoding = "" for qarg in op_node.qargs: gate_depth_encoding += "%s[%d]%d " % ( circuit.find_bit(qarg).registers[0][0].name, circuit.find_bit(qarg).index, qubit_allGate_depths[qarg], ) gate_depth_encoding = gate_depth_encoding[:-1] gate_depth_encodings[op_node] = gate_depth_encoding for qarg in op_node.qargs: qubit_allGate_depths[qarg] += 1 if len(op_node.qargs) == 2: MIP_gate_depth_encoding = "" for qarg in op_node.qargs: MIP_gate_depth_encoding += "%s[%d]%d " % ( circuit.find_bit(qarg).registers[0][0].name, circuit.find_bit(qarg).index, qubit_2qGate_depths[qarg], ) qubit_2qGate_depths[qarg] += 1 MIP_gate_depth_encoding = MIP_gate_depth_encoding[:-1] # print('gate_depth_encoding = %s, MIP_gate_depth_encoding = %s'%(gate_depth_encoding,MIP_gate_depth_encoding)) for subcircuit_idx in range(len(subcircuit_gates)): for gate_idx in range(len(subcircuit_gates[subcircuit_idx])): if ( subcircuit_gates[subcircuit_idx][gate_idx] == MIP_gate_depth_encoding ): subcircuit_gates[subcircuit_idx][gate_idx] = gate_depth_encoding break # print('After translation :',subcircuit_gates,flush=True) subcircuit_op_nodes: dict[int, list[DAGOpNode]] = { x: [] for x in range(len(subcircuit_gates)) } subcircuit_sizes = [0 for x in range(len(subcircuit_gates))] complete_path_map: dict[Qubit, list[dict[str, int | Qubit]]] = {} for circuit_qubit in dag.qubits: complete_path_map[circuit_qubit] = [] qubit_ops = dag.nodes_on_wire(wire=circuit_qubit, only_ops=True) for qubit_op_idx, qubit_op in enumerate(qubit_ops): gate_depth_encoding = gate_depth_encodings[qubit_op] nearest_subcircuit_idx = -1 min_distance = float("inf") for subcircuit_idx in range(len(subcircuit_gates)): distance = float("inf") for gate in subcircuit_gates[subcircuit_idx]: if len(gate.split(" ")) == 1: # Do not compare against single qubit gates continue else: distance = min( distance, calculate_distance_between_gate( gate_A=gate_depth_encoding, gate_B=gate ), ) # print('Distance from %s to subcircuit %d = %f'%(gate_depth_encoding,subcircuit_idx,distance)) if distance < min_distance: min_distance = distance nearest_subcircuit_idx = subcircuit_idx assert nearest_subcircuit_idx != -1 path_element = { "subcircuit_idx": nearest_subcircuit_idx, "subcircuit_qubit": subcircuit_sizes[nearest_subcircuit_idx], } if ( len(complete_path_map[circuit_qubit]) == 0 or nearest_subcircuit_idx != complete_path_map[circuit_qubit][-1]["subcircuit_idx"] ): # print('{} op #{:d} {:s} encoding = {:s}'.format(circuit_qubit,qubit_op_idx,qubit_op.name,gate_depth_encoding), # 'belongs in subcircuit %d'%nearest_subcircuit_idx) complete_path_map[circuit_qubit].append(path_element) subcircuit_sizes[nearest_subcircuit_idx] += 1 subcircuit_op_nodes[nearest_subcircuit_idx].append(qubit_op) for circuit_qubit in complete_path_map: # print(circuit_qubit,'-->') for path_element in complete_path_map[circuit_qubit]: path_element_qubit = QuantumRegister( size=subcircuit_sizes[path_element["subcircuit_idx"]], name="q" )[path_element["subcircuit_qubit"]] path_element["subcircuit_qubit"] = path_element_qubit # print(path_element) subcircuits = _generate_subcircuits( subcircuit_op_nodes=subcircuit_op_nodes, complete_path_map=complete_path_map, subcircuit_sizes=subcircuit_sizes, dag=dag, ) return subcircuits, complete_path_map def _generate_subcircuits( subcircuit_op_nodes: dict[int, list[DAGOpNode]], complete_path_map: dict[Qubit, list[dict[str, int | Qubit]]], subcircuit_sizes: Sequence[int], dag: DAGCircuit, ) -> Sequence[QuantumCircuit]: """ Generate the subcircuits from given nodes and paths. Called in the subcircuit_parser function to convert the found paths and nodes into actual quantum circuit objects. Args: subcircuit_op_nodes: The nodes of each of the subcircuits complete_path_map: The complete path through the subcircuits subcircuit_sizes: The number of qubits in each of the subcircuits dag: The dag representation of the input quantum circuit Returns: The subcircuits """ qubit_pointers = {x: 0 for x in complete_path_map} subcircuits = [QuantumCircuit(x, name="q") for x in subcircuit_sizes] for op_node in dag.topological_op_nodes(): subcircuit_idx_list = list( filter( lambda x: op_node in subcircuit_op_nodes[x], subcircuit_op_nodes.keys() ) ) if len(subcircuit_idx_list) != 1: raise ValueError("A node cannot belong to more than one subcircuit.") subcircuit_idx = subcircuit_idx_list[0] # print('{} belongs in subcircuit {:d}'.format(op_node.qargs,subcircuit_idx)) subcircuit_qargs = [] for op_node_qarg in op_node.qargs: if ( complete_path_map[op_node_qarg][qubit_pointers[op_node_qarg]][ "subcircuit_idx" ] != subcircuit_idx ): qubit_pointers[op_node_qarg] += 1 path_element = complete_path_map[op_node_qarg][qubit_pointers[op_node_qarg]] assert path_element["subcircuit_idx"] == subcircuit_idx subcircuit_qargs.append(path_element["subcircuit_qubit"]) # print('-->',subcircuit_qargs) # mypy doesn't recognize QuantumCircuit as being an Iterable, so we ignore subcircuits[subcircuit_idx].append( # type: ignore instruction=op_node.op, qargs=subcircuit_qargs, cargs=None ) return subcircuits def _get_counter( subcircuits: Sequence[QuantumCircuit], O_rho_pairs: list[tuple[dict[str, int | Qubit], dict[str, int | Qubit]]], ) -> dict[int, dict[str, int]]: """ Create information regarding each of the subcircuit parameters (qubits, width, etc.). Args: subcircuits: The list of subcircuits O_rho_pairs: The pairs for each qubit path as generated in the _get_pairs function Returns: The resulting dictionary with all parameter information """ counter = {} for subcircuit_idx, subcircuit in enumerate(subcircuits): counter[subcircuit_idx] = { "effective": subcircuit.num_qubits, "rho": 0, "O": 0, "d": subcircuit.num_qubits, "depth": subcircuit.depth(), "size": subcircuit.size(), } for pair in O_rho_pairs: if len(pair) != 2: raise ValueError(f"O_rho_pairs must be length 2: {pair}") O_qubit = pair[0] rho_qubit = pair[-1] counter[O_qubit["subcircuit_idx"]]["effective"] -= 1 counter[O_qubit["subcircuit_idx"]]["O"] += 1 counter[rho_qubit["subcircuit_idx"]]["rho"] += 1 return counter def _cost_estimate(counter: dict[int, dict[str, int]]) -> float: """ Estimate the cost of processing the subcircuits. Args: counter: Dictionary containing information for each of the subcircuits Returns: The estimated cost for classical processing """ num_cuts = sum([counter[subcircuit_idx]["rho"] for subcircuit_idx in counter]) subcircuit_indices = list(counter.keys()) num_effective_qubits_list = [ counter[subcircuit_idx]["effective"] for subcircuit_idx in subcircuit_indices ] num_effective_qubits, _ = zip( *sorted(zip(num_effective_qubits_list, subcircuit_indices)) ) classical_cost = 0 accumulated_kron_len = 2 ** num_effective_qubits[0] for effective in num_effective_qubits[1:]: accumulated_kron_len *= 2**effective classical_cost += accumulated_kron_len classical_cost *= 4**num_cuts return classical_cost def _get_pairs( complete_path_map: dict[Qubit, list[dict[str, int | Qubit]]] ) -> list[tuple[dict[str, int | Qubit], dict[str, int | Qubit]]]: """ Get all pairs through each path. Iterates through the path for each of the qubits and keeps track of the each pair of neigbors. Args: complete_path_map: The dictionary containing all path information Returns: All pairs for each of the qubit paths """ O_rho_pairs = [] for input_qubit in complete_path_map: path = complete_path_map[input_qubit] if len(path) > 1: for path_ctr, item in enumerate(path[:-1]): O_qubit_tuple = item rho_qubit_tuple = path[path_ctr + 1] O_rho_pairs.append((O_qubit_tuple, rho_qubit_tuple)) return O_rho_pairs def _circuit_stripping(circuit: QuantumCircuit) -> QuantumCircuit: """ Remove all single qubit and barrier type gates. Args: circuit: The circuit to strip Returns: The stripped circuit """ # Remove all single qubit gates and barriers in the circuit dag = circuit_to_dag(circuit) stripped_dag = DAGCircuit() [stripped_dag.add_qreg(x) for x in circuit.qregs] for vertex in dag.topological_op_nodes(): if len(vertex.qargs) == 2 and vertex.op.name != "barrier": stripped_dag.apply_operation_back(op=vertex.op, qargs=vertex.qargs) return dag_to_circuit(stripped_dag) def _read_circuit( circuit: QuantumCircuit, ) -> tuple[int, list[tuple[int, int]], dict[str, int], dict[int, str]]: """ Read the input circuit to a graph based representation for the MIP model. Args: circuit: A stripped circuit to be converted into a DAG like representation Returns: A tuple containing the number of vertices, edge list, vertex to vertex id mapping, and vertex id to vertex mapping. """ dag = circuit_to_dag(circuit) edges = [] node_name_ids = {} id_node_names = {} vertex_ids = {} curr_node_id = 0 qubit_gate_counter = {} for qubit in dag.qubits: qubit_gate_counter[qubit] = 0 for vertex in dag.topological_op_nodes(): if len(vertex.qargs) != 2: raise Exception("vertex does not have 2 qargs!") arg0, arg1 = vertex.qargs vertex_name = "%s[%d]%d %s[%d]%d" % ( circuit.find_bit(arg0).registers[0][0].name, circuit.find_bit(arg0).index, qubit_gate_counter[arg0], circuit.find_bit(arg1).registers[0][0].name, circuit.find_bit(arg1).index, qubit_gate_counter[arg1], ) qubit_gate_counter[arg0] += 1 qubit_gate_counter[arg1] += 1 # print(vertex.op.label,vertex_name,curr_node_id) if vertex_name not in node_name_ids and id(vertex) not in vertex_ids: node_name_ids[vertex_name] = curr_node_id id_node_names[curr_node_id] = vertex_name vertex_ids[id(vertex)] = curr_node_id curr_node_id += 1 for u, v, _ in dag.edges(): if isinstance(u, DAGOpNode) and isinstance(v, DAGOpNode): u_id = vertex_ids[id(u)] v_id = vertex_ids[id(v)] edges.append((u_id, v_id)) n_vertices = dag.size() return n_vertices, edges, node_name_ids, id_node_names