把python代码转为Java, 涉及功能主要是音频转文本

This commit is contained in:
2026-05-02 23:05:30 +08:00
parent 9b791fabd1
commit 7b2691b98f
12 changed files with 519 additions and 1 deletions

View File

@@ -10,7 +10,7 @@
</parent>
<groupId>com.cst</groupId>
<artifactId>AIDriverEEBackend</artifactId>
<version>1.26040419.6-SNAPSHOT</version>
<version>1.260502.1-SNAPSHOT</version>
<name>Langchain4j-rj</name>
<description>Langchain4j-rj20250803</description>
<url/>

View File

@@ -1,8 +1,10 @@
package com.rj;
import com.rj.config.YihangyiVllmAsrProperties;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
@@ -37,6 +39,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@MapperScan("com.rj.mapper")
@SpringBootApplication
@EnableScheduling
@EnableConfigurationProperties(YihangyiVllmAsrProperties.class)
public class AISmartCard20251230Application {
public static void main(String[] args) {

View File

@@ -0,0 +1,28 @@
package com.rj.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
/**
* 显式扩大 {@code @Scheduled} 线程池,避免单个耗时任务(如长时间 HTTP占满默认单线程后拖死全部定时任务。
* <p>与 {@code spring.task.scheduling.pool.size} 配置互补;此处代码保证至少 8 个调度线程。
*/
@Configuration
public class AppSchedulingConfiguration implements SchedulingConfigurer {
private static final int SCHEDULER_POOL_SIZE = 8;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(SCHEDULER_POOL_SIZE);
scheduler.setThreadNamePrefix("app-scheduling-");
scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.setAwaitTerminationSeconds(120);
scheduler.setRemoveOnCancelPolicy(true);
scheduler.initialize();
taskRegistrar.setTaskScheduler(scheduler);
}
}

View File

@@ -0,0 +1,23 @@
package com.rj.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
 * Startup hint explaining why {@link com.rj.scheduler.YihangyiVllmAsrScheduler} was not loaded.
 *
 * <p>This bean only exists when {@code app.audio.upload.yihangyi.asr.enabled} is {@code false} or
 * absent. Once the application is ready it logs a single message that names the property to set
 * and points out that the root-level {@code asr.enabled} setting is a different configuration.
 */
@Slf4j
@Component
@ConditionalOnProperty(
        name = "app.audio.upload.yihangyi.asr.enabled",
        havingValue = "false",
        matchIfMissing = true)
public class YihangyiVllmAsrEnablementListener {

    /** Message logged once at startup; kept as a single constant so it is easy to locate. */
    private static final String NOT_ENABLED_MESSAGE =
            "YihangyiVllmAsrScheduler 未启用:需设置 app.audio.upload.yihangyi.asr.enabled=true当前为 false 或未配置)。"
                    + "根配置 asr.enabled 用于其它 ASR 能力,不会加载本调度类。";

    /** Logs the hint after the application has finished starting. */
    @EventListener(ApplicationReadyEvent.class)
    public void onReady() {
        log.info(NOT_ENABLED_MESSAGE);
    }
}

View File

@@ -0,0 +1,11 @@
package com.rj.config;
/**
 * Execution mode of the yihangyi directory-polling ASR job: the files of one batch are processed
 * either sequentially on a single thread or in parallel on a thread pool.
 */
public enum YihangyiVllmAsrExecutorMode {
/** Process the files of a batch one by one (stated by the author to match the Python script). */
SINGLE,
/** Submit the files of a batch to a fixed-size thread pool and process them in parallel. */
POOL
}

View File

@@ -0,0 +1,42 @@
package com.rj.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * Settings for the polling ASR job that calls a vLLM OpenAI-compatible endpoint (counterpart of
 * the Python script {@code audio_toText_qwen3_asr_17b_vllm_interface.py}).
 *
 * <p>Bound from properties under the {@code app.audio.upload.yihangyi.asr} prefix; the values
 * assigned below are the defaults used when a property is not configured.
 */
@Data
@ConfigurationProperties(prefix = "app.audio.upload.yihangyi.asr")
public class YihangyiVllmAsrProperties {
/**
 * Whether the scheduled polling is enabled. Off by default so that a local or Windows machine
 * does not accidentally scan the production directories.
 */
private boolean enabled = false;
/** Directory that is polled for audio files waiting to be transcribed. */
private String watchDir = "/home/lizh/java_env/AIDriverEEBackend/audio/yihangyi";
/** Directory that audio files are moved into after they have been transcribed. */
private String doneDir = "/home/lizh/java_env/AIDriverEEBackend/audio/yihangyi_finish";
/**
 * Output directory for transcript {@code .txt} files; per the author this is the same directory
 * that {@link com.rj.scheduler.AudioStatisticsScheduler} scans.
 */
private String txtDir = "/home/lizh/java_env/AIDriverEEBackend/audio/yihangyi_txt";
/** OpenAI-compatible base URL; it must include {@code /v1}, e.g. {@code http://host:17001/v1}. */
private String baseUrl = "http://101.35.52.237:17001/v1";
/** API key sent as the bearer token of the transcription request. */
private String apiKey = "EMPTY";
/** Model name sent in the {@code model} form field of the transcription request. */
private String model = "Qwen3-ASR-1.7B";
/** Poll interval in milliseconds (counterpart of the Python {@code POLL_INTERVAL_SEC}). */
private long pollIntervalMs = 50000L;
/** Maximum number of files picked up in one polling round. */
private int maxBatch = 5;
/** How the files of one batch are processed; see {@link YihangyiVllmAsrExecutorMode}. */
private YihangyiVllmAsrExecutorMode executorMode = YihangyiVllmAsrExecutorMode.SINGLE;
/** Number of worker threads when {@code executorMode} is {@code POOL}. */
private int poolSize = 4;
/** Whether to skip execution on Windows (same convention as the other yihangyi scheduled jobs). */
private boolean skipOnWindows = true;
}

View File

@@ -45,6 +45,14 @@ public interface AudioManagementSegmentsMapper extends BaseMapper<AudioManagemen
@Param("recordingText") String recordingText,
@Param("audioFileOriginalName") String audioFileOriginalName);
/**
 * Writes the on-disk path back to {@code audio_file_path} after an audio file has been moved to
 * the done directory (same behavior as the external ASR script, per the author).
 *
 * <p>Rows are matched by {@code audio_file_original_name} only, and the tenant-line interceptor
 * is skipped for this statement, so every row with that original name is updated.
 *
 * @param audioFilePath         new value for {@code audio_file_path}
 * @param audioFileOriginalName original file name used to select the rows
 * @return number of rows updated
 */
@InterceptorIgnore(tenantLine = "true")
@Update("UPDATE audio_management_segments SET audio_file_path = #{audioFilePath} WHERE audio_file_original_name = #{audioFileOriginalName}")
int updateAudioFilePathByAudioFileOriginalNameIgnoreTenant(
        @Param("audioFilePath") String audioFilePath, @Param("audioFileOriginalName") String audioFileOriginalName);
@InterceptorIgnore(tenantLine = "true")
@Select("SELECT parent_id FROM audio_management_segments WHERE audio_file_original_name = #{audioFileOriginalName} "
+ "LIMIT 1")

View File

@@ -80,6 +80,7 @@ public class AudioStatisticsScheduler {
return;
}
if (System.getProperty("os.name", "").toLowerCase(Locale.ROOT).contains("win")) {
log.info("syncYihangyiTranscriptTextFiles 跳过Windows 环境(该任务仅在 Linux 服务器访问 yihangyi 目录)");
return;
}
Path scanDir = Paths.get(yihangyiTxtScanDir);
@@ -190,6 +191,7 @@ public class AudioStatisticsScheduler {
return;
}
if (System.getProperty("os.name", "").toLowerCase(Locale.ROOT).contains("win")) {
log.info("uploadLocalSegmentFilesToMinio 跳过Windows 环境(该任务仅在 Linux 服务器访问本地分段路径)");
return;
}
try {

View File

@@ -0,0 +1,153 @@
package com.rj.scheduler;
import com.rj.config.AppConfig;
import com.rj.config.YihangyiVllmAsrExecutorMode;
import com.rj.config.YihangyiVllmAsrProperties;
import com.rj.service.YihangyiVllmAsrService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.nio.file.Path;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Polls the yihangyi audio directory and hands pending files to {@link YihangyiVllmAsrService}.
 *
 * <p>Deliberately does <strong>not</strong> use Spring {@code @Scheduled}, so the job never
 * competes with the shared scheduling pool. Polling runs on its own single daemon thread with a
 * fixed delay between runs (the equivalent of the resident loop in the Python script). Within one
 * batch, files are processed sequentially or, in {@code POOL} mode, in parallel on a worker pool.
 *
 * <p>The bean is only created when {@code app.audio.upload.yihangyi.asr.enabled=true}.
 */
@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "app.audio.upload.yihangyi.asr.enabled", havingValue = "true")
public class YihangyiVllmAsrScheduler {

    /** Lower bound for the delay between polling rounds: 3 minutes. Smaller configured values are raised to it. */
    private static final long MIN_POLL_INTERVAL_MS = 1000L * 60 * 3;

    private final YihangyiVllmAsrProperties properties;
    private final YihangyiVllmAsrService yihangyiVllmAsrService;
    private final AppConfig appConfig;

    /** Worker pool for {@code POOL} mode; stays {@code null} in {@code SINGLE} mode. */
    private ExecutorService workerPool;
    /** Single-thread executor that drives the polling loop. */
    private ScheduledExecutorService pollExecutor;

    /** Creates the optional worker pool and starts the polling thread. */
    @PostConstruct
    void start() {
        if (properties.getExecutorMode() == YihangyiVllmAsrExecutorMode.POOL) {
            int n = Math.max(1, properties.getPoolSize());
            // Number the worker threads so log lines and thread dumps can tell them apart.
            java.util.concurrent.atomic.AtomicInteger workerSeq =
                    new java.util.concurrent.atomic.AtomicInteger();
            workerPool = Executors.newFixedThreadPool(n, r -> {
                Thread t = new Thread(r, "yihangyi-asr-worker-" + workerSeq.incrementAndGet());
                t.setDaemon(true);
                return t;
            });
            log.info("yihangyi ASR 批内使用线程池,线程数={}", n);
        } else {
            log.info("yihangyi ASR 批内单线程顺序处理");
        }

        long configuredMs = properties.getPollIntervalMs();
        long intervalMs = Math.max(MIN_POLL_INTERVAL_MS, configuredMs);
        if (configuredMs < MIN_POLL_INTERVAL_MS) {
            // Make the override visible: the configured value is not the one actually used.
            log.warn("yihangyi ASR 配置的 poll-interval-ms={} 小于最小间隔 {}ms,实际按 {}ms 轮询",
                    configuredMs, MIN_POLL_INTERVAL_MS, intervalMs);
        }

        pollExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "yihangyi-asr-poll");
            t.setDaemon(true);
            return t;
        });
        pollExecutor.scheduleWithFixedDelay(this::pollSafe, 0, intervalMs, TimeUnit.MILLISECONDS);
        log.info("yihangyi ASR 独立轮询线程已启动fixedDelay={}ms不占 Spring @Scheduled 线程池)", intervalMs);
    }

    /** Stops polling first (up to 30 s), then the worker pool (up to 60 s). */
    @PreDestroy
    void shutdown() {
        shutdownAndAwait(pollExecutor, 30);
        shutdownAndAwait(workerPool, 60);
    }

    /**
     * Requests an orderly shutdown and waits for it; forces the shutdown when the timeout elapses
     * or the waiting thread is interrupted (the interrupt flag is restored in that case).
     *
     * @param executor       executor to stop; {@code null} is ignored
     * @param timeoutSeconds how long to wait for running tasks
     */
    private static void shutdownAndAwait(ExecutorService executor, long timeoutSeconds) {
        if (executor == null) {
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Runs one polling round and never lets a throwable escape: a task that throws would have all
     * of its later executions suppressed by {@code scheduleWithFixedDelay}.
     */
    private void pollSafe() {
        try {
            log.info("yihangyi ASR 音频-----》文本, 轮询开始");
            pollAndTranscribe();
        } catch (Throwable t) {
            log.error("yihangyi ASR 轮询未捕获异常", t);
        }
    }

    /**
     * One polling round: checks the global switches, lists the newest pending audio files (at most
     * {@code maxBatch}) and processes them according to the configured executor mode.
     */
    void pollAndTranscribe() {
        if (!appConfig.getScheduler().isStart()) {
            log.info("yihangyi ASR 调度跳过app.scheduler.start=false");
            return;
        }
        boolean onWindows = System.getProperty("os.name", "").toLowerCase(Locale.ROOT).contains("win");
        if (properties.isSkipOnWindows() && onWindows) {
            log.info("yihangyi ASR 调度跳过:当前为 Windows 且 app.audio.upload.yihangyi.asr.skip-on-windows=true");
            return;
        }

        Path watchDir = Path.of(properties.getWatchDir());
        int maxBatch = Math.max(1, properties.getMaxBatch());
        long t0 = System.nanoTime();
        log.info(
                "yihangyi ASR 调度开始watchDir={} executorMode={} maxBatch={} pollIntervalMs={}",
                watchDir.toAbsolutePath(),
                properties.getExecutorMode(),
                maxBatch,
                properties.getPollIntervalMs());
        if (!java.nio.file.Files.isDirectory(watchDir)) {
            log.warn("yihangyi ASR 调度结束(异常):监视目录不存在或不是文件夹: {}", watchDir.toAbsolutePath());
            return;
        }

        // The total is taken before processing and is reported for information only.
        int pendingTotal = yihangyiVllmAsrService.countPendingAudio(watchDir);
        log.info("yihangyi ASR 待转写音频文件总数: {}", pendingTotal);
        List<Path> pending = yihangyiVllmAsrService.listNewestPendingAudio(watchDir, maxBatch);
        if (pending.isEmpty()) {
            log.info(
                    "yihangyi ASR 调度结束:本批无待处理文件,待转写总数={},耗时={}ms",
                    pendingTotal,
                    (System.nanoTime() - t0) / 1_000_000);
            return;
        }
        log.info("yihangyi ASR 本批将处理 {} 个文件: {}", pending.size(),
                pending.stream().map(p -> p.getFileName().toString()).toList());

        if (properties.getExecutorMode() == YihangyiVllmAsrExecutorMode.POOL && workerPool != null) {
            List<CompletableFuture<Void>> futures = pending.stream()
                    .map(p -> CompletableFuture.runAsync(() -> yihangyiVllmAsrService.processOneFile(p), workerPool))
                    .toList();
            // Block until the whole batch is done so that polling rounds never overlap.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        } else {
            for (Path p : pending) {
                yihangyiVllmAsrService.processOneFile(p);
            }
        }
        log.info(
                "yihangyi ASR 调度结束:本批已处理 {} 个文件,待转写总数(处理前统计)={},耗时={}ms",
                pending.size(),
                pendingTotal,
                (System.nanoTime() - t0) / 1_000_000);
    }
}

View File

@@ -0,0 +1,221 @@
package com.rj.service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rj.config.YihangyiVllmAsrProperties;
import com.rj.mapper.AudioManagementSegmentsMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Stream;
/**
* 监视目录 → 重命名为 *processing* → 调用 OpenAI 兼容 /v1/audio/transcriptions → 写 txt → 移至完成目录 → 更新 {@code audio_file_path}。
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class YihangyiVllmAsrService {
private static final Set<String> AUDIO_EXTENSIONS = Set.of(
".mp3", ".wav", ".m4a", ".flac", ".ogg", ".wma", ".aac", ".webm", ".opus"
);
private final YihangyiVllmAsrProperties properties;
private final RestTemplate restTemplate;
private final ObjectMapper objectMapper;
private final AudioManagementSegmentsMapper audioManagementSegmentsMapper;
public int countPendingAudio(Path watchDir) {
if (!Files.isDirectory(watchDir)) {
return 0;
}
try (Stream<Path> stream = Files.list(watchDir)) {
return (int) stream.filter(Files::isRegularFile).filter(YihangyiVllmAsrService::isPendingAudioFile).count();
} catch (IOException e) {
log.warn("统计待转写文件失败: {}", watchDir.toAbsolutePath(), e);
return 0;
}
}
/**
* 按修改时间从新到旧,最多 {@code limit} 条。
*/
public List<Path> listNewestPendingAudio(Path watchDir, int limit) {
if (!Files.isDirectory(watchDir) || limit <= 0) {
return List.of();
}
List<PathWithMtime> entries = new ArrayList<>();
try (Stream<Path> stream = Files.list(watchDir)) {
for (Path p : stream.filter(Files::isRegularFile).toList()) {
if (!isPendingAudioFile(p)) {
continue;
}
try {
long mtime = Files.getLastModifiedTime(p).toMillis();
entries.add(new PathWithMtime(p, mtime));
} catch (IOException ignored) {
// skip
}
}
} catch (IOException e) {
log.warn("列出待转写文件失败: {}", watchDir.toAbsolutePath(), e);
return List.of();
}
entries.sort(Comparator.comparingLong(PathWithMtime::mtime).reversed());
return entries.stream().limit(limit).map(PathWithMtime::path).toList();
}
/**
* 处理单个原始路径(调用前须已确认为待处理音频)。失败时打日志;转写失败则文件保持 *processing* 名(与 Python 一致)。
*/
public void processOneFile(Path src) {
Path watchDir = src.getParent();
if (watchDir == null) {
log.warn("[skip] 无父目录: {}", src);
return;
}
Path dst = processingPath(src);
try {
Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
log.warn("[skip] 重命名失败 {} -> {}: {}", src, dst, e.getMessage());
return;
}
long t0 = System.nanoTime();
try {
String text = transcribe(dst);
double elapsedSec = (System.nanoTime() - t0) / 1_000_000_000.0;
log.info("ASR 完成 file={} 耗时={}s 字数={}", dst.getFileName(), String.format(Locale.ROOT, "%.3f", elapsedSec),
text != null ? text.length() : 0);
if (log.isDebugEnabled()) {
log.debug("ASR 文本: {}", text);
}
Path txtDir = Path.of(properties.getTxtDir());
try {
writeTranscriptionTxt(txtDir, src, text != null ? text : "");
} catch (IOException e) {
log.error("[error] 转写成功但写入文本失败: {}", txtDir.resolve(txtFileName(src)), e);
}
Path doneDir = Path.of(properties.getDoneDir());
try {
Files.createDirectories(doneDir);
Path finalPath = doneDir.resolve(src.getFileName());
Files.move(dst, finalPath, StandardCopyOption.REPLACE_EXISTING);
String abs = finalPath.toAbsolutePath().normalize().toString();
int n = audioManagementSegmentsMapper.updateAudioFilePathByAudioFileOriginalNameIgnoreTenant(
abs, src.getFileName().toString());
if (n == 0) {
log.warn("[warn] 未找到 audio_file_original_name={} 的记录audio_file_path 未更新",
src.getFileName());
} else {
log.info("已更新 audio_file_path{} 行): {} -> {}", n, src.getFileName(), abs);
}
} catch (IOException e) {
log.error("[error] 转写成功但移动失败 {} -> {}", dst, doneDir.resolve(src.getFileName()), e);
}
} catch (Exception e) {
log.error("[error] 转写失败 {}", dst, e);
}
}
private record PathWithMtime(Path path, long mtime) {}
public static boolean isPendingAudioFile(Path path) {
if (!Files.isRegularFile(path)) {
return false;
}
String name = path.getFileName().toString();
String lower = name.toLowerCase(Locale.ROOT);
int dot = lower.lastIndexOf('.');
String suf = dot < 0 ? "" : lower.substring(dot);
if (!AUDIO_EXTENSIONS.contains(suf)) {
return false;
}
return !lower.contains("processing");
}
public static Path processingPath(Path path) {
String stem = fileStem(path.getFileName().toString());
String extWithDot = fileExtensionWithDot(path.getFileName().toString());
return path.resolveSibling(stem + "processing" + extWithDot);
}
private static String fileStem(String fileName) {
int dot = fileName.lastIndexOf('.');
return dot < 0 ? fileName : fileName.substring(0, dot);
}
private static String fileExtensionWithDot(String fileName) {
int dot = fileName.lastIndexOf('.');
return dot < 0 ? "" : fileName.substring(dot);
}
private static String txtFileName(Path originalAudio) {
return fileStem(originalAudio.getFileName().toString()) + ".txt";
}
private void writeTranscriptionTxt(Path txtDir, Path originalAudio, String text) throws IOException {
Files.createDirectories(txtDir);
Path out = txtDir.resolve(txtFileName(originalAudio));
Files.writeString(out, text, StandardCharsets.UTF_8);
}
private String transcribe(Path audioPath) throws IOException {
String base = properties.getBaseUrl().trim();
if (base.endsWith("/")) {
base = base.substring(0, base.length() - 1);
}
String url = base + "/audio/transcriptions";
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
body.add("model", properties.getModel());
body.add("file", new FileSystemResource(audioPath.toFile()));
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.MULTIPART_FORM_DATA);
headers.setBearerAuth(properties.getApiKey() != null ? properties.getApiKey() : "");
HttpEntity<MultiValueMap<String, Object>> request = new HttpEntity<>(body, headers);
try {
ResponseEntity<String> response = restTemplate.postForEntity(url, request, String.class);
if (!response.getStatusCode().is2xxSuccessful() || response.getBody() == null) {
throw new IOException("ASR HTTP 非成功: " + response.getStatusCode() + " body=" + response.getBody());
}
return parseTranscriptionText(response.getBody());
} catch (RestClientException e) {
throw new IOException("ASR 请求失败: " + e.getMessage(), e);
}
}
private String parseTranscriptionText(String json) throws IOException {
JsonNode root = objectMapper.readTree(json);
JsonNode text = root.get("text");
if (text != null && !text.isNull()) {
return text.asText("");
}
throw new IOException("响应 JSON 缺少 text 字段: " + json);
}
}

View File

@@ -10,6 +10,21 @@ app:
# 外部 ASR 写入的转写文本目录(与 AudioStatisticsScheduler.syncYihangyiTranscriptTextFiles 一致)
txt-scan-dir: /home/lizh/java_env/AIDriverEEBackend/audio/yihangyi_txt
txt-finish-dir: /home/lizh/java_env/AIDriverEEBackend/audio/yihangyi_txt_finish
# 内置 Java 轮询 ASR(对应 Python audio_toText_qwen3_asr_17b_vllm_interface.py),默认关闭
asr:
enabled: false
watch-dir: /home/lizh/java_env/AIDriverEEBackend/audio/yihangyi
done-dir: /home/lizh/java_env/AIDriverEEBackend/audio/yihangyi_finish
txt-dir: /home/lizh/java_env/AIDriverEEBackend/audio/yihangyi_txt
base-url: http://101.35.52.237:17001/v1
api-key: ${OPENAI_API_KEY:EMPTY}
model: Qwen3-ASR-1.7B
poll-interval-ms: 5000
max-batch: 5
# SINGLE:每批内单线程顺序处理;POOL:固定线程池并行处理本批
executor-mode: SINGLE
pool-size: 4
skip-on-windows: true
# 音频文件访问URL前缀
access:
url: /api/audio/

View File

@@ -5,6 +5,12 @@ app:
timezone: Asia/Shanghai
scheduler:
start: true
# yihangyi 目录轮询 ASR(YihangyiVllmAsrScheduler):仅当 app.audio.upload.yihangyi.asr.enabled=true 时才会创建 Bean 并输出日志;与下文根节点 asr.* 不是同一套配置
audio:
upload:
yihangyi:
asr:
enabled: true
# DashScope API配置
dashscope:
api:
@@ -86,6 +92,12 @@ spring:
autoconfigure:
exclude:
- dev.langchain4j.community.store.embedding.redis.spring.RedisEmbeddingStoreAutoConfiguration
# 默认定时任务线程池为 1,长时间阻塞的 @Scheduled(如 ASR HTTP)会卡住其余全部调度任务
task:
scheduling:
pool:
size: 8
thread-name-prefix: app-scheduling-
data:
redis:
port: 6389