class FlashInferCutlassMxfp8LinearKernel(Mxfp8LinearKernel):
"""MXFP8 W8A8 GEMM via FlashInfer CUTLASS (SM100+)."""
@classmethod
def is_supported(
cls, compute_capability: int | None = None
) -> tuple[bool, str | None]:
if current_platform.has_device_capability(100):
return True, None
return False, "requires >=sm_100 (Blackwell)"
@classmethod
def can_implement(cls, c: Mxfp8LinearLayerConfig) -> tuple[bool, str | None]:
return True, None
def process_weights_after_loading(self, layer: torch.nn.Module) -> None:
weight = layer.weight.data # [N, K]
N, K = weight.shape
scale_k = K // MXFP8_BLOCK_SIZE
weight_scale_2d = layer.weight_scale.data[:N, :scale_k].contiguous()
weight_scale_swizzled = swizzle_mxfp8_scale(weight_scale_2d, M=N, K=K)
layer.weight = Parameter(weight.contiguous(), requires_grad=False)
layer.weight_scale = Parameter(
weight_scale_swizzled.contiguous(), requires_grad=False
)
def apply_weights(
self,
layer: torch.nn.Module,
x: torch.Tensor,
bias: torch.Tensor | None = None,
) -> torch.Tensor:
weight = layer.weight
weight_scale = layer.weight_scale
out_dtype = x.dtype
N, K = weight.shape
input_shape = x.shape
input_2d = x.view(-1, K)
M_orig = input_2d.shape[0]
min_dim = 128
assert min_dim <= K, (
f"mm_mxfp8 requires K >= {min_dim}, got K={K}. "
f"in_features is too small for mm_mxfp8."
)
assert K % MXFP8_BLOCK_SIZE == 0, (
f"mm_mxfp8 requires K to be divisible by {MXFP8_BLOCK_SIZE}, got K={K}."
)
assert min_dim <= N, (
f"mm_mxfp8 requires N >= {min_dim}, got N={N}. "
f"out_features is too small for mm_mxfp8."
)
M_padded = ((M_orig + min_dim - 1) // min_dim) * min_dim
if M_padded != M_orig:
pad_rows = M_padded - M_orig
input_2d = torch.nn.functional.pad(input_2d, (0, 0, 0, pad_rows))
input_mxfp8, input_scale = mxfp8_e4m3_quantize(
input_2d, is_sf_swizzled_layout=True
)
if not weight.is_contiguous():
weight = weight.contiguous()
output = vllm_flashinfer.mm_mxfp8(
input_mxfp8,
weight.t(),
input_scale,
weight_scale,
out_dtype=out_dtype,
backend="cutlass",
)
if M_padded != M_orig:
output = output[:M_orig, :]
if bias is not None:
output = output + bias
output_shape = (*input_shape[:-1], N)
return output.view(output_shape)