Python / Golang 调用大模型 API 完整指南
from openai import OpenAI

# Create a client; base_url may point at any OpenAI-compatible service.
client = OpenAI(
    api_key="your-api-key",
    base_url="https://api.openai.com/v1",
)

# Basic, non-streaming chat completion.
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "你是一个专业助手"},
        {"role": "user", "content": "你好,请介绍一下你自己"},
    ],
    temperature=0.7,
    max_tokens=1000,
    stream=False,
)
print(response.choices[0].message.content)
from openai import OpenAI

client = OpenAI(api_key="your-api-key")

# Stream the completion instead of waiting for the full reply.
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "写一首关于春天的诗"}],
    stream=True,
)

# Receive the response chunk by chunk. (Original had the loop body at
# column 0, which is an IndentationError.)
for chunk in stream:
    # Guard: some chunks (e.g. the final one) carry no delta content.
    if chunk.choices and chunk.choices[0].delta.content:
        content = chunk.choices[0].delta.content
        print(content, end="", flush=True)
print()
from openai import OpenAI

client = OpenAI(api_key="your-api-key")

# Function-calling (tools) example: describe a tool with a JSON Schema so the
# model can decide to call it. (Removed unused pydantic/typing imports; fixed
# the broken indentation of the if/for bodies.)
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "获取指定城市的天气信息",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "城市名称,如北京、上海",
                    }
                },
                "required": ["city"],
            },
        },
    }
]

# Ask the model; it may answer with a tool call instead of plain text.
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "北京今天天气怎么样?"}],
    tools=tools,
)

# Check whether the model requested a function call.
if response.choices[0].message.tool_calls:
    for tool_call in response.choices[0].message.tool_calls:
        function_name = tool_call.function.name
        arguments = tool_call.function.arguments
        print(f"需要调用函数: {function_name}")
        print(f"参数: {arguments}")
import json

from openai import OpenAI
from pydantic import BaseModel

client = OpenAI(api_key="your-api-key")


class WeatherInfo(BaseModel):
    # Target schema for the structured weather answer (shown for reference;
    # the request below spells the schema out inline).
    city: str
    temperature: float
    humidity: int
    condition: str


# Structured output. Fix: when a "json_schema" payload is supplied, the
# response_format type must be "json_schema" — "json_object" ignores the
# schema entirely (per OpenAI Structured Outputs docs).
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "上海今天天气晴朗,25度,湿度60%"}],
    response_format={
        "type": "json_schema",
        "json_schema": {
            "name": "weather_info",
            "schema": {
                "type": "object",
                "properties": {
                    "city": {"type": "string"},
                    "temperature": {"type": "number"},
                    "humidity": {"type": "integer"},
                    "condition": {"type": "string"},
                },
                "required": ["city", "temperature", "humidity", "condition"],
            },
        },
    },
)

result = json.loads(response.choices[0].message.content)
print(result)
import numpy as np
from openai import OpenAI

client = OpenAI(api_key="your-api-key")

# Create embedding vectors for a batch of texts.
response = client.embeddings.create(
    model="text-embedding-3-small",
    input=[
        "人工智能改变世界",
        "机器学习是AI的核心",
        "深度学习应用广泛",
    ],
    encoding_format="float",  # or "base64"
)

# Inspect the first embedding.
embeddings = response.data[0].embedding
print(f"向量维度: {len(embeddings)}")
print(f"相似度计算示例:")
print(f" 向量1前5个值: {embeddings[:5]}")

# Cosine similarity between the first two vectors.
vec1 = response.data[0].embedding
vec2 = response.data[1].embedding
cosine_sim = np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
print(f" 向量1与向量2余弦相似度: {cosine_sim:.4f}")
from openai import OpenAI

client = OpenAI(api_key="your-api-key")

# Generate an image from a text prompt.
response = client.images.generate(
    model="dall-e-3",
    prompt="一只可爱的小猫坐在花园里,阳光明媚,真实风格",
    size="1024x1024",
    quality="standard",
    n=1,
)
image_url = response.data[0].url
print(f"生成的图像: {image_url}")

# Edit an image (inpainting). Fix: use context managers — the original
# opened the files and never closed them.
with open("original.png", "rb") as image, open("mask.png", "rb") as mask:
    response = client.images.edit(
        model="dall-e-2",
        image=image,
        mask=mask,
        prompt="给小猫戴上帽子",
        n=1,
        size="1024x1024",
    )

# Image variations.
with open("original.png", "rb") as image:
    response = client.images.create_variation(
        image=image,
        n=2,
        size="1024x1024",
    )
from openai import OpenAI

client = OpenAI(api_key="your-api-key")

# Speech-to-text (Whisper). Fix: context manager so the file handle is
# closed (original leaked it).
with open("speech.mp3", "rb") as audio_file:
    transcript = client.audio.transcriptions.create(
        model="whisper-1",
        file=audio_file,
    )
print(transcript.text)

# Text-to-speech (TTS).
response = client.audio.speech.create(
    model="tts-1",
    voice="alloy",  # alloy, echo, fable, onyx, nova, shimmer
    input="你好,这是一个语音合成的示例",
)
response.stream_to_file("output.mp3")

# Audio translation (Whisper translates supported languages to English).
with open("chinese_speech.mp3", "rb") as audio_file:
    translation = client.audio.translations.create(
        model="whisper-1",
        file=audio_file,
    )
print(translation.text)
from openai import OpenAI

client = OpenAI(api_key="your-api-key")

# List available models.
models = client.models.list()
print("可用模型列表(前10个):")
for model in models.data[:10]:
    print(f" - {model.id} (创建于: {model.created})")

# Retrieve one model's metadata.
model = client.models.retrieve("gpt-4o")
print(f"\n模型详情:")
print(f" ID: {model.id}")
print(f" 拥有者: {model.owned_by}")
# Fix: the official OpenAI Model object only exposes id/created/object/
# owned_by — a bare `model.capabilities[...]` raises AttributeError. Some
# compatible providers do return capabilities, so read it defensively.
capabilities = getattr(model, "capabilities", None) or {}
print(f" 是否支持生成: {capabilities.get('can_generate', False)}")
print(f" 支持视觉: {capabilities.get('vision', False)}")
from openai import OpenAI

client = OpenAI(api_key="your-api-key")

# Create a fine-tuning job. (Fixed the broken loop-body indentation below.)
job = client.fine_tuning.jobs.create(
    training_file="file-xxx",  # ID of an uploaded training file
    model="gpt-3.5-turbo",
    hyperparameters={
        "n_epochs": 3,
        "batch_size": 1,
        "learning_rate_multiplier": 0.1,
    },
)
print(f"微调任务 ID: {job.id}")

# Check job status.
job = client.fine_tuning.jobs.retrieve(job.id)
print(f"状态: {job.status}")  # queued, running, succeeded, failed

# List fine-tuning jobs.
jobs = client.fine_tuning.jobs.list(limit=10)
for job in jobs.data:
    print(f" - {job.id}: {job.status}")

# Cancel a job:
# client.fine_tuning.jobs.cancel(job.id)
# Delete a fine-tuned model:
# client.models.delete("ft:gpt-3.5-turbo:xxx")
from openai import OpenAI

client = OpenAI(api_key="your-api-key")

# Upload a file (for fine-tuning or knowledge bases). Fix: context manager
# so the handle is closed after the upload.
with open("training_data.jsonl", "rb") as fh:
    file = client.files.create(
        file=fh,
        purpose="fine-tune",
    )
print(f"文件 ID: {file.id}")

# List uploaded files.
files = client.files.list(purpose="fine-tune")
for f in files.data:
    print(f" - {f.id}: {f.filename} ({f.bytes} bytes)")

# Download a file's content.
content = client.files.content("file-xxx")
print(content.text)

# Delete a file:
# client.files.delete("file-xxx")
import time

from openai import OpenAI

client = OpenAI(api_key="your-api-key")

# Create an assistant.
assistant = client.beta.assistants.create(
    name="数学老师",
    instructions="你是一个数学老师,擅长解释数学概念和解题",
    model="gpt-4o",
    tools=[{"type": "code_interpreter"}],  # also: file_search, function tools
)
print(f"助手 ID: {assistant.id}")

# Create a thread with an initial message.
thread = client.beta.threads.create(
    messages=[
        {
            "role": "user",
            "content": "请解释一下勾股定理",
        }
    ]
)

# Add another message and start a run.
message = client.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content="已知直角三角形两条直角边为3和4,求斜边长度",
)
run = client.beta.threads.runs.create(
    thread_id=thread.id,
    assistant_id=assistant.id,
)

# Fix: poll until the run reaches a terminal state. The original
# `while run.status == "completed"` exited immediately, because a freshly
# created run is still "queued"/"in_progress".
run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)
while run.status in ("queued", "in_progress"):
    time.sleep(1)
    run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)

if run.status == "completed":
    messages = client.beta.threads.messages.list(thread_id=thread.id)
    print(messages.data[0].content[0].text.value)
from openai import OpenAI

client = OpenAI(api_key="your-api-key")

# Create a batch job. Fix: in the official SDK the Batch API lives at
# `client.batches`, not `client.beta.batch`.
batch_job = client.batches.create(
    input_file_id="file-xxx",  # JSONL file of requests
    endpoint="/v1/chat/completions",
    completion_window="24h",
    metadata={
        "description": "每日数据处理",
    },
)
print(f"批处理 ID: {batch_job.id}")

# Check batch status.
batch_job = client.batches.retrieve(batch_job.id)
print(f"状态: {batch_job.status}")  # validating, in_progress, completed, failed

# List batch jobs.
batches = client.batches.list()
for batch in batches.data:
    print(f" - {batch.id}: {batch.status}")
from openai import OpenAI

client = OpenAI(api_key="your-api-key")

# Create a vector store (knowledge base) from previously uploaded files.
vector_store = client.beta.vector_stores.create(
    name="产品文档",
    file_ids=["file-xxx", "file-yyy"],  # already-uploaded files
)
print(f"向量存储 ID: {vector_store.id}")

# Attach one more file to the store.
client.beta.vector_stores.files.create(
    vector_store_id=vector_store.id,
    file_id="file-zzz",
)

# Use the vector store for retrieval-augmented Q&A via the file_search tool.
assistant = client.beta.assistants.create(
    name="产品助手",
    model="gpt-4o",
    tool_resources={
        "file_search": {
            "vector_store_ids": [vector_store.id],
        }
    },
)

# List vector stores. (Fixed the broken loop-body indentation.)
stores = client.beta.vector_stores.list()
for store in stores.data:
    print(f" - {store.name}: {store.file_counts}")
package main
import (
"context"
"fmt"
"github.com/openai/openai-go"
)
func main() {
client := openai.NewClient(
openai.WithAPIKey("your-api-key"),
// openai.WithBaseURL("https://api.openai.com/v1"),
)
// 基础对话
resp, err := client.Chat.Completions.New(context.TODO(), openai.ChatCompletionNewParams{
Model: openai.F("gpt-4o"),
Messages: []openai.ChatCompletionMessageParamUnion{
openai.ChatCompletionSystemMessageParam{
Content: openai.ChatCompletionSystemMessageContent{
Text: openai.String("你是一个专业助手"),
},
},
openai.ChatCompletionUserMessageParam{
Content: openai.ChatCompletionUserMessageContent{
Text: openai.String("你好,请介绍一下你自己"),
},
},
},
Temperature: openai.Float(0.7),
MaxTokens: openai.Int(1000),
})
if err != nil {
panic(err)
}
fmt.Println(resp.Choices[0].Message.Content)
}
package main
import (
"context"
"fmt"
"github.com/openai/openai-go"
"github.com/openai/openai-go/option"
)
func main() {
client := openai.NewClient(option.WithAPIKey("your-api-key"))
stream := client.Chat.Completions.NewStreaming(context.TODO(), openai.ChatCompletionNewParams{
Model: openai.F("gpt-4o"),
Messages: []openai.ChatCompletionMessageParamUnion{
openai.ChatCompletionUserMessageParam{
Content: openai.ChatCompletionUserMessageContent{
Text: openai.String("讲一个笑话"),
},
},
},
})
for {
chunk, err := stream.Recv()
if err != nil {
break
}
if content := chunk.Choices[0].Delta.Content; content != "" {
fmt.Print(content)
}
}
fmt.Println()
}
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/openai/openai-go"
)
type WeatherParams struct {
City string `json:"city"`
}
func main() {
client := openai.NewClient(openai.WithAPIKey("your-api-key"))
tools := []openai.ChatCompletionToolParam{
{
Type: openai.F("function"),
Function: openai.FunctionDefinitionParam{
Name: openai.String("get_weather"),
Description: openai.String("获取城市天气"),
Parameters: openai.F(map[string]any{
"type": "object",
"properties": map[string]any{
"city": map[string]any{
"type": "string",
"description": "城市名称",
},
},
"required": []string{"city"},
}),
},
},
}
resp, err := client.Chat.Completions.New(context.TODO(), openai.ChatCompletionNewParams{
Model: openai.F("gpt-4o"),
Messages: []openai.ChatCompletionMessageParamUnion{
openai.ChatCompletionUserMessageParam{
Content: openai.ChatCompletionUserMessageContent{
Text: openai.String("北京天气怎么样?"),
},
},
},
Tools: openai.F(tools),
})
if err != nil {
panic(err)
}
for _, choice := range resp.Choices {
if choice.Message.ToolCalls != nil {
for _, call := range choice.Message.ToolCalls {
fmt.Printf("调用函数: %s\n", call.Function.Name)
var params WeatherParams
json.Unmarshal([]byte(call.Function.Arguments), ¶ms)
fmt.Printf("参数: %+v\n", params)
}
}
}
}
package main
import (
"context"
"fmt"
"github.com/openai/openai-go"
)
func main() {
client := openai.NewClient(openai.WithAPIKey("your-api-key"))
resp, err := client.Embeddings.New(context.TODO(), openai.EmbeddingNewParams{
Model: openai.F("text-embedding-3-small"),
Input: []string{
"人工智能改变世界",
"机器学习是AI的核心",
"深度学习应用广泛",
},
})
if err != nil {
panic(err)
}
for i, data := range resp.Data {
fmt.Printf("文本 %d 向量维度: %d\n", i+1, len(data.Embedding))
}
// 计算余弦相似度
vec1 := resp.Data[0].Embedding
vec2 := resp.Data[1].Embedding
var dot, norm1, norm2 float64
for i := range vec1 {
dot += vec1[i] * vec2[i]
norm1 += vec1[i] * vec1[i]
norm2 += vec2[i] * vec2[i]
}
similarity := dot / (sqrt(norm1) * sqrt(norm2))
fmt.Printf("余弦相似度: %.4f\n", similarity)
}
package main
import (
"context"
"fmt"
"github.com/openai/openai-go"
)
func main() {
client := openai.NewClient(openai.WithAPIKey("your-api-key"))
// 生成图像
resp, err := client.Images.Generate(context.TODO(), openai.ImageGenerateParams{
Model: openai.F("dall-e-3"),
Prompt: openai.String("一只可爱的小猫坐在花园里,阳光明媚"),
Size: openai.F("1024x1024"),
Quality: openai.F("standard"),
N: openai.Int(1),
})
if err != nil {
panic(err)
}
fmt.Printf("生成的图像: %s\n", resp.Data[0].URL)
}
package main
import (
"context"
"fmt"
"os"
"github.com/openai/openai-go"
)
func main() {
client := openai.NewClient(openai.WithAPIKey("your-api-key"))
// 语音转文字 (Whisper)
file, _ := os.Open("speech.mp3")
defer file.Close()
resp, err := client.Audio.Transcriptions.New(context.TODO(), openai.AudioTranscriptionNewParams{
Model: openai.F("whisper-1"),
File: file,
Filename: openai.String("speech.mp3"),
})
if err != nil {
panic(err)
}
fmt.Printf("转写结果: %s\n", resp.Text)
// 文字转语音 (TTS)
speech, err := client.Audio.Speech.New(context.TODO(), openai.AudioSpeechNewParams{
Model: openai.F("tts-1"),
Voice: openai.F("alloy"),
Input: openai.String("你好,这是语音合成的示例"),
})
if err != nil {
panic(err)
}
speech.WriteToFile("output.mp3")
}
package main
import (
"context"
"fmt"
"github.com/openai/openai-go"
)
func main() {
client := openai.NewClient(openai.WithAPIKey("your-api-key"))
// 创建助手
assistant, err := client.Beta.Assistants.New(context.TODO(), openai.AssistantNewParams{
Name: openai.String("数学老师"),
Model: openai.F("gpt-4o"),
Instructions: openai.String("你是一个数学老师"),
Tools: []openai.AssistantToolParam{
{Type: openai.F(openai.AssistantToolTypeCodeInterpreter)},
},
})
if err != nil {
panic(err)
}
fmt.Printf("助手 ID: %s\n", assistant.ID)
// 创建线程
thread, _ := client.Beta.Threads.New(context.TODO(), openai.ThreadNewParams{})
// 添加消息
_, _ = client.Beta.Threads.Messages.New(context.TODO(), openai.ThreadMessageNewParams{
ThreadID: openai.String(thread.ID),
Role: openai.F(openai.ThreadMessageRoleUser),
Content: openai.F([]openai.ThreadMessageNewParamsContentUnion{
openai.ThreadMessageNewParamsContent{
Text: openai.ThreadMessageTextContentParam{
Text: openai.String("勾股定理是什么?"),
},
},
}),
})
// 运行助手
run, _ := client.Beta.Threads.Runs.New(context.TODO(), openai.ThreadRunNewParams{
ThreadID: openai.String(thread.ID),
AssistantID: openai.String(assistant.ID),
})
// 获取运行结果
run, _ = client.Beta.Threads.Runs.Get(context.TODO(), thread.ID, run.ID)
fmt.Printf("运行状态: %s\n", run.Status)
}
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI(api_key="your-api-key")


async def call_model(model, prompt):
    """Send one prompt to one model; return (model, answer text)."""
    response = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
    return model, response.choices[0].message.content


async def main():
    # Fan the same prompt out to several models concurrently.
    # (Fixed the broken function-body indentation of the original.)
    prompts = [
        ("gpt-4o", "用一句话介绍AI"),
        ("claude-3-5-sonnet", "用一句话介绍AI"),
        ("gemini-1.5-pro", "用一句话介绍AI"),
    ]
    tasks = [call_model(model, prompt) for model, prompt in prompts]
    results = await asyncio.gather(*tasks)
    for model, response in results:
        print(f"{model}: {response}")


asyncio.run(main())
import time

from openai import OpenAI, RateLimitError, APIError

client = OpenAI(api_key="your-api-key")


def make_request_with_retry(prompt, max_retries=3):
    """Call the chat API with exponential-backoff retries.

    Retries on rate limits and 5xx server errors; any other API error is
    re-raised immediately. Raises a generic Exception after `max_retries`
    consecutive failures. (Fixed the broken function-body indentation.)
    """
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}],
            )
            return response.choices[0].message.content
        except RateLimitError:
            wait_time = (2 ** attempt) * 10  # exponential backoff
            print(f"速率限制,等待 {wait_time} 秒...")
            time.sleep(wait_time)
        except APIError as e:
            if e.status_code >= 500:
                wait_time = (2 ** attempt) * 5
                print(f"服务器错误,等待 {wait_time} 秒...")
                time.sleep(wait_time)
            else:
                raise  # bare raise preserves the original traceback
    raise Exception(f"重试 {max_retries} 次后仍失败")
import tiktoken
from openai import OpenAI

client = OpenAI(api_key="your-api-key")

# Price per million tokens, USD. (Prices drift — verify against the current
# pricing page before relying on the estimates.)
PRICING = {
    "gpt-4o": {"input": 5.0, "output": 15.0},
    "gpt-4o-mini": {"input": 0.15, "output": 0.6},
    "gpt-3.5-turbo": {"input": 0.5, "output": 1.5},
}


def count_tokens(text, model="gpt-4o"):
    """Return the number of tokens `text` encodes to for `model`."""
    encoder = tiktoken.encoding_for_model(model)
    return len(encoder.encode(text))


def estimate_cost(prompt, response, model="gpt-4o"):
    """Estimate token usage and USD cost for one prompt/response pair.

    Unknown models fall back to zero-cost pricing instead of raising.
    (Fixed the broken function-body indentation of the original.)
    """
    prompt_tokens = count_tokens(prompt, model)
    completion_tokens = count_tokens(response, model)
    pricing = PRICING.get(model, {"input": 0, "output": 0})
    input_cost = (prompt_tokens / 1_000_000) * pricing["input"]
    output_cost = (completion_tokens / 1_000_000) * pricing["output"]
    return {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": prompt_tokens + completion_tokens,
        "cost": input_cost + output_cost,
    }


# Usage example
prompt = "你好,请介绍一下你自己"
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": prompt}],
)
content = response.choices[0].message.content
cost_info = estimate_cost(prompt, content, "gpt-4o-mini")
print(f"消耗 Token: {cost_info['total_tokens']}")
print(f"预估成本: ${cost_info['cost']:.6f}")
from openai import OpenAI

# Tongyi Qianwen (DashScope) through its OpenAI-compatible endpoint.
client = OpenAI(
    api_key="your-dashscope-api-key",
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

# Call a Qwen model.
response = client.chat.completions.create(
    model="qwen-turbo",  # qwen-turbo, qwen-plus, qwen-max
    messages=[{"role": "user", "content": "你好"}],
    max_tokens=1000,
)
print(response.choices[0].message.content)

# Qwen-specific web search, passed through extra_body.
response = client.chat.completions.create(
    model="qwen-turbo",
    messages=[{"role": "user", "content": "今天北京的天气怎么样?"}],
    extra_body={
        "enable_search": True,  # turn on web search
    },
)
import requests
from openai import OpenAI


def get_baidu_token(api_key, secret_key):
    """Exchange a Baidu API key + secret for an OAuth access_token.

    (Fixed the broken function-body indentation of the original.)
    """
    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {
        "grant_type": "client_credentials",
        "client_id": api_key,
        "client_secret": secret_key,
    }
    response = requests.post(url, params=params)
    return response.json().get("access_token")


# ERNIE (Wenxin) authenticates with an OAuth access_token rather than a raw
# API key, so fetch the token first and use it as the client's api_key.
# (Removed the original's first OpenAI client, which was dead code — built
# with a placeholder key and immediately overwritten.)
access_token = get_baidu_token("your-api-key", "your-secret-key")
client = OpenAI(
    api_key=access_token,
    base_url="https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat",
)
response = client.chat.completions.create(
    model="ernie-4.5-8k",
    messages=[{"role": "user", "content": "你好"}],
)
from openai import OpenAI

# Zhipu (GLM) through its OpenAI-compatible endpoint.
client = OpenAI(
    api_key="your-zhipu-api-key",
    base_url="https://open.bigmodel.cn/api/paas/v4",
)

# Call a GLM-series model.
response = client.chat.completions.create(
    model="glm-4",  # glm-4, glm-4v, glm-3-turbo
    messages=[
        {"role": "system", "content": "你是智谱清言,一个乐于助人的助手"},
        {"role": "user", "content": "你好,请介绍一下自己"},
    ],
    temperature=0.7,
    max_tokens=2000,
)
print(response.choices[0].message.content)
from openai import OpenAI

# DeepSeek through its OpenAI-compatible endpoint.
client = OpenAI(
    api_key="your-deepseek-api-key",
    base_url="https://api.deepseek.com",
)

# Standard chat model.
response = client.chat.completions.create(
    model="deepseek-chat",  # deepseek-chat, deepseek-reasoner
    messages=[{"role": "user", "content": "你好,请介绍一下 DeepSeek"}],
    max_tokens=4096,
    temperature=0.7,
)
print(response.choices[0].message.content)

# DeepSeek Reasoner also exposes its chain of thought via reasoning_content.
response = client.chat.completions.create(
    model="deepseek-reasoner",
    messages=[{"role": "user", "content": "如果a+b=10,a-b=2,那么a和b分别是多少?"}],
)
print(f"推理结果: {response.choices[0].message.content}")
print(f"推理过程: {response.choices[0].message.reasoning_content}")
from openai import OpenAI

# Tencent Hunyuan via its OpenAI-compatible endpoint.
# Fix: the original base_url contained a double dot
# ("hunyuan.cn-shanghai..tencentcloudapi.com") and pointed at the
# TencentCloud RPC host rather than the OpenAI-compatible one. Tencent
# documents https://api.hunyuan.cloud.tencent.com/v1 for OpenAI-style
# access — verify against the current Hunyuan docs.
client = OpenAI(
    api_key="your-hunyuan-api-key",
    base_url="https://api.hunyuan.cloud.tencent.com/v1",
)

# Call a Hunyuan model.
response = client.chat.completions.create(
    model="hunyuan",  # hunyuan, hunyuan-vision
    messages=[{"role": "user", "content": "你好"}],
    temperature=0.7,
    max_tokens=2048,
)
print(response.choices[0].message.content)