Token Embed¶
Source: https://github.com/vllm-project/vllm/tree/main/examples/pooling/token_embed
Colqwen3 Token Embed Online¶
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
# ruff: noqa: E501
"""
Example online usage of Pooling API for ColQwen3 multi-vector retrieval.
ColQwen3 is a multi-modal late interaction model based on Qwen3-VL that
produces per-token embeddings (320-dim, L2-normalized) for both text and
image inputs. Similarity is computed via MaxSim scoring.
This example mirrors the official TomoroAI inference code
(https://huggingface.co/TomoroAI/tomoro-colqwen3-embed-4b) but uses the
vLLM serving API instead of local HuggingFace model loading.
Start the server with:
vllm serve TomoroAI/tomoro-colqwen3-embed-4b --max-model-len 4096
Then run this script:
python colqwen3_token_embed_online.py
"""
import argparse
from io import BytesIO
import numpy as np
import pybase64 as base64
import requests
from PIL import Image
# ── Helpers ─────────────────────────────────────────────────
def post_http_request(payload: dict, api_url: str) -> requests.Response:
    """POST `payload` as JSON to `api_url` and return the raw response."""
    return requests.post(
        api_url,
        headers={"User-Agent": "Test Client"},
        json=payload,
    )
def load_image(url: str) -> Image.Image:
    """Download an image from URL (handles Wikimedia 403)."""
    # First try a plain request; if the host rejects it with 403 (as
    # Wikimedia does for unidentified clients), retry with a browser-like UA.
    attempts = (
        {},
        {"User-Agent": "Mozilla/5.0 (compatible; ColQwen3-demo/1.0)"},
    )
    for headers in attempts:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code != 403:
            response.raise_for_status()
            return Image.open(BytesIO(response.content)).convert("RGB")
    raise RuntimeError(f"Could not fetch image from {url}")
def encode_image_base64(image: Image.Image) -> str:
    """Serialize a PIL image as PNG and wrap it in a base64 data URI."""
    buffer = BytesIO()
    image.save(buffer, format="PNG")
    encoded = base64.b64encode(buffer.getvalue()).decode()
    return "data:image/png;base64," + encoded
def compute_maxsim(q_emb: np.ndarray, d_emb: np.ndarray) -> float:
    """ColBERT-style MaxSim: for each query token, take the similarity of
    its best-matching document token, then sum over query tokens."""
    similarity = np.matmul(q_emb, d_emb.T)
    best_per_query_token = similarity.max(axis=-1)
    return float(best_per_query_token.sum())
# ── Encode functions ────────────────────────────────────────
def encode_queries(texts: list[str], model: str, api_url: str) -> list[np.ndarray]:
    """Encode text queries → list of multi-vector embeddings.

    Args:
        texts: query strings to embed.
        model: model name to place in the request payload.
        api_url: the server's /pooling endpoint.

    Returns:
        One (num_tokens, dim) array per input query.

    Raises:
        requests.HTTPError: if the server returns an error status.
            Previously an error response surfaced as an opaque
            ``KeyError: 'data'`` instead.
    """
    resp = post_http_request({"model": model, "input": texts}, api_url)
    # Fail loudly on server errors — consistent with the status check
    # that encode_images() performs on its responses.
    resp.raise_for_status()
    return [np.array(item["data"]) for item in resp.json()["data"]]
def encode_images(image_urls: list[str], model: str, api_url: str) -> list[np.ndarray]:
    """Encode image documents → list of multi-vector embeddings.

    Images are sent via the chat-style `messages` field so that the
    vLLM multimodal processor handles them correctly.
    """
    embeddings = []
    for url in image_urls:
        print(f" Loading: {url.split('/')[-1]}...")
        data_uri = encode_image_base64(load_image(url))
        payload = {
            "model": model,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": data_uri}},
                        {"type": "text", "text": "Describe the image."},
                    ],
                }
            ],
        }
        resp = post_http_request(payload, api_url)
        result = resp.json()
        # Skip (rather than abort on) documents the server failed to embed.
        if resp.status_code != 200 or "data" not in result:
            print(f" Error ({resp.status_code}): {str(result)[:200]}")
            continue
        embeddings.append(np.array(result["data"][0]["data"]))
    return embeddings
# ── Main ────────────────────────────────────────────────────
def parse_args():
    """Parse the demo's command-line options (server host/port, model)."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", default="localhost", type=str)
    parser.add_argument("--port", default=8000, type=int)
    parser.add_argument(
        "--model",
        default="TomoroAI/tomoro-colqwen3-embed-4b",
        type=str,
    )
    return parser.parse_args()
def main(args):
    """Demo driver: encode text queries and image documents, then score them.

    Exercises two endpoints of a running vLLM server:
      * /pooling — per-token (multi-vector) embeddings for text and images
      * /score   — server-side late-interaction scoring for text pairs
    """
    pooling_url = f"http://{args.host}:{args.port}/pooling"
    score_url = f"http://{args.host}:{args.port}/score"
    model = args.model
    # Same sample data as the official TomoroAI example
    queries = [
        "Retrieve the city of Singapore",
        "Retrieve the city of Beijing",
        "Retrieve the city of London",
    ]
    image_urls = [
        "https://upload.wikimedia.org/wikipedia/commons/2/27/Singapore_skyline_2022.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/6/61/Beijing_skyline_at_night.JPG",
        "https://upload.wikimedia.org/wikipedia/commons/4/49/London_skyline.jpg",
    ]
    # ── 1) Text query embeddings ────────────────────────────
    print("=" * 60)
    print("1. Encode text queries (multi-vector)")
    print("=" * 60)
    query_embeddings = encode_queries(queries, model, pooling_url)
    for i, emb in enumerate(query_embeddings):
        # Printing the first token's norm is a sanity check — per the module
        # docstring the model emits L2-normalized vectors, so this should be ~1.
        norm = float(np.linalg.norm(emb[0]))
        print(f' Query {i}: {emb.shape} (L2 norm: {norm:.4f}) "{queries[i]}"')
    # ── 2) Image document embeddings ────────────────────────
    print()
    print("=" * 60)
    print("2. Encode image documents (multi-vector)")
    print("=" * 60)
    doc_embeddings = encode_images(image_urls, model, pooling_url)
    # NOTE(review): encode_images() skips URLs that fail, so if a middle
    # fetch fails the image_urls[i] label below drifts from its embedding —
    # acceptable for a demo, but don't copy this pairing into real code.
    for i, emb in enumerate(doc_embeddings):
        print(f" Doc {i}: {emb.shape} {image_urls[i].split('/')[-1]}")
    # ── 3) Cross-modal MaxSim scoring ───────────────────────
    # Only build the score matrix if at least one image embedded successfully.
    if doc_embeddings:
        print()
        print("=" * 60)
        print("3. Cross-modal MaxSim scores (text queries × image docs)")
        print("=" * 60)
        # Header
        print(f"{'':>35s}", end="")
        for j in range(len(doc_embeddings)):
            print(f" Doc {j:>2d}", end="")
        print()
        # Score matrix
        for i, q_emb in enumerate(query_embeddings):
            print(f" {queries[i]:<33s}", end="")
            for j, d_emb in enumerate(doc_embeddings):
                score = compute_maxsim(q_emb, d_emb)
                print(f" {score:6.2f}", end="")
            print()
    # ── 4) Text-only /score endpoint ────────────────────────
    print()
    print("=" * 60)
    print("4. Text-only late interaction scoring (/score endpoint)")
    print("=" * 60)
    text_query = "What is the capital of France?"
    text_docs = [
        "The capital of France is Paris.",
        "Berlin is the capital of Germany.",
        "Python is a programming language.",
    ]
    resp = post_http_request(
        {"model": model, "text_1": text_query, "text_2": text_docs},
        score_url,
    )
    print(f' Query: "{text_query}"\n')
    # The /score endpoint returns one {index, score} item per document.
    for item in resp.json()["data"]:
        idx = item["index"]
        print(f" Doc {idx} (score={item['score']:.4f}): {text_docs[idx]}")
if __name__ == "__main__":
    main(parse_args())
Jina Embeddings V4 Offline¶
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import torch
from vllm import LLM
from vllm.inputs import TextPrompt
from vllm.multimodal.utils import fetch_image
# Initialize model
# runner="pooling" loads the checkpoint for embedding (no generation);
# gpu_memory_utilization=0.8 leaves headroom for other processes on the GPU.
model = LLM(
    model="jinaai/jina-embeddings-v4-vllm-text-matching",
    runner="pooling",
    max_model_len=1024,
    gpu_memory_utilization=0.8,
)
# Create text prompts
# NOTE(review): the "Query: " prefix is presumably the prompt format this
# Jina checkpoint expects for text-matching — confirm against the model card.
text1 = "Ein wunderschöner Sonnenuntergang am Strand"
text1_prompt = TextPrompt(prompt=f"Query: {text1}")
text2 = "浜辺に沈む美しい夕日"
text2_prompt = TextPrompt(prompt=f"Query: {text2}")
# Create image prompt
image = fetch_image(
    "https://vllm-public-assets.s3.us-west-2.amazonaws.com/multimodal_asset/eskimo.jpg"  # noqa: E501
)
# Qwen-style chat template with vision placeholder tokens; the actual pixels
# are attached via multi_modal_data, not embedded in the prompt string.
image_prompt = TextPrompt(
    prompt="<|im_start|>user\n<|vision_start|><|image_pad|><|vision_end|>Describe the image.<|im_end|>\n",  # noqa: E501
    multi_modal_data={"image": image},
)
# Encode all prompts
# pooling_task="token_embed" returns one embedding per token rather than a
# single pooled vector per prompt.
prompts = [text1_prompt, text2_prompt, image_prompt]
outputs = model.encode(prompts, pooling_task="token_embed")
def get_embeddings(outputs):
    """Mean-pool per-token embeddings into one normalized vector per prompt.

    For prompts that contain vision tokens, only the span between the
    vision-start and vision-end markers (inclusive) is pooled; text-only
    prompts are pooled over all tokens.

    Args:
        outputs: pooling outputs, each exposing `prompt_token_ids` and a
            `(num_tokens, dim)` tensor at `outputs.data`.

    Returns:
        A list of L2-normalized 1-D embedding tensors, one per output.
    """
    VISION_START_TOKEN_ID, VISION_END_TOKEN_ID = 151652, 151653
    embeddings = []
    for output in outputs:
        token_embeds = output.outputs.data.detach().clone()
        if VISION_START_TOKEN_ID in output.prompt_token_ids:
            # Build the token-id tensor once (the original rebuilt it for
            # each torch.where call) and keep only the vision-token span.
            token_ids = torch.tensor(output.prompt_token_ids)
            start = torch.where(token_ids == VISION_START_TOKEN_ID)[0][0]
            end = torch.where(token_ids == VISION_END_TOKEN_ID)[0][0]
            token_embeds = token_embeds[start : end + 1]
        # Mean-pool in float32 for numerical stability, then L2-normalize.
        pooled_output = token_embeds.mean(dim=0, dtype=torch.float32)
        embeddings.append(torch.nn.functional.normalize(pooled_output, dim=-1))
    return embeddings
# Report the pooled embedding dimensionality for each prompt.
embeddings = get_embeddings(outputs)
for pooled in embeddings:
    print(pooled.shape)
Jina Reranker V3 Offline¶
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
# ruff: noqa: E501
import torch.nn.functional as F
from vllm import LLM
# Example query plus a multilingual candidate pool (English, Spanish,
# Chinese, French); only some documents are topically relevant.
query = "What are the health benefits of green tea?"
documents = [
    "Green tea contains antioxidants called catechins that may help reduce inflammation and protect cells from damage.",
    "El precio del café ha aumentado un 20% este año debido a problemas en la cadena de suministro.",
    "Studies show that drinking green tea regularly can improve brain function and boost metabolism.",
    "Basketball is one of the most popular sports in the United States.",
    "绿茶富含儿茶素等抗氧化剂,可以降低心脏病风险,还有助于控制体重。",
    "Le thé vert est riche en antioxydants et peut améliorer la fonction cérébrale.",
]
def main():
    """Score documents against a query with jina-reranker-v3, two ways.

    1) llm.score(): one relevance score per (query, document) pair.
    2) Manually: token_embed pooling, then cosine similarity between the
       query embedding and each document embedding.
    """
    # Initialize model
    llm = LLM(
        model="jinaai/jina-reranker-v3",
        runner="pooling",
    )
    # Generate scores.
    outputs = llm.score(query, documents)
    # Print the outputs.
    print("\nGenerated Outputs:\n" + "-" * 60)
    for document, output in zip(documents, outputs):
        score = output.outputs.score
        print(f"Pair: {[query, document]!r} \nScore: {score}")
        print("-" * 60)
    # Generate embeddings.
    # The JinaForRanking model concatenates docs first, then query.
    # Let's stay consistent with this novel design.
    # NOTE(review): every input's embedding is read from outputs[0] and then
    # split with [:-1] / [-1]. This relies on the model packing one row per
    # input into the first output — confirm against the JinaForRanking
    # implementation before reusing this pattern elsewhere.
    outputs = llm.encode(documents + [query], pooling_task="token_embed")
    embeds = outputs[0].outputs.data.float()
    doc_embeds = embeds[:-1]
    query_embeds = embeds[-1]
    # query_embeds (1-D) broadcasts against doc_embeds (2-D) along dim=1.
    scores = F.cosine_similarity(query_embeds, doc_embeds)
    # Print the outputs.
    print("\nGenerated Outputs:\n" + "-" * 60)
    for document, score in zip(documents, scores):
        print(f"Pair: {[query, document]!r} \nScore: {score}")
        print("-" * 60)
# Run the demo only when executed as a script.
if __name__ == "__main__":
    main()
Multi Vector Retrieval Offline¶
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from argparse import Namespace
from vllm import LLM, EngineArgs
from vllm.utils.argparse_utils import FlexibleArgumentParser
def parse_args():
    """Build the full vLLM engine CLI, seeded with this example's defaults."""
    parser = EngineArgs.add_cli_args(FlexibleArgumentParser())
    # Defaults for this demo; any of them can still be overridden on the CLI.
    parser.set_defaults(
        model="BAAI/bge-m3",
        runner="pooling",
        enforce_eager=True,
    )
    return parser.parse_args()
def main(args: Namespace):
    """Embed sample prompts at sentence level, then at token level."""
    sample_prompts = [
        "Hello, my name is",
        "The president of the United States is",
        "The capital of France is",
        "The future of AI is",
    ]
    # runner="pooling" (set via parse_args defaults) is required for
    # embedding models.
    llm = LLM(**vars(args))

    # Sentence embeddings: one vector per prompt (EmbeddingRequestOutput).
    embed_outputs = llm.embed(sample_prompts)
    print("\nGenerated Outputs:\n" + "-" * 60)
    for output in embed_outputs:
        print(len(output.outputs.embedding))

    # Token-level embeddings: a (num_tokens, dim) matrix per prompt
    # (PoolingRequestOutput) — the multi-vector retrieval representation.
    token_outputs = llm.encode(sample_prompts, pooling_task="token_embed")
    print("\nGenerated Outputs:\n" + "-" * 60)
    for output in token_outputs:
        print(output.outputs.data.shape)
if __name__ == "__main__":
    main(parse_args())
Multi Vector Retrieval Online¶
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Example online usage of Pooling API for multi vector retrieval.
Run `vllm serve <model> --runner pooling`
to start up the server in vLLM. e.g.
vllm serve BAAI/bge-m3
"""
import argparse
import requests
import torch
def post_http_request(prompt: dict, api_url: str) -> requests.Response:
    """POST the JSON payload to the server and return the raw response."""
    return requests.post(
        api_url,
        headers={"User-Agent": "Test Client"},
        json=prompt,
    )
def parse_args():
    """Parse --host/--port/--model options for the pooling demo."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", default="localhost", type=str)
    parser.add_argument("--port", default=8000, type=int)
    parser.add_argument("--model", default="BAAI/bge-m3", type=str)
    return parser.parse_args()
def main(args):
    """Request per-token embeddings for sample prompts and print the shapes."""
    endpoint = f"http://{args.host}:{args.port}/pooling"
    request_body = {
        "model": args.model,
        "input": [
            "Hello, my name is",
            "The president of the United States is",
            "The capital of France is",
            "The future of AI is",
        ],
    }
    response = post_http_request(prompt=request_body, api_url=endpoint)
    # Each item carries a (num_tokens, hidden_dim) matrix of token embeddings.
    for item in response.json()["data"]:
        print(torch.tensor(item["data"]).shape)
if __name__ == "__main__":
    main(parse_args())