feat: support S3 transfer protocol

This commit is contained in:
hstyi
2025-06-25 16:36:48 +08:00
committed by hstyi
parent a2a02c0bad
commit cee0c863f8
24 changed files with 701 additions and 466 deletions

View File

@@ -0,0 +1,52 @@
package app.termora.plugins.s3
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.attribute.FileTime
data class S3FileAttributes(
private val lastModifiedTime: Long = 0,
private val lastAccessTime: Long = 0,
private val creationTime: Long = 0,
private val regularFile: Boolean = false,
private val directory: Boolean = false,
private val symbolicLink: Boolean = false,
private val other: Boolean = false,
private val size: Long = 0,
) : BasicFileAttributes {
override fun lastModifiedTime(): FileTime {
return FileTime.fromMillis(lastModifiedTime)
}
override fun lastAccessTime(): FileTime {
return FileTime.fromMillis(lastAccessTime)
}
override fun creationTime(): FileTime {
return FileTime.fromMillis(creationTime)
}
override fun isRegularFile(): Boolean {
return regularFile
}
override fun isDirectory(): Boolean {
return directory
}
override fun isSymbolicLink(): Boolean {
return symbolicLink
}
override fun isOther(): Boolean {
return other
}
override fun size(): Long {
return size
}
override fun fileKey(): Any? {
return null
}
}

View File

@@ -1,223 +0,0 @@
package app.termora.plugins.s3
import app.termora.DynamicIcon
import app.termora.Icons
import app.termora.vfs2.FileObjectDescriptor
import io.minio.*
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.commons.vfs2.FileObject
import org.apache.commons.vfs2.FileType
import org.apache.commons.vfs2.provider.AbstractFileName
import org.apache.commons.vfs2.provider.AbstractFileObject
import java.io.InputStream
import java.io.OutputStream
import java.io.PipedInputStream
import java.io.PipedOutputStream
class S3FileObject(
private val minio: MinioClient,
fileName: AbstractFileName,
fileSystem: S3FileSystem
) : AbstractFileObject<S3FileSystem>(fileName, fileSystem), FileObjectDescriptor {
private var attributes = Attributes()
init {
attributes = attributes.copy(isRoot = name.path == fileSystem.getDelimiter())
}
override fun doGetContentSize(): Long {
return attributes.size
}
override fun doGetType(): FileType {
return if (attributes.isRoot || attributes.isBucket) FileType.FOLDER
else if (attributes.isDirectory && attributes.isFile) FileType.FILE_OR_FOLDER
else if (attributes.isFile) FileType.FILE
else if (attributes.isDirectory) FileType.FOLDER
else FileType.IMAGINARY
}
override fun doListChildren(): Array<out String?>? {
return null
}
override fun doCreateFolder() {
// Nothing
}
private fun getBucketName(): String {
if (StringUtils.isNotBlank(attributes.bucket)) {
return attributes.bucket
}
if (parent is S3FileObject) {
return (parent as S3FileObject).getBucketName()
}
throw IllegalArgumentException("Bucket must be a S3 file object")
}
override fun doListChildrenResolved(): Array<FileObject>? {
if (isFile) return null
val children = mutableListOf<FileObject>()
if (attributes.isRoot) {
val buckets = minio.listBuckets()
for (bucket in buckets) {
val file = resolveFile(bucket.name())
if (file is S3FileObject) {
file.attributes = file.attributes.copy(
isBucket = true,
bucket = bucket.name(),
isDirectory = false,
isFile = false,
lastModified = bucket.creationDate().toInstant().toEpochMilli()
)
children.add(file)
}
}
} else if (attributes.isBucket || attributes.isDirectory) {
val builder = ListObjectsArgs.builder().bucket(getBucketName())
.delimiter(fileSystem.getDelimiter())
var prefix = StringUtils.EMPTY
if (attributes.isDirectory) {
// remove first delimiter
prefix = StringUtils.removeStart(name.path, fileSystem.getDelimiter())
// remove bucket
prefix = StringUtils.removeStart(prefix, getBucketName())
// remove first delimiter
prefix = StringUtils.removeStart(prefix, fileSystem.getDelimiter())
// remove last delimiter
prefix = StringUtils.removeEnd(prefix, fileSystem.getDelimiter())
prefix = prefix + fileSystem.getDelimiter()
}
builder.prefix(prefix)
for (e in minio.listObjects(builder.build())) {
val item = e.get()
val objectName = StringUtils.removeStart(item.objectName(), prefix)
val file = resolveFile(objectName)
if (file is S3FileObject) {
val lastModified = if (item.lastModified() != null) item.lastModified()
.toInstant().toEpochMilli() else 0
val owner = if (item.owner() != null) item.owner().displayName() else StringUtils.EMPTY
file.attributes = file.attributes.copy(
bucket = attributes.bucket,
isDirectory = item.isDir,
isFile = item.isDir.not(),
lastModified = lastModified,
size = if (item.isDir.not()) item.size() else 0,
owner = owner
)
children.add(file)
}
}
}
return children.toTypedArray()
}
override fun getFileSystem(): S3FileSystem {
return super.getFileSystem() as S3FileSystem
}
override fun doGetLastModifiedTime(): Long {
return attributes.lastModified
}
override fun getIcon(width: Int, height: Int): DynamicIcon? {
if (attributes.isBucket) {
return Icons.dbms
}
return super.getIcon(width, height)
}
override fun getTypeDescription(): String? {
if (attributes.isBucket) {
return "Bucket"
}
return null
}
override fun getLastModified(): Long? {
return attributes.lastModified
}
override fun getOwner(): String? {
return attributes.owner
}
override fun doDelete() {
if (isFile) {
minio.removeObject(
RemoveObjectArgs.builder()
.bucket(getBucketName()).`object`(getObjectName()).build()
)
}
}
override fun doGetOutputStream(bAppend: Boolean): OutputStream? {
return createStreamer()
}
private fun createStreamer(): OutputStream {
val pis = PipedInputStream()
val pos = PipedOutputStream(pis)
val thread = Thread.ofVirtual().start {
minio.putObject(
PutObjectArgs.builder()
.bucket(getBucketName())
.stream(pis, -1, 32 * 1024 * 1024)
.`object`(getObjectName()).build()
)
IOUtils.closeQuietly(pis)
}
return object : OutputStream() {
override fun write(b: Int) {
pos.write(b)
}
override fun close() {
pos.close()
thread.join()
}
}
}
override fun doGetInputStream(bufferSize: Int): InputStream? {
return minio.getObject(GetObjectArgs.builder().bucket(getBucketName()).`object`(getObjectName()).build())
}
private fun getObjectName(): String {
var objectName = StringUtils.removeStart(name.path, fileSystem.getDelimiter())
objectName = StringUtils.removeStart(objectName, getBucketName())
objectName = StringUtils.removeStart(objectName, fileSystem.getDelimiter())
return objectName
}
private data class Attributes(
val isRoot: Boolean = false,
val isBucket: Boolean = false,
val isDirectory: Boolean = false,
val isFile: Boolean = false,
/**
* 只要不是 root 那么一定存在 bucket
*/
val bucket: String = StringUtils.EMPTY,
/**
* 最后修改时间
*/
val lastModified: Long = 0,
/**
* 文件大小
*/
val size: Long = 0,
/**
* 所有者
*/
val owner: String = StringUtils.EMPTY
)
}

View File

@@ -1,47 +0,0 @@
package app.termora.plugins.s3
import io.minio.MinioClient
import org.apache.commons.vfs2.Capability
import org.apache.commons.vfs2.FileName
import org.apache.commons.vfs2.FileSystem
import org.apache.commons.vfs2.FileSystemOptions
import org.apache.commons.vfs2.provider.AbstractOriginatingFileProvider
class S3FileProvider private constructor() : AbstractOriginatingFileProvider() {
companion object {
val instance by lazy { S3FileProvider() }
val capabilities = listOf(
Capability.CREATE,
Capability.DELETE,
Capability.RENAME,
Capability.GET_TYPE,
Capability.LIST_CHILDREN,
Capability.READ_CONTENT,
Capability.WRITE_CONTENT,
Capability.GET_LAST_MODIFIED,
Capability.RANDOM_ACCESS_READ,
)
}
override fun getCapabilities(): Collection<Capability> {
return S3FileProvider.capabilities
}
override fun doCreateFileSystem(
rootFileName: FileName,
options: FileSystemOptions
): FileSystem {
val region = S3FileSystemConfigBuilder.instance.getRegion(options)
val endpoint = S3FileSystemConfigBuilder.instance.getEndpoint(options)
val accessKey = S3FileSystemConfigBuilder.instance.getAccessKey(options)
val secretKey = S3FileSystemConfigBuilder.instance.getSecretKey(options)
val builder = MinioClient.builder()
builder.endpoint(endpoint)
builder.credentials(accessKey, secretKey)
if (region.isNotBlank()) builder.region(region)
return S3FileSystem(builder.build(), rootFileName, options)
}
}

View File

@@ -1,32 +1,44 @@
package app.termora.plugins.s3
import io.minio.MinioClient
import org.apache.commons.vfs2.Capability
import org.apache.commons.vfs2.FileName
import org.apache.commons.vfs2.FileObject
import org.apache.commons.vfs2.FileSystemOptions
import org.apache.commons.vfs2.provider.AbstractFileName
import org.apache.commons.vfs2.provider.AbstractFileSystem
import org.apache.sshd.common.file.util.BaseFileSystem
import java.nio.file.Path
import java.nio.file.attribute.UserPrincipalLookupService
import java.util.concurrent.atomic.AtomicBoolean
class S3FileSystem(
private val minio: MinioClient,
rootName: FileName,
fileSystemOptions: FileSystemOptions
) : AbstractFileSystem(rootName, null, fileSystemOptions) {
private val minioClient: MinioClient,
) : BaseFileSystem<S3Path>(S3FileSystemProvider(minioClient)) {
override fun addCapabilities(caps: MutableCollection<Capability>) {
caps.addAll(S3FileProvider.capabilities)
}
private val isOpen = AtomicBoolean(true)
override fun createFile(name: AbstractFileName): FileObject? {
return S3FileObject(minio, name, this)
}
fun getDelimiter(): String {
return S3FileSystemConfigBuilder.instance.getDelimiter(fileSystemOptions)
override fun create(root: String?, names: List<String>): S3Path {
val path = S3Path(this, root, names)
if (names.isEmpty()) {
path.attributes = path.attributes.copy(directory = true)
}
return path
}
override fun close() {
minio.close()
if (isOpen.compareAndSet(false, true)) {
minioClient.close()
}
}
}
override fun isOpen(): Boolean {
return isOpen.get()
}
override fun getRootDirectories(): Iterable<Path> {
return mutableSetOf<Path>(create(separator))
}
override fun supportedFileAttributeViews(): Set<String> {
TODO("Not yet implemented")
}
override fun getUserPrincipalLookupService(): UserPrincipalLookupService {
throw UnsupportedOperationException()
}
}

View File

@@ -1,14 +0,0 @@
package app.termora.plugins.s3
import app.termora.vfs2.s3.AbstractS3FileSystemConfigBuilder
import org.apache.commons.vfs2.FileSystem
class S3FileSystemConfigBuilder private constructor() : AbstractS3FileSystemConfigBuilder() {
companion object {
val instance by lazy { S3FileSystemConfigBuilder() }
}
override fun getConfigClass(): Class<out FileSystem> {
return S3FileSystem::class.java
}
}

View File

@@ -0,0 +1,299 @@
package app.termora.plugins.s3
import io.minio.*
import io.minio.errors.ErrorResponseException
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import java.io.OutputStream
import java.io.PipedInputStream
import java.io.PipedOutputStream
import java.net.URI
import java.nio.channels.Channels
import java.nio.channels.SeekableByteChannel
import java.nio.file.*
import java.nio.file.attribute.*
import java.nio.file.spi.FileSystemProvider
import kotlin.io.path.absolutePathString
import kotlin.io.path.name
class S3FileSystemProvider(private val minioClient: MinioClient) : FileSystemProvider() {
/**
* 因为 S3 协议不存在文件夹,所以用户新建的文件夹先保存到内存中
*/
private val directories = mutableMapOf<String, MutableList<S3Path>>()
override fun getScheme(): String? {
return "s3"
}
override fun newFileSystem(
uri: URI,
env: Map<String, *>
): FileSystem {
TODO("Not yet implemented")
}
override fun getFileSystem(uri: URI): FileSystem {
TODO("Not yet implemented")
}
override fun getPath(uri: URI): Path {
TODO("Not yet implemented")
}
override fun newByteChannel(
path: Path,
options: Set<OpenOption>,
vararg attrs: FileAttribute<*>
): SeekableByteChannel {
if (path !is S3Path) throw UnsupportedOperationException("path must be a S3Path")
if (options.contains(StandardOpenOption.WRITE)) {
return S3WriteSeekableByteChannel(Channels.newChannel(createStreamer(path)))
} else {
val response = minioClient.getObject(
GetObjectArgs.builder().bucket(path.bucketName)
.`object`(path.objectName).build()
)
return S3ReadSeekableByteChannel(Channels.newChannel(response), stat(path))
}
}
private fun createStreamer(path: S3Path): OutputStream {
val pis = PipedInputStream()
val pos = PipedOutputStream(pis)
val thread = Thread.ofVirtual().start {
minioClient.putObject(
PutObjectArgs.builder()
.bucket(path.bucketName)
.stream(pis, -1, 32 * 1024 * 1024)
.`object`(path.objectName).build()
)
IOUtils.closeQuietly(pis)
}
return object : OutputStream() {
override fun write(b: Int) {
pos.write(b)
}
override fun close() {
pos.close()
thread.join()
}
}
}
override fun newDirectoryStream(
dir: Path,
filter: DirectoryStream.Filter<in Path>
): DirectoryStream<Path> {
return object : DirectoryStream<Path> {
override fun iterator(): MutableIterator<Path> {
return files(dir as S3Path).iterator()
}
override fun close() {
}
}
}
private fun files(path: S3Path): MutableList<S3Path> {
val paths = mutableListOf<S3Path>()
// root
if (path.isRoot) {
for (bucket in minioClient.listBuckets()) {
val p = path.resolve(bucket.name())
p.attributes = S3FileAttributes(
directory = true,
lastModifiedTime = bucket.creationDate().toInstant().toEpochMilli()
)
paths.add(p)
}
return paths
}
var startAfter = StringUtils.EMPTY
val maxKeys = 100
while (true) {
val builder = ListObjectsArgs.builder()
.bucket(path.bucketName)
.maxKeys(maxKeys)
.delimiter(path.fileSystem.separator)
if (path.objectName.isNotBlank()) builder.prefix(path.objectName + path.fileSystem.separator)
if (startAfter.isNotBlank()) builder.startAfter(startAfter)
val subPaths = mutableListOf<S3Path>()
for (e in minioClient.listObjects(builder.build())) {
val item = e.get()
val p = path.bucket.resolve(item.objectName())
var attributes = p.attributes.copy(
directory = item.isDir,
regularFile = item.isDir.not(),
size = item.size()
)
if (item.lastModified() != null) {
attributes = attributes.copy(lastModifiedTime = item.lastModified().toInstant().toEpochMilli())
}
p.attributes = attributes
// 如果是文件夹,那么就要删除内存中的
if (attributes.isDirectory) {
delete(p)
}
subPaths.add(p)
startAfter = item.objectName()
}
paths.addAll(subPaths)
if (subPaths.size < maxKeys)
break
}
paths.addAll(directories[path.absolutePathString()] ?: emptyList())
return paths
}
override fun createDirectory(dir: Path, vararg attrs: FileAttribute<*>) {
synchronized(this) {
if (dir !is S3Path) throw UnsupportedOperationException("dir must be a S3Path")
if (dir.isRoot || dir.isBucket) throw UnsupportedOperationException("No operation permission")
val parent = dir.parent ?: throw UnsupportedOperationException("No operation permission")
directories.computeIfAbsent(parent.absolutePathString()) { mutableListOf() }
.add(dir.apply {
attributes = attributes.copy(directory = true, lastModifiedTime = System.currentTimeMillis())
})
}
}
override fun delete(path: Path) {
if (path !is S3Path) throw UnsupportedOperationException("path must be a S3Path")
if (path.attributes.isDirectory) {
val parent = path.parent
if (parent != null) {
synchronized(this) {
directories[parent.absolutePathString()]?.removeIf { it.name == path.name }
}
}
return
}
minioClient.removeObject(
RemoveObjectArgs.builder().bucket(path.bucketName).`object`(path.objectName).build()
)
}
override fun copy(
source: Path?,
target: Path?,
vararg options: CopyOption?
) {
throw UnsupportedOperationException()
}
override fun move(
source: Path?,
target: Path?,
vararg options: CopyOption?
) {
throw UnsupportedOperationException()
}
override fun isSameFile(path: Path?, path2: Path?): Boolean {
throw UnsupportedOperationException()
}
override fun isHidden(path: Path?): Boolean {
throw UnsupportedOperationException()
}
override fun getFileStore(path: Path?): FileStore? {
throw UnsupportedOperationException()
}
override fun checkAccess(path: Path, vararg modes: AccessMode) {
if (path !is S3Path) throw UnsupportedOperationException("path must be a S3Path")
try {
stat(path)
} catch (e: ErrorResponseException) {
throw NoSuchFileException(e.errorResponse().message())
}
}
private fun stat(path: S3Path): StatObjectResponse {
return minioClient.statObject(
StatObjectArgs.builder()
.`object`(path.objectName)
.bucket(path.bucketName).build()
)
}
override fun <V : FileAttributeView> getFileAttributeView(
path: Path,
type: Class<V>,
vararg options: LinkOption?
): V {
if (path is S3Path) {
return type.cast(object : BasicFileAttributeView {
override fun name(): String {
return "basic"
}
override fun readAttributes(): BasicFileAttributes {
return path.attributes
}
override fun setTimes(
lastModifiedTime: FileTime?,
lastAccessTime: FileTime?,
createTime: FileTime?
) {
throw UnsupportedOperationException()
}
})
}
throw UnsupportedOperationException()
}
override fun <A : BasicFileAttributes> readAttributes(
path: Path,
type: Class<A>,
vararg options: LinkOption
): A {
if (path is S3Path) {
return type.cast(getFileAttributeView(path, BasicFileAttributeView::class.java).readAttributes())
}
throw UnsupportedOperationException()
}
override fun readAttributes(
path: Path?,
attributes: String?,
vararg options: LinkOption?
): Map<String?, Any?>? {
throw UnsupportedOperationException()
}
override fun setAttribute(
path: Path?,
attribute: String?,
value: Any?,
vararg options: LinkOption?
) {
throw UnsupportedOperationException()
}
}

View File

@@ -168,6 +168,7 @@ class S3HostOptionsPane : OptionsPane() {
private fun initView() {
delimiterTextField.text = "/"
delimiterTextField.isEditable = false
add(getCenterComponent(), BorderLayout.CENTER)
}

View File

@@ -0,0 +1,54 @@
package app.termora.plugins.s3
import org.apache.sshd.common.file.util.BasePath
import java.nio.file.LinkOption
import java.nio.file.Path
import kotlin.io.path.absolutePathString
class S3Path(
fileSystem: S3FileSystem,
root: String?,
names: List<String>,
) : BasePath<S3Path, S3FileSystem>(fileSystem, root, names) {
private val separator get() = fileSystem.separator
var attributes = S3FileAttributes()
/**
* 是否是 Bucket
*/
val isBucket get() = parent != null && parent?.parent == null
/**
* 是否是根
*/
val isRoot get() = absolutePathString() == separator
/**
* Bucket Name
*/
val bucketName: String get() = names.first()
/**
* 获取 Bucket
*/
val bucket: S3Path get() = fileSystem.getPath(root, bucketName)
/**
* 获取所在 Bucket 的路径
*/
val objectName: String get() = names.subList(1, names.size).joinToString(separator)
override fun toRealPath(vararg options: LinkOption): Path {
return toAbsolutePath()
}
override fun getParent(): S3Path? {
val path = super.getParent() ?: return null
path.attributes = path.attributes.copy(directory = true)
return path
}
}

View File

@@ -7,9 +7,6 @@ import app.termora.protocol.PathHandlerRequest
import app.termora.protocol.TransferProtocolProvider
import io.minio.MinioClient
import org.apache.commons.lang3.StringUtils
import org.apache.commons.vfs2.FileSystemOptions
import org.apache.commons.vfs2.VFS
import org.apache.commons.vfs2.provider.FileProvider
class S3ProtocolProvider private constructor() : TransferProtocolProvider {
@@ -26,11 +23,7 @@ class S3ProtocolProvider private constructor() : TransferProtocolProvider {
return Icons.minio
}
override fun getFileProvider(): FileProvider {
return S3FileProvider.instance
}
override fun getRootFileObject(requester: PathHandlerRequest): PathHandler {
override fun createPathHandler(requester: PathHandlerRequest): PathHandler {
val host = requester.host
val builder = MinioClient.builder()
.endpoint(host.host)
@@ -39,21 +32,12 @@ class S3ProtocolProvider private constructor() : TransferProtocolProvider {
if (StringUtils.isNotBlank(region)) {
builder.region(region)
}
val delimiter = host.options.extras["s3.delimiter"] ?: "/"
val options = FileSystemOptions()
// val delimiter = host.options.extras["s3.delimiter"] ?: "/"
val defaultPath = host.options.sftpDefaultDirectory
S3FileSystemConfigBuilder.instance.setRegion(options, StringUtils.defaultString(region))
S3FileSystemConfigBuilder.instance.setEndpoint(options, host.host)
S3FileSystemConfigBuilder.instance.setAccessKey(options, host.username)
S3FileSystemConfigBuilder.instance.setSecretKey(options, host.authentication.password)
S3FileSystemConfigBuilder.instance.setDelimiter(options, delimiter)
val file = VFS.getManager().resolveFile(
"s3://${StringUtils.defaultIfBlank(defaultPath, "/")}",
options
)
return PathHandler(file)
val minioClient = builder.build()
val fs = S3FileSystem(minioClient)
return PathHandler(fs, fs.getPath(defaultPath))
}
}

View File

@@ -9,6 +9,6 @@ class S3ProtocolProviderExtension private constructor() : ProtocolProviderExtens
}
override fun getProtocolProvider(): ProtocolProvider {
return S3ProtocolProvider.Companion.instance
return S3ProtocolProvider.instance
}
}

View File

@@ -0,0 +1,51 @@
package app.termora.plugins.s3
import io.minio.StatObjectResponse
import org.apache.commons.io.IOUtils
import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel
import java.nio.channels.SeekableByteChannel
class S3ReadSeekableByteChannel(
private val channel: ReadableByteChannel,
private val stat: StatObjectResponse
) : SeekableByteChannel {
private var position: Long = 0
override fun read(dst: ByteBuffer): Int {
val bytesRead = channel.read(dst)
if (bytesRead > 0) {
position += bytesRead
}
return bytesRead
}
override fun write(src: ByteBuffer): Int {
throw UnsupportedOperationException("Read-only channel")
}
override fun position(): Long {
return position
}
override fun position(newPosition: Long): SeekableByteChannel {
throw UnsupportedOperationException("Seek not supported in streaming read")
}
override fun size(): Long {
return stat.size()
}
override fun truncate(size: Long): SeekableByteChannel {
throw UnsupportedOperationException("Read-only channel")
}
override fun isOpen(): Boolean {
return channel.isOpen
}
override fun close() {
IOUtils.closeQuietly(channel)
}
}

View File

@@ -0,0 +1,44 @@
package app.termora.plugins.s3
import org.apache.commons.io.IOUtils
import java.nio.ByteBuffer
import java.nio.channels.SeekableByteChannel
import java.nio.channels.WritableByteChannel
class S3WriteSeekableByteChannel(
private val channel: WritableByteChannel,
) : SeekableByteChannel {
override fun read(dst: ByteBuffer): Int {
throw UnsupportedOperationException("read not supported")
}
override fun write(src: ByteBuffer): Int {
return channel.write(src)
}
override fun position(): Long {
throw UnsupportedOperationException("position not supported")
}
override fun position(newPosition: Long): SeekableByteChannel {
throw UnsupportedOperationException("position not supported")
}
override fun size(): Long {
throw UnsupportedOperationException("size not supported")
}
override fun truncate(size: Long): SeekableByteChannel {
throw UnsupportedOperationException("truncate not supported")
}
override fun isOpen(): Boolean {
return channel.isOpen
}
override fun close() {
IOUtils.closeQuietly(channel)
}
}