Source code for dscleaner.splitter

from .fileinfo import FileInfo
from . import ifileinfo, filewriter, CsvFileInfo, utils
import numpy as np
import os
import soundfile as sf
from math import ceil

[docs]class Splitter(): """ Splitter allows splitting an existing file. Args: channels: Number of channels the files parsed should have. path: the path where the new merger file should be created. samplerate: samplerate to write on the file. max_length: Maximum file length in minutes. """ def __init__(self, channels, path, samplerate, max_length): #need to figure out if either max_length by time or by size self._channels = channels self._init_samples() if(isinstance(samplerate,int)): self._samplerate = samplerate else: raise TypeError("Samplerate not integer") #if(not(isinstance(max_length,int))): # raise TypeError("max_length not integer") self.path = path #max_length = minutes * seconds self.max_num_samples = ceil((max_length * 60) * samplerate) #if the file exists already, then there's no need to create it(FileInfo) #if theres no file, we need to create it (array) self.file_exists = (os.path.isfile(self.path)) def _init_samples(self): self._samples = np.empty(shape=(0,self._channels)) def __enter__(self): return self def __exit__(self,type,value,traceback): pass
[docs] def add(self, *files): """ Adds new samples to the buffer array. When ``create_file`` method is executed the buffer gets emptied. Args: *files: An array, containing several pathes to files or IFileInfos specializations, although the latter is preferred. """ tmp = None for f in files: if(issubclass(type(f),ifileinfo.IFileInfo)): tmp = f.getSamples() else: with FileInfo(f) as this: tmp = this.getSamples() self._samples = np.append(self._samples,tmp,axis=0)
[docs] def create_file(self, samplerate = None): """ Creates a new file with the filename, converts based on extension given in ``new_filename`` When executed the sample buffer will be emptied, so ``create_file`` should be executed frequently. Args: samplerate: Samplerate of the file. """ if(len(self._samples) == 0): raise TypeError("No samples are queued!") if(samplerate == None): samplerate = self._samplerate path = utils.path_splitter(self.path) if(path['file_name'] == None): raise TypeError("It doesn't contain an extension") #tries to create the directory try: import os os.mkdir(path['path']) except (OSError,FileNotFoundError) as e: pass samples_left = len(self._samples) files_index = 0 while(samples_left > 0): samples_to_write = self._samples[self.max_num_samples*files_index:self.max_num_samples*(files_index+1)] samples_left -= self.max_num_samples with CsvFileInfo(samples_to_write,self._samplerate) as infofile: with filewriter.FileWriter(infofile) as fw: fw.create_file(path['path']+path['file_name']+'_'+str(files_index)+'.'+path['extension'], self._samplerate) files_index+=1 self._init_samples()