Fusepyのインストール

提供:TuntunkunMediaWiki

移動: 案内, 検索

目次

インストール

pip install fusepy

サンプルコード

以下はプログラムの実行を行った際に引数1で与えられたディレクトリから、引数2で与えられたディレクトリ操作を転送するというサンプルコードです。 https://github.com/terencehonles/fusepy/blob/master/examples/loopback.py

#!/usr/bin/env python
 
from __future__ import with_statement
 
from errno import EACCES
from os.path import realpath
from sys import argv, exit
from threading import Lock
 
import os
 
from fuse import FUSE, FuseOSError, Operations, LoggingMixIn
 
 
class Loopback(LoggingMixIn, Operations):
    def __init__(self, root):
        self.root = realpath(root)
        self.rwlock = Lock()
 
    def __call__(self, op, path, *args):
        return super(Loopback, self).__call__(op, self.root + path, *args)
 
    def access(self, path, mode):
        if not os.access(path, mode):
            raise FuseOSError(EACCES)
 
    chmod = os.chmod
    chown = os.chown
 
    def create(self, path, mode):
        return os.open(path, os.O_WRONLY | os.O_CREAT, mode)
 
    def flush(self, path, fh):
        return os.fsync(fh)
 
    def fsync(self, path, datasync, fh):
        return os.fsync(fh)
 
    def getattr(self, path, fh=None):
        st = os.lstat(path)
        return dict((key, getattr(st, key)) for key in ('st_atime', 'st_ctime',
            'st_gid', 'st_mode', 'st_mtime', 'st_nlink', 'st_size', 'st_uid'))
 
    getxattr = None
 
    def link(self, target, source):
        return os.link(source, target)
 
    listxattr = None
    mkdir = os.mkdir
    mknod = os.mknod
    open = os.open
 
    def read(self, path, size, offset, fh):
        with self.rwlock:
            os.lseek(fh, offset, 0)
            return os.read(fh, size)
 
    def readdir(self, path, fh):
        return ['.', '..'] + os.listdir(path)
 
    readlink = os.readlink
 
    def release(self, path, fh):
        return os.close(fh)
 
    def rename(self, old, new):
        return os.rename(old, self.root + new)
 
    rmdir = os.rmdir
 
    def statfs(self, path):
        stv = os.statvfs(path)
        return dict((key, getattr(stv, key)) for key in ('f_bavail', 'f_bfree',
            'f_blocks', 'f_bsize', 'f_favail', 'f_ffree', 'f_files', 'f_flag',
            'f_frsize', 'f_namemax'))
 
    def symlink(self, target, source):
        return os.symlink(source, target)
 
    def truncate(self, path, length, fh=None):
        with open(path, 'r+') as f:
            f.truncate(length)
 
    unlink = os.unlink
    utimens = os.utime
 
    def write(self, path, data, offset, fh):
        with self.rwlock:
            os.lseek(fh, offset, 0)
            return os.write(fh, data)
 
 
if __name__ == '__main__':
    if len(argv) != 3:
        print('usage: %s <root> <mountpoint>' % argv[0])
        exit(1)
 
    fuse = FUSE(Loopback(argv[1]), argv[2], foreground=True)

Dummy Fuse Interface

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os.path
import errno
from fuse import FUSE, FuseOSError, Operations
 
class Grifone(Operations):
	def __init__(self):
		pass
 
	# +============================
	# | Filesystem method
	# +============================
	def access(self, path, mode):
		raise FuseOSError(errno.EACCES)
 
	def chmod(self, path, mode):
		raise FuseOSError(errno.EACCES)
 
	def chown(self, path, mode):
		raise FuseOSError(errno.EACCES)
 
	def getattr(self, path, fh=None):
		raise FuseOSError(errno.EACCES)
 
	def getxattr(self, path, fh=None):
		raise FuseOSError(errno.EACCES)
 
	def readdir(self, path, fh):
		raise FuseOSError(errno.EACCES)
		#return ['.', '..'] + os.listdir(path)
 
	def readlink(self, path):
		raise FuseOSError(errno.EACCES)
 
	def mknod(self, path, mode, dev):
		raise FuseOSError(errno.EACCES)
 
	def mkdir(self, path, mode):
		raise FuseOSError(errno.EACCES)
 
	def rmdir(self, path):
		raise FuseOSError(errno.EACCES)
 
	def statfs(self, path):
		raise FuseOSError(errno.EACCES)
 
	def unlink(self, path):
		raise FuseOSError(errno.EACCES)
 
	def symlink(self, target, source):
		raise FuseOSError(errno.EACCES)
 
	def rename(self, old, new):
		raise FuseOSError(errno.EACCES)
 
	def link(self, target, source):
		raise FuseOSError(errno.EACCES)
 
	def utimens(self, path, times=None):
		raise FuseOSError(errno.EACCES)
 
	# +============================
	# | File method
	# +============================
	def open(self, path, flags):
		raise FuseOSError(errno.EACCES)
 
	def create(self, path, mode):
		raise FuseOSError(errno.EACCES)
 
	def read(self, path, size, offset, fh):
		raise FuseOSError(errno.EACCES)
 
	def write(self, path, data, offset, fh):
		raise FuseOSError(errno.EACCES)
 
	def truncate(self, path, length, fh=None):
		raise FuseOSError(errno.EACCES)
 
	def flush(self, path, fh):
		raise FuseOSError(errno.EACCES)
 
	def release(self, path, fh):
		raise FuseOSError(errno.EACCES)
 
	def fsync(self, path, datasync, fh):
		raise FuseOSError(errno.EACCES)

既にファイルが存在するディレクトリのマウントを行う

既にファイルが存在するディレクトリに対してマウントを行うと、以下のような例外が投げられます。

fuse: mountpoint is not empty
fuse: if you are sure this is safe, use the 'nonempty' mount option

noemptyオプションを設定する事で、上記例外を止める事が出来ます。以下に、nonemptyを使用する例を示します。

#FUSE(Operationオブジェクト, パス, オプション)
FUSE(Operationオブジェクト, パス, noempty=True)

他の権限からアクセスを行う

#FUSE(Operationオブジェクト, パス, オプション)
FUSE(Operationオブジェクト, パス, allow_other=True)
個人用ツール
名前空間
変種
操作
案内
ツールボックス