|
- # -*- coding: utf8 -*-
- from pyramid.view import view_config, view_defaults
- from pyramid.response import Response
- from pyramid.exceptions import NotFound
- from pyramid.httpexceptions import HTTPNotFound, HTTPBadRequest
- from PIL import Image
- import re, os, shutil
- from os import path
- import mimetypes
- import magic
- import subprocess
- try:
- from StringIO import StringIO
- except ImportError:
- from io import StringIO
- # Database access imports
- from .models import User, Place, Tiers, Event, SallePhy
- from .blenderthumbnailer import blend_extract_thumb, write_png
- from jm2l.const import CurrentYear
- from slugify import slugify
-
# --- Upload limits -----------------------------------------------------------
MIN_FILE_SIZE = 1  # bytes
MAX_FILE_SIZE = 500000000  # bytes

# --- Accepted content --------------------------------------------------------
# Raster image mime types; these are thumbnailed directly with PIL.
IMAGE_TYPES = re.compile('image/(gif|p?jpeg|(x-)?png)')

# Non-raster mime types that are accepted in addition to IMAGE_TYPES.
ACCEPTED_MIMES = [
    'application/pdf',
    'application/vnd.oasis.opendocument.text',
    'application/vnd.oasis.opendocument.text-template',
    'application/vnd.oasis.opendocument.graphics',
    'application/vnd.oasis.opendocument.graphics-template',
    'application/vnd.oasis.opendocument.presentation',
    'application/vnd.oasis.opendocument.presentation-template',
    'application/vnd.oasis.opendocument.spreadsheet',
    'application/vnd.oasis.opendocument.spreadsheet-template',
    'image/svg+xml',
    'application/x-blender',
]

ACCEPT_FILE_TYPES = IMAGE_TYPES

# --- Thumbnails and caching --------------------------------------------------
THUMBNAIL_SIZE = 80  # edge length, in pixels, of the square thumbnail canvas
EXPIRATION_TIME = 300  # seconds

# --- Storage layout (path components below the upload root) ------------------
IMAGEPATH = ['images']
DOCPATH = ['document']
THUMBNAILPATH = ['images', 'thumbnails']

# Change the following to POST if DELETE isn't supported by the webserver.
DELETEMETHOD = "DELETE"

mimetypes.init()
-
-
class MediaPath():
    """Locate, list and identify uploaded media files on disk.

    Media is stored below ``jm2l/upload``; the sub-folder depends on the
    media table (``tiers``, ``place``, ``users``, ``event``...) and on the
    record the files are attached to. Each stored file may have a sibling
    ``<name>.type`` file holding its mime type, and a ``thumbnails``
    sub-folder holding its preview.

    ``get_mimetype`` relies on a ``mediapath(name)`` method that this class
    does not define; the subclasses in this module provide it.
    """

    # Extensions that get_all() and get_list() classify as 'Image'.
    _IMAGE_EXTENSIONS = ('.gif', '.jpg', '.png', '.svg', '.jpeg')
    # Suffixes appended to the original file name when a thumbnail had to be
    # rendered in another format (probed in this order).
    _THUMB_SUFFIXES = ('.jpg', '.png')

    def _media_files(self, folder):
        """Return the entries of *folder* that are media files.

        Sub-folders (such as ``thumbnails``) and ``.type`` side-car files
        are skipped.
        """
        return [entry for entry in os.listdir(folder)
                if entry
                and not entry.endswith('.type')
                and not os.path.isdir(os.path.join(folder, entry))]

    def _matches_media_type(self, name, MediaType):
        """Tell whether file *name* belongs to the *MediaType* category.

        ``None`` accepts everything, ``'Image'`` accepts names with an image
        extension, ``'Other'`` accepts the rest; any other value accepts
        nothing.
        """
        if MediaType is None:
            return True
        is_image = os.path.splitext(name)[1].lower() in self._IMAGE_EXTENSIONS
        if MediaType == 'Image':
            return is_image
        if MediaType == 'Other':
            return not is_image
        return False

    def get_all(self, media_table, linked_id, MediaType=None):
        """List ``(file_url, thumbnail_url)`` pairs for a record.

        :param media_table: type of media
        :param linked_id: id of media (formatted with ``%d``)
        :param MediaType: ``None`` (everything), ``'Image'`` or ``'Other'``
        :return: list of url pairs; empty when either the media folder or
                 its ``thumbnails`` sub-folder does not exist
        """
        curpath = self.get_mediapath(media_table, linked_id, None)
        thumbpath = os.path.join(curpath, 'thumbnails')
        if not os.path.isdir(curpath) or not os.path.isdir(thumbpath):
            return []

        filelist = []
        for entry in self._media_files(curpath):
            # Same extension based classification as get_list(), so that
            # both listings agree on what an image is.
            if not self._matches_media_type(entry, MediaType):
                continue
            quoted = entry.replace(" ", "%20")
            ress_url = '/image/%s/%d/%s' % (media_table, linked_id, quoted)
            thumb_url = '/image/%s/%d/thumbnails/%s' % (media_table, linked_id, quoted)
            # Rendered thumbnails are stored as "<name>.jpg" or "<name>.png";
            # point at the one that exists on disk.
            for suffix in self._THUMB_SUFFIXES:
                if os.path.exists(os.path.join(thumbpath, entry + suffix)):
                    thumb_url += suffix
                    break
            filelist.append((ress_url, thumb_url))
        return filelist

    def get_list(self, media_table, linked_id, MediaType=None):
        """List the urls of the files attached to a record.

        :param media_table: type of media
        :param linked_id: id of media (formatted with ``%d``)
        :param MediaType: ``None`` (everything), ``'Image'`` or ``'Other'``
        :return: list of urls; empty when the media folder does not exist
        """
        curpath = self.get_mediapath(media_table, linked_id, None)
        if not os.path.isdir(curpath):
            return []
        return ['/image/%s/%d/%s' % (media_table, linked_id, entry.replace(" ", "%20"))
                for entry in self._media_files(curpath)
                if self._matches_media_type(entry, MediaType)]

    def get_thumb(self, media_table, linked_id, MediaType=None):
        """List the urls of the thumbnails attached to a record.

        Thumbnail names carrying a second extension (``doc.pdf.jpg``) are
        reported as ``'Other'``; names with a single extension are reported
        as ``'Image'``.

        :param media_table: type of media
        :param linked_id: id of media (formatted with ``%d``)
        :param MediaType: ``None`` (everything), ``'Image'`` or ``'Other'``
        :return: list of urls; empty when there is no ``thumbnails`` folder
        """
        thumbdir = os.path.join(
            self.get_mediapath(media_table, linked_id, None), 'thumbnails')
        if not os.path.isdir(thumbdir):
            return []

        filelist = []
        for entry in self._media_files(thumbdir):
            stem = os.path.splitext(entry)[0]
            has_inner_ext = bool(os.path.splitext(stem)[1])
            wanted = (MediaType is None
                      or (MediaType == 'Image' and not has_inner_ext)
                      or (MediaType == 'Other' and has_inner_ext))
            if wanted:
                filelist.append('/image/%s/%d/thumbnails/%s'
                                % (media_table, linked_id, entry.replace(" ", "%20")))
        return filelist

    def move_mediapath(self, media_table, from_id, to_id):
        """
        Move target media folder to follow database
        :param media_table: type of media
        :param from_id: source
        :param to_id: destination
        :return: True when the folder was moved, False when there was
                 nothing to move
        :raises RuntimeError: unsupported media table, or destination
                              folder already present
        """
        if media_table not in ['tiers', 'place', 'salle', 'users']:
            raise RuntimeError("Sorry, Media '%s' not supported yet for move." % media_table)

        # str() so that numeric identifiers can be joined into a path too.
        src_path = os.path.join('jm2l/upload', *(IMAGEPATH + [media_table, str(from_id)]))
        dst_path = os.path.join('jm2l/upload', *(IMAGEPATH + [media_table, str(to_id)]))
        if not os.path.isdir(src_path):
            # Nothing to do ...
            return False
        if os.path.isdir(dst_path):
            raise RuntimeError('Destination path already exist')

        shutil.move(src_path, dst_path)
        return True

    def _fetch(self, model, linked_id):
        """Return the *model* record for *linked_id* or raise HTTPNotFound."""
        record = model.by_id(linked_id)
        if not record:
            raise HTTPNotFound()
        return record

    def get_mediapath(self, media_table, linked_id, name):
        """
        Build the storage path of a media file and make sure its parent
        folder exists.

        :param media_table: type of media
        :param linked_id: id of media
        :param name: filename (falsy to get the folder itself)
        :return: full relative path on server side
        :raises HTTPNotFound: unknown media table or missing record
        """
        linked_id = str(linked_id)
        if media_table == 'tiers':
            p = IMAGEPATH + [media_table, self._fetch(Tiers, linked_id).slug]
        elif media_table == 'place':
            place = self._fetch(Place, linked_id)
            p = IMAGEPATH + [media_table, place.slug or slugify(place.name)]
        elif media_table == 'salle':
            p = IMAGEPATH + [media_table, self._fetch(SallePhy, linked_id).slug]
        elif media_table == 'presse':
            # Use Year in linked_id
            p = IMAGEPATH + [media_table, linked_id]
        elif media_table in ['tasks', 'poles']:
            # Use Current Year
            p = IMAGEPATH + [str(CurrentYear), media_table, linked_id]
        elif media_table in ['RIB', 'Justif']:
            p = IMAGEPATH + ['users', self._fetch(User, linked_id).slug, media_table]
        elif media_table in ['users', 'badge']:
            p = IMAGEPATH + [media_table, self._fetch(User, linked_id).slug]
        elif media_table == 'event':
            ev = self._fetch(Event, linked_id)
            p = IMAGEPATH + ['event', str(ev.for_year), ev.slug]
        else:
            # Without this branch an unknown table crashed on an unbound name.
            raise HTTPNotFound()

        if name:
            p += [name]
        target = os.path.join('jm2l/upload', *p)
        parent = os.path.dirname(target)
        if not os.path.isdir(parent):
            try:
                os.makedirs(parent)
            except OSError:
                # Another request may have created it in the meantime.
                if not os.path.isdir(parent):
                    raise
        return target

    def ExtMimeIcon(self, mime):
        """Return the url of the generic icon for *mime*, or None if none."""
        if mime == 'application/pdf':
            return "/img/PDF.png"
        return None

    def check_blend_file(self, fileobj):
        """Tell whether *fileobj* holds a (possibly gzipped) Blender file.

        Only the beginning of the stream is read, and the stream is rewound
        to its start before returning.

        :param fileobj: seekable binary file object
        :return: True when the content starts with the ``BLENDER`` signature
        """
        import zlib

        head = fileobj.read(12)
        fileobj.seek(0)

        if head[:2] == b'\x1f\x8b':  # gzip magic
            # Inflate just enough to see the signature instead of the whole
            # upload; wbits=31 selects the gzip container.
            inflater = zlib.decompressobj(31)
            try:
                head = inflater.decompress(fileobj.read(4096), 12)
            except zlib.error:
                # Looks like gzip but is not: definitely not a blend file.
                head = b''
            finally:
                fileobj.seek(0)

        return head.startswith(b'BLENDER')

    def get_mimetype_from_file(self, fileobj):
        """Detect the mime type of *fileobj* from its content.

        :param fileobj: seekable binary file object (rewound on return)
        :return: ``(mimetype, is_blender)`` tuple
        """
        mimetype = magic.from_buffer(fileobj.read(1024), mime=True)
        fileobj.seek(0)

        # Check if the binary file is a blender file
        if mimetype in ("application/octet-stream", "application/x-gzip") \
                and self.check_blend_file(fileobj):
            return "application/x-blender", True
        return mimetype, False

    def get_mimetype(self, name):
        """ This function return the mime-type based on .type file

        :param name: filename, resolved through ``self.mediapath``
        :return: content of the ``.type`` file, or None when unreadable
        """
        try:
            with open(self.mediapath(name) + '.type', 'r', 16) as f:
                return f.read()
        except IOError:
            return None
-
-
@view_defaults(route_name='media_upload')
class MediaUpload(MediaPath):
    """JSON views used to list, upload and delete the media of a record.

    The record is identified by the ``media_table`` and ``uid`` route
    parameters. A ``media_table`` starting with ``_`` selects display-only
    mode, in which file descriptions carry no delete link.
    """

    # Stamp icon (``jm2l/static/img/<name>-icon.png``) per OpenDocument
    # extension, document templates included.
    _ODF_STAMPS = {
        'odt': 'Writer', 'ott': 'Writer',
        'odp': 'Impress', 'otp': 'Impress',
        'ods': 'Calc', 'ots': 'Calc',
        'odg': 'Draw', 'otg': 'Draw',
    }

    def __init__(self, request):
        """Read and check the route parameters.

        :raises HTTPBadRequest: when ``uid`` is not made of digits only
        """
        self.request = request
        self.media_table = self.request.matchdict.get('media_table')
        self.display_only = False
        if self.media_table.startswith('_'):
            self.display_only = True
            self.media_table = self.media_table[1:]
        self.linked_id = self.request.matchdict.get('uid')
        if not self.linked_id.isdigit():
            raise HTTPBadRequest('Wrong Parameter')
        request.response.headers['Access-Control-Allow-Origin'] = '*'
        request.response.headers['Access-Control-Allow-Methods'] = 'OPTIONS, HEAD, GET, POST, PUT, DELETE'

    def mediapath(self, name):
        """Return the storage path of file *name* for the current record."""
        return self.get_mediapath(self.media_table, self.linked_id, name)

    def validate(self, result, filecontent):
        """ Do some basic check to the uploaded file before to accept it

        :param result: dict describing the upload (``type`` and ``size``
                       are read; ``error`` is set on rejection)
        :param filecontent: seekable binary file object of the upload
        :return: True when the file is accepted, False otherwise
        """
        # Try to determine mime type from content uploaded
        found_mime = magic.from_buffer(filecontent.read(1024), mime=True)
        filecontent.seek(0)

        # Do a special statement for specific detected mime type
        if found_mime in ["application/octet-stream", "application/x-gzip"]:
            # Lets see if it's a blender file
            if self.check_blend_file(filecontent):
                found_mime = "application/x-blender"
                # Monkey patch of content type: the client has no specific
                # mime type to announce for blend files.
                # NOTE(review): the source indentation was lost; this
                # override is assumed to belong to the blender branch only,
                # otherwise the mismatch check below could never fire.
                result['type'] = found_mime

        # Reject mime type that don't match
        if found_mime != result['type']:
            result['error'] = 'L\'extension du fichier ne correspond pas à son contenu - '
            result['error'] += "( %s vs %s )" % (found_mime, result['type'])
            return False

        # Accept images and mime types listed
        if found_mime not in ACCEPTED_MIMES and not IMAGE_TYPES.match(found_mime):
            result['error'] = 'Ce type fichier n\'est malheureusement pas supporté. '
            return False

        if result['size'] < MIN_FILE_SIZE:
            result['error'] = 'le fichier est trop petit'
            return False
        if result['size'] > MAX_FILE_SIZE:
            result['error'] = 'le fichier est trop voluminueux'
            return False
        return True

    def get_file_size(self, fileobj):
        """Return the size in bytes of *fileobj*, leaving it rewound."""
        fileobj.seek(0, 2)  # Seek to the end of the file
        size = fileobj.tell()  # Get the position of EOF
        fileobj.seek(0)  # Reset the file position to the beginning
        return size

    def thumbnailurl(self, name):
        """Return the public url of the thumbnail called *name*."""
        return self.request.route_url('media_view', name='thumbnails',
                                      media_table=self.media_table,
                                      uid=self.linked_id) + '/' + name

    def thumbnailpath(self, name):
        """Return the storage path of the thumbnail of *name*.

        The ``thumbnails`` folder is created when missing.
        """
        origin = self.mediapath(name)
        target = os.path.join(os.path.dirname(origin), 'thumbnails', name)
        folder = os.path.dirname(target)
        if not os.path.isdir(folder):
            os.makedirs(folder)
        return target

    def _delete_url(self, name):
        """Return the url to call to delete the stored file *name*."""
        url = self.request.route_url('media_upload',
                                     sep='',
                                     name='',
                                     media_table=self.media_table,
                                     uid=self.linked_id) + '/' + name
        if DELETEMETHOD != 'DELETE':
            url += '&_method=DELETE'
        return url

    def _fit_on_canvas(self, image):
        """Shrink *image* in place and center it on a square canvas.

        :return: new transparent RGBA image of THUMBNAIL_SIZE pixels a side
        """
        # LANCZOS is the filter formerly exposed as ANTIALIAS (a name that
        # recent Pillow releases no longer provide).
        image.thumbnail((THUMBNAIL_SIZE, THUMBNAIL_SIZE), Image.LANCZOS)
        canvas = Image.new('RGBA', (THUMBNAIL_SIZE, THUMBNAIL_SIZE), (255, 255, 255, 0))
        canvas.paste(
            image,
            ((THUMBNAIL_SIZE - image.size[0]) // 2, (THUMBNAIL_SIZE - image.size[1]) // 2))
        return canvas

    def _stamp(self, canvas, stamp_file):
        """Paste the file-type icon *stamp_file* near the bottom-right corner."""
        stamp = Image.open(stamp_file)
        # The icon is also used as its own mask to keep its transparency.
        canvas.paste(stamp, (canvas.size[0] - 30, canvas.size[1] - 30), stamp)

    def _rendered_thumbnail(self, filename, stamp_file, mime):
        """Thumbnail the first page of *filename* rendered by ``convert``.

        :return: url of the ``.jpg`` thumbnail, or the generic icon of
                 *mime* when the external rendering failed
        """
        target = self.thumbnailpath(filename)
        rendered = target + "_.jpg"
        command = ["convert", "./%s[0]" % self.mediapath(filename), "./%s" % rendered]
        try:
            status = subprocess.call(command)
        except OSError:
            # The converter is not installed: keep the upload, use the icon.
            status = -1
        if status != 0:
            return self.ExtMimeIcon(mime)
        try:
            canvas = self._fit_on_canvas(Image.open(rendered))
            self._stamp(canvas, stamp_file)
            canvas.convert('RGB').save(target + ".jpg", 'JPEG')
        finally:
            # Never leave the intermediate rendering behind.
            os.unlink(rendered)
        return self.thumbnailurl(os.path.basename(target + ".jpg"))

    def createthumbnail(self, filename):
        """Create the thumbnail of the raster image *filename*.

        :return: url of the thumbnail (same name as the image)
        """
        canvas = self._fit_on_canvas(Image.open(self.mediapath(filename)))
        target = self.thumbnailpath(filename)
        if not target.endswith('png'):
            # Formats other than png are saved without alpha channel.
            canvas = canvas.convert('RGB')
        canvas.save(target)
        return self.thumbnailurl(os.path.basename(target))

    def pdfthumbnail(self, filename):
        """Create the thumbnail of the PDF *filename* (first page, stamped)."""
        return self._rendered_thumbnail(
            filename, "jm2l/static/img/PDF_Thumb_Stamp.png", 'application/pdf')

    def svgthumbnail(self, filename):
        """Create the thumbnail of the SVG *filename* (rendered, stamped)."""
        return self._rendered_thumbnail(
            filename, "jm2l/static/img/svg-icon.png", 'image/svg+xml')

    def docthumbnail(self, filename):
        """Create the thumbnail of the OpenDocument file *filename*.

        The preview embedded in the document (``Thumbnails/thumbnail.png``)
        is used and stamped with the icon matching the file extension.

        :return: url of the ``.jpg`` thumbnail, or the generic icon of the
                 stored mime type when the document embeds no preview
        """
        import io
        import zipfile

        # let's take the thumbnail generated inside the document
        try:
            with zipfile.ZipFile(self.mediapath(filename)) as archive:
                thumb_bytes = archive.read("Thumbnails/thumbnail.png")
        except (zipfile.BadZipfile, KeyError, IOError):
            return self.ExtMimeIcon(self.get_mimetype(filename))

        canvas = self._fit_on_canvas(Image.open(io.BytesIO(thumb_bytes)))
        # Use the correct stamp
        extension = os.path.splitext(filename)[1].lstrip('.').lower()
        stamp_name = self._ODF_STAMPS.get(extension)
        if stamp_name:
            self._stamp(canvas, "jm2l/static/img/%s-icon.png" % stamp_name)

        target = self.thumbnailpath(filename)
        canvas.convert('RGB').save(target + ".jpg", 'JPEG')
        return self.thumbnailurl(os.path.basename(target + ".jpg"))

    def blendthumbnail(self, filename):
        """Create the thumbnail of the Blender file *filename*.

        :return: url of the ``.png`` thumbnail, or the generic icon when
                 the file embeds no preview
        """
        import io

        buf, width, height = blend_extract_thumb(self.mediapath(filename))
        if not buf:
            return self.ExtMimeIcon('application/x-blender')

        png = write_png(buf, width, height)
        canvas = self._fit_on_canvas(Image.open(io.BytesIO(png)))
        # Stamp with Blender file type
        self._stamp(canvas, "jm2l/static/img/Blender_Thumb_Stamp.png")
        target = self.thumbnailpath(filename)
        canvas.save(target + ".png")
        return self.thumbnailurl(os.path.basename(target + ".png"))

    def fileinfo(self, name):
        """Describe the stored file *name*.

        :return: dict with ``name``, ``size``, ``url``, ``thumbnailUrl``
                 (plus ``deleteType``/``deleteUrl`` unless display-only),
                 or None for ``.type`` files and anything not a regular file
        """
        filename = self.mediapath(name)
        if os.path.splitext(name)[1] == '.type' or not os.path.isfile(filename):
            return None

        info = {
            'name': name,
            'size': os.path.getsize(filename),
            'url': self.request.route_url('media_view',
                                          name=name,
                                          media_table=self.media_table,
                                          uid=self.linked_id),
        }

        mime = self.get_mimetype(name)
        # mime is None when the .type side-car file is missing.
        if mime and IMAGE_TYPES.match(mime):
            info['thumbnailUrl'] = self.thumbnailurl(name)
        elif mime in ACCEPTED_MIMES:
            thumbext = ".png" if mime == "application/x-blender" else ".jpg"
            if os.path.exists(self.thumbnailpath(name) + thumbext):
                info['thumbnailUrl'] = self.thumbnailurl(name) + thumbext
            else:
                info['thumbnailUrl'] = self.ExtMimeIcon(mime)
        else:
            info['thumbnailUrl'] = self.ExtMimeIcon(mime)

        if not self.display_only:
            info['deleteType'] = DELETEMETHOD
            info['deleteUrl'] = self._delete_url(name)
        return info

    @view_config(request_method='OPTIONS')
    def options(self):
        """Answer OPTIONS requests with an empty body."""
        return Response(body='')

    @view_config(request_method='HEAD')
    def head(self):
        """Answer HEAD requests with an empty body."""
        return Response(body='')

    @view_config(request_method='GET', renderer="json")
    def get(self):
        """Describe one file (``name`` route parameter) or list them all."""
        requested = self.request.matchdict.get('name')
        if requested:
            return self.fileinfo(requested)

        filelist = []
        content = self.mediapath('')
        if content and path.exists(content):
            for entry in os.listdir(content):
                info = self.fileinfo(entry)
                if info:
                    filelist.append(info)
        return {"files": filelist}

    @view_config(request_method='DELETE', xhr=True, accept="application/json", renderer='json')
    def delete(self):
        """Delete a stored file with its ``.type`` file and thumbnails.

        :return: True when the file itself was removed, False otherwise
        """
        filename = self.request.matchdict.get('name')
        if not filename:
            return False

        thumbnail = self.thumbnailpath(filename)
        # Side files are removed on a best-effort basis: each of them may
        # legitimately be absent.
        for leftover in (self.mediapath(filename) + '.type',
                         thumbnail,
                         thumbnail + ".jpg",
                         thumbnail + ".png"):
            try:
                os.remove(leftover)
            except (IOError, OSError):
                pass

        try:
            os.remove(self.mediapath(filename))
        except (IOError, OSError):
            return False
        return True

    @view_config(request_method='POST', xhr=True, accept="application/json", renderer='json')
    def post(self):
        """Store the uploaded files and build their thumbnails.

        :return: ``{"files": [...]}`` with one description per uploaded
                 file; rejected files carry an ``error`` entry
        """
        if self.request.matchdict.get('_method') == "DELETE":
            return self.delete()

        results = []
        for name, fieldStorage in self.request.POST.items():
            # Plain form values are not uploads.
            if not hasattr(fieldStorage, 'filename'):
                continue
            result = {
                'name': os.path.basename(fieldStorage.filename),
                'type': fieldStorage.type,
                'size': self.get_file_size(fieldStorage.file),
            }

            if self.validate(result, fieldStorage.file):
                filename, file_extension = os.path.splitext(result['name'])
                local_filename = slugify(filename) + file_extension
                # Keep mime-type in .type file
                with open(self.mediapath(local_filename) + '.type', 'w') as f:
                    f.write(result['type'])

                # Store uploaded file
                fieldStorage.file.seek(0)
                with open(self.mediapath(local_filename), 'wb') as f:
                    shutil.copyfileobj(fieldStorage.file, f)

                mime = result['type']
                if IMAGE_TYPES.match(mime):
                    result['thumbnailUrl'] = self.createthumbnail(local_filename)
                elif mime == 'application/pdf':
                    result['thumbnailUrl'] = self.pdfthumbnail(local_filename)
                elif mime == 'image/svg+xml':
                    result['thumbnailUrl'] = self.svgthumbnail(local_filename)
                elif mime.startswith('application/vnd'):
                    result['thumbnailUrl'] = self.docthumbnail(local_filename)
                elif mime == 'application/x-blender':
                    result['thumbnailUrl'] = self.blendthumbnail(local_filename)
                else:
                    result['thumbnailUrl'] = self.ExtMimeIcon(mime)

                result['deleteType'] = DELETEMETHOD
                result['deleteUrl'] = self._delete_url(local_filename)
                result['url'] = self.request.route_url('media_view',
                                                       media_table=self.media_table,
                                                       uid=self.linked_id,
                                                       name=local_filename)
            results.append(result)
        return {"files": results}
-
-
@view_defaults(route_name='media_view')
class MediaView(MediaPath):
    """View serving the stored media files (and their thumbnails).

    The file is identified by the ``media_table``, ``uid`` and ``name``
    route parameters.
    """

    def __init__(self, request):
        self.request = request
        self.media_table = self.request.matchdict.get('media_table')
        self.linked_id = self.request.matchdict.get('uid')

    def mediapath(self, name):
        """Return the storage path of file *name* for the current record."""
        return self.get_mediapath(self.media_table, self.linked_id, name)

    @view_config(request_method='GET', http_cache=(EXPIRATION_TIME, {'public': True}))
    def get(self):
        """Send the requested file.

        :raises HTTPNotFound: no name given, name pointing outside of the
                              media folder of the record, or unreadable file
        """
        name = self.request.matchdict.get('name')
        if not name:
            raise HTTPNotFound()

        # The name comes straight from the url: make sure it cannot escape
        # the media folder of the record ("../", absolute path...). This is
        # checked before mediapath(name), which creates missing folders.
        root = os.path.realpath(self.mediapath(''))
        target = os.path.realpath(os.path.join(root, name))
        if not target.startswith(root + os.sep):
            raise HTTPNotFound()

        # Thumbnails have no .type file: fall back on the file extension.
        self.request.response.content_type = (
            self.get_mimetype(name) or mimetypes.guess_type(name)[0])

        try:
            self.request.response.body_file = open(self.mediapath(name), 'rb', 10000)
        except IOError:
            raise HTTPNotFound()
        return self.request.response
|