LLM 应用示例:最佳实践示例
LLM 应用开发模式:轻量级 API 编排
在 LangChain 中使用了思维链的方式来选择合适的智能体(Agent),在 Co-mate 中,我们也是采取了类似的设计,在本地构建好函数,然后交由 LLM 来分析用户的输入适合调用哪个函数。
如下是我们的 prompt 示例:
Answer the following questions as best you can. You have access to the following tools:
introduce_system: introduce_system is a function to introduce a system.
Use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [introduce_system]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question
Begin!
Question: Introduce the following system: https://github.com/archguard/ddd-monolithic-code-sample
这里的 Question
便是用户的输入,然后再调用对应的 introduce_system
函数进行分析。
LLM 应用开发模式:DSL 动态运行时
与事实能力相比,我们更信任 LLM 的编排能力,因此我们在 Co-mate 中采用了 DSL 的方式来编排函数,这样可以更加灵活的编排函数。
为了支撑这样的能力,我们在 Co-mate 中引入了 Kotlin 作为 DSL 的运行时:
// 初始化运行时
val repl = KotlinInterpreter()
val mvcDslSpec = repl.evalCast<FoundationSpec>(InterpreterRequest(code = mvcFoundation))
// 从用户的输入中获取 action
val action = ComateToolingAction.from(action.lowercase())
// 添加默认的 DSL spec
if (action == ComateToolingAction.FOUNDATION_SPEC_GOVERNANCE) {
comateContext.spec = mvcDslSpec
}
对应的 DSL 示例(由 ChatGPT 根据 DDD 版本 spec 生成):
foundation {
project_name {
pattern("^([a-z0-9-]+)-([a-z0-9-]+)(-common)?\${'$'}")
example("system1-webapp1")
}
layered {
layer("controller") {
pattern(".*\\.controller") { name shouldBe endsWith("Controller") }
}
layer("service") {
pattern(".*\\.service") {
name shouldBe endsWith("DTO", "Request", "Response", "Factory", "Service")
}
}
layer("repository") {
pattern(".*\\.repository") { name shouldBe endsWith("Entity", "Repository", "Mapper") }
}
dependency {
"controller" dependedOn "service"
"controller" dependedOn "repository"
"service" dependedOn "repository"
}
}
naming {
class_level {
style("CamelCase")
pattern(".*") { name shouldNotBe contains("${'$'}") }
}
function_level {
style("CamelCase")
pattern(".*") { name shouldNotBe contains("${'$'}") }
}
}
}
LLM 应用开发模式:本地小模型
在 Co-mate 中,我们在本地引入了 SentenceTransformer 来处理用户的输入,优在本地分析、匹配用户的输入,并处理。当匹配到结果后直接调用本地的函数,当匹配不到结果时调用远端的处理函数来处理。
HuggingFace: https://huggingface.co/sentence-transformers
在原理上主要是参考了 GitHub Copilot、 Bloop 的实现,通过本地的小模型来处理用户的输入,然后再通过远端的大模型来处理用户的输入。
Rust 实现示例
Rust 相关示例:https://github.com/unit-mesh/unit-agent
#![allow(unused)] fn main() { pub fn embed(&self, sequence: &str) -> anyhow::Result<Embedding> { let tokenizer_output = self.tokenizer.encode(sequence, true).unwrap(); let input_ids = tokenizer_output.get_ids(); let attention_mask = tokenizer_output.get_attention_mask(); let token_type_ids = tokenizer_output.get_type_ids(); let length = input_ids.len(); trace!("embedding {} tokens {:?}", length, sequence); let inputs_ids_array = ndarray::Array::from_shape_vec( (1, length), input_ids.iter().map(|&x| x as i64).collect(), )?; let attention_mask_array = ndarray::Array::from_shape_vec( (1, length), attention_mask.iter().map(|&x| x as i64).collect(), )?; let token_type_ids_array = ndarray::Array::from_shape_vec( (1, length), token_type_ids.iter().map(|&x| x as i64).collect(), )?; let outputs = self.session.run([ InputTensor::from_array(inputs_ids_array.into_dyn()), InputTensor::from_array(attention_mask_array.into_dyn()), InputTensor::from_array(token_type_ids_array.into_dyn()), ])?; let output_tensor: OrtOwnedTensor<f32, _> = outputs[0].try_extract().unwrap(); let sequence_embedding = &*output_tensor.view(); let pooled = sequence_embedding.mean_axis(Axis(1)).unwrap(); Ok(pooled.to_owned().as_slice().unwrap().to_vec()) } }
Kotlin 实现示例
class Semantic(val tokenizer: HuggingFaceTokenizer, val session: OrtSession, val env: OrtEnvironment) {
fun embed(
sequence: String,
): FloatArray {
val tokenized = tokenizer.encode(sequence, true)
val inputIds = tokenized.ids
val attentionMask = tokenized.attentionMask
val typeIds = tokenized.typeIds
val tensorInput = OrtUtil.reshape(inputIds, longArrayOf(1, inputIds.size.toLong()))
val tensorAttentionMask = OrtUtil.reshape(attentionMask, longArrayOf(1, attentionMask.size.toLong()))
val tensorTypeIds = OrtUtil.reshape(typeIds, longArrayOf(1, typeIds.size.toLong()))
val result = session.run(
mapOf(
"input_ids" to OnnxTensor.createTensor(env, tensorInput),
"attention_mask" to OnnxTensor.createTensor(env, tensorAttentionMask),
"token_type_ids" to OnnxTensor.createTensor(env, tensorTypeIds),
),
)
val outputTensor = result.get(0) as OnnxTensor
val output = outputTensor.floatBuffer.array()
return output
}
companion object {
fun create(): Semantic {
val classLoader = Thread.currentThread().getContextClassLoader()
val tokenizerPath = classLoader.getResource("model/tokenizer.json")!!.toURI()
val onnxPath = classLoader.getResource("model/model.onnx")!!.toURI()
try {
val env: Map<String, String> = HashMap()
val array: List<String> = tokenizerPath.toString().split("!")
FileSystems.newFileSystem(URI.create(array[0]), env)
} catch (e: Exception) {
// e.printStackTrace()
}
val tokenizer = HuggingFaceTokenizer.newInstance(Paths.get(tokenizerPath))
val ortEnv = OrtEnvironment.getEnvironment()
val sessionOptions = OrtSession.SessionOptions()
// load onnxPath as byte[]
val onnxPathAsByteArray = Files.readAllBytes(Paths.get(onnxPath))
val session = ortEnv.createSession(onnxPathAsByteArray, sessionOptions)
return Semantic(tokenizer, session, ortEnv)
}
}
}
LLM 应用开发模式:Stream 封装
服务端 API 调用:Kotlin 实现
机制:结合 callbackFlow 来实现
fun stream(text: String): Flow<String> {
val systemMessage = ChatMessage(ChatMessageRole.USER.value(), text)
messages.add(systemMessage)
val completionRequest = ChatCompletionRequest.builder()
.model(openAiVersion)
.temperature(0.0)
.messages(messages)
.build()
return callbackFlow {
withContext(Dispatchers.IO) {
service.streamChatCompletion(completionRequest)
.doOnError(Throwable::printStackTrace)
.blockingForEach { response ->
val completion = response.choices[0].message
if (completion != null && completion.content != null) {
trySend(completion.content)
}
}
close()
}
}
}
客户端 API 调用:TypeScript 实现
机制:依赖于 Vercel 的 AI 库,提供对于 Stream 的封装
import { Message, OpenAIStream, StreamingTextResponse } from 'ai'
import { Configuration, OpenAIApi } from 'openai-edge'
export async function stream(apiKey: string, messages: Message[], isStream: boolean = true) {
let basePath = process.env.OPENAI_PROXY_URL
if (basePath == null) {
basePath = 'https://api.openai.com'
}
const configuration = new Configuration({
apiKey: apiKey || process.env.OPENAI_API_KEY,
basePath
})
const openai = new OpenAIApi(configuration)
const res = await openai.createChatCompletion({
model: 'gpt-3.5-turbo',
messages,
temperature: 0.7,
stream: isStream
})
if (!isStream) {
return res
}
const stream = OpenAIStream(res, {})
return new StreamingTextResponse(stream)
}
客户端 UI 实现:Fetch
const decoder = new TextDecoder()
export function decodeAIStreamChunk(chunk: Uint8Array): string {
return decoder.decode(chunk)
}
await fetch("/api/action/tooling", {
method: "POST",
body: JSON.stringify(tooling),
}).then(async response => {
onResult(await response.json())
let result = ""
const reader = response.body.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
result += decodeAIStreamChunk(value)
onResult(result)
}
isPending = false
});
服务端实现转发: Java + Spring
WebFlux + Spring Boot
@RestController
public class ChatController {
private WebClient webClient = WebClient.create();
@PostMapping(value = "/api/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter chat(@RequestBody ChatInput input) throws IOException {
SseEmitter emitter = new SseEmitter();
webClient.post()
.uri(REMOTE_URL)
.bodyValue(input)
.exchangeToFlux(response -> {
if (response.statusCode().is2xxSuccessful()) {
return response.bodyToFlux(byte[].class)
.map(String::new)
.doOnNext(string -> {
try {
emitter.send(string);
} catch (IOException e) {
logger.error("Error while sending data: {}", e.getMessage());
emitter.completeWithError(e);
}
})
.doOnComplete(emitter::complete)
.doOnError(emitter::completeWithError);
} else {
emitter.completeWithError(new RuntimeException("Error while calling remote service"));
}
})
.subscribe();
return emitter;
}
}
服务端转发:Python
FastAPI + OpenAI
def generate_reply_stream(input_data: ChatInput):
prompt = input_data.message
try:
prompt = prompt
response = openai.ChatCompletion.create(
model=openai_model,
temperature=temperature,
max_tokens=max_tokens,
n=max_responses,
messages=[
{"role": "user", "content": prompt},
],
stream=True,
)
except Exception as e:
print("Error in creating campaigns from openAI:", str(e))
raise HTTPException(503, error503)
try:
for chunk in response:
current_content = chunk["choices"][0]["delta"].get("content", "")
yield current_content
except Exception as e:
print("OpenAI Response (Streaming) Error: " + str(e))
raise HTTPException(503, error503)
@app.post("/api/chat", response_class=Response)
async def chat(input_data: ChatInput):
return StreamingResponse(generate_reply_stream(input_data), media_type="text/event-stream")