Интеграция Spring - AbstractInboundFileSynchronizer не обновляет файл
Я ожидал бы, что механизм синхронизации ftp обновит измененный файл. Однако из того, что я вижу здесь, файл загружается только в том случае, если он еще не существует. На данный момент файл не сохраняется локально, даже если временная метка / содержимое изменились.
Итак, вот что я обнаружил до сих пор:
Класс org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer
@Override
public void synchronizeToLocalDirectory(final File localDirectory) {
final String remoteDirectory = this.remoteDirectoryExpression.getValue(this.evaluationContext, String.class);
try {
int transferred = this.remoteFileTemplate.execute(new SessionCallback<F, Integer>() {
@Override
public Integer doInSession(Session<F> session) throws IOException {
F[] files = session.list(remoteDirectory);
if (!ObjectUtils.isEmpty(files)) {
List<F> filteredFiles = filterFiles(files);
for (F file : filteredFiles) {
try {
if (file != null) {
copyFileToLocalDirectory(
remoteDirectory, file, localDirectory,
session);
}
}
catch (RuntimeException e) {
if (AbstractInboundFileSynchronizer.this.filter instanceof ReversibleFileListFilter) {
((ReversibleFileListFilter<F>) AbstractInboundFileSynchronizer.this.filter)
.rollback(file, filteredFiles);
}
throw e;
}
catch (IOException e) {
if (AbstractInboundFileSynchronizer.this.filter instanceof ReversibleFileListFilter) {
((ReversibleFileListFilter<F>) AbstractInboundFileSynchronizer.this.filter)
.rollback(file, filteredFiles);
}
throw e;
}
}
return filteredFiles.size();
}
else {
return 0;
}
}
});
if (this.logger.isDebugEnabled()) {
this.logger.debug(transferred + " files transferred");
}
}
catch (Exception e) {
throw new MessagingException("Problem occurred while synchronizing remote to local directory", e);
}
}
Фильтрует файлы для загрузки. Я хотел бы использовать org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter, который сравнивает имена файлов и даты последних изменений.
Затем он вызывает функцию copyFileToLocalDirectory с отфильтрованными файлами (для копирования).
protected void copyFileToLocalDirectory(String remoteDirectoryPath, F remoteFile, File localDirectory,
Session<F> session) throws IOException {
String remoteFileName = this.getFilename(remoteFile);
String localFileName = this.generateLocalFileName(remoteFileName);
String remoteFilePath = remoteDirectoryPath != null
? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
: remoteFileName;
if (!this.isFile(remoteFile)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("cannot copy, not a file: " + remoteFilePath);
}
return;
}
File localFile = new File(localDirectory, localFileName);
if (!localFile.exists()) {
String tempFileName = localFile.getAbsolutePath() + this.temporaryFileSuffix;
File tempFile = new File(tempFileName);
OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
try {
session.read(remoteFilePath, outputStream);
}
catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
else {
throw new MessagingException("Failure occurred while copying from remote to local directory", e);
}
}
finally {
try {
outputStream.close();
}
catch (Exception ignored2) {
}
}
if (tempFile.renameTo(localFile)) {
if (this.deleteRemoteFiles) {
session.remove(remoteFilePath);
if (this.logger.isDebugEnabled()) {
this.logger.debug("deleted " + remoteFilePath);
}
}
}
if (this.preserveTimestamp) {
localFile.setLastModified(getModified(remoteFile));
}
}
}
Однако этот метод проверяет (только на основе имени файла), присутствует ли файл на локальном диске, и загружает его, только если его нет. Таким образом, в принципе нет шансов, что обновленный файл (с новой отметкой времени) будет загружен.
Я пытался изменить FtpInboundFileSynchronizer, но это слишком сложно. Каков наилучший способ "настроить" методы synchronize- / copyToLocalDirectory?
3 ответа
Можно обновить AbstractInboundFileSynchronizer
распознавать обновленные файлы, но это хрупко, и вы сталкиваетесь с другими проблемами.
Обновление 13 ноября 2016: узнал, как получить метки времени модификации в секундах.
Основная проблема с обновлением AbstractInboundFileSynchronizer
в том, что у него есть методы установки, но нет (защищенных) методов получения. Если в будущем методы установки сделают что-то умное, обновленная версия, представленная здесь, сломается.
Основная проблема с обновлением файлов в локальном каталоге - это параллелизм: если вы обрабатываете локальный файл одновременно с получением обновления, вы можете столкнуться с различными проблемами. Самый простой выход - переместить локальный файл во (временный) каталог обработки, чтобы получить обновление как новый файл, что, в свою очередь, устраняет необходимость обновления AbstractInboundFileSynchronizer
, См. Также отметку времени верблюда.
По умолчанию FTP-серверы предоставляют метки времени изменения в минутах. Для тестирования я обновил FTP-клиент для использования команды MLSD, которая предоставляет метки времени изменения в секундах (и миллисекундах, если вам повезет), но не все FTP-серверы поддерживают это.
Как упоминалось в справочнике Spring FTP, локальный файловый фильтр должен быть FileSystemPersistentAcceptOnceFileListFilter
чтобы обеспечить выбор локальных файлов при изменении метки времени изменения.
Ниже моя версия обновлена AbstractInboundFileSynchronizer
, а затем некоторые тестовые классы, которые я использовал.
public class FtpUpdatingFileSynchronizer extends FtpInboundFileSynchronizer {
protected final Log logger = LogFactory.getLog(this.getClass());
private volatile Expression localFilenameGeneratorExpression;
private volatile EvaluationContext evaluationContext;
private volatile boolean deleteRemoteFiles;
private volatile String remoteFileSeparator = "/";
private volatile boolean preserveTimestamp;
public FtpUpdatingFileSynchronizer(SessionFactory<FTPFile> sessionFactory) {
super(sessionFactory);
setPreserveTimestamp(true);
}
@Override
public void setLocalFilenameGeneratorExpression(Expression localFilenameGeneratorExpression) {
super.setLocalFilenameGeneratorExpression(localFilenameGeneratorExpression);
this.localFilenameGeneratorExpression = localFilenameGeneratorExpression;
}
@Override
public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
super.setIntegrationEvaluationContext(evaluationContext);
this.evaluationContext = evaluationContext;
}
@Override
public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {
super.setDeleteRemoteFiles(deleteRemoteFiles);
this.deleteRemoteFiles = deleteRemoteFiles;
}
@Override
public void setRemoteFileSeparator(String remoteFileSeparator) {
super.setRemoteFileSeparator(remoteFileSeparator);
this.remoteFileSeparator = remoteFileSeparator;
}
@Override
public void setPreserveTimestamp(boolean preserveTimestamp) {
// updated
Assert.isTrue(preserveTimestamp, "for updating timestamps must be preserved");
super.setPreserveTimestamp(preserveTimestamp);
this.preserveTimestamp = preserveTimestamp;
}
@Override
protected void copyFileToLocalDirectory(String remoteDirectoryPath, FTPFile remoteFile, File localDirectory,
Session<FTPFile> session) throws IOException {
String remoteFileName = this.getFilename(remoteFile);
String localFileName = this.generateLocalFileName(remoteFileName);
String remoteFilePath = (remoteDirectoryPath != null
? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
: remoteFileName);
if (!this.isFile(remoteFile)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("cannot copy, not a file: " + remoteFilePath);
}
return;
}
// start update
File localFile = new File(localDirectory, localFileName);
boolean update = false;
if (localFile.exists()) {
if (this.getModified(remoteFile) > localFile.lastModified()) {
this.logger.info("Updating local file " + localFile);
update = true;
} else {
this.logger.info("File already exists: " + localFile);
return;
}
}
// end update
String tempFileName = localFile.getAbsolutePath() + this.getTemporaryFileSuffix();
File tempFile = new File(tempFileName);
OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
try {
session.read(remoteFilePath, outputStream);
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
else {
throw new MessagingException("Failure occurred while copying from remote to local directory", e);
}
} finally {
try {
outputStream.close();
}
catch (Exception ignored2) {
}
}
// updated
if (update && !localFile.delete()) {
throw new MessagingException("Unable to delete local file [" + localFile + "] for update.");
}
if (tempFile.renameTo(localFile)) {
if (this.deleteRemoteFiles) {
session.remove(remoteFilePath);
if (this.logger.isDebugEnabled()) {
this.logger.debug("deleted " + remoteFilePath);
}
}
// updated
this.logger.info("Stored file locally: " + localFile);
} else {
// updated
throw new MessagingException("Unable to rename temporary file [" + tempFile + "] to [" + localFile + "]");
}
if (this.preserveTimestamp) {
localFile.setLastModified(getModified(remoteFile));
}
}
private String generateLocalFileName(String remoteFileName) {
if (this.localFilenameGeneratorExpression != null) {
return this.localFilenameGeneratorExpression.getValue(this.evaluationContext, remoteFileName, String.class);
}
return remoteFileName;
}
}
После некоторых тестовых занятий, которые я использовал. Я использовал зависимости org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE
а также org.apache.ftpserver:ftpserver-core:1.0.6
(плюс обычные зависимости журналирования и тестирования).
public class TestFtpSync {
static final Logger log = LoggerFactory.getLogger(TestFtpSync.class);
static final String FTP_ROOT_DIR = "target" + File.separator + "ftproot";
// org.apache.ftpserver:ftpserver-core:1.0.6
static FtpServer server;
@BeforeClass
public static void startServer() throws FtpException {
File ftpRoot = new File (FTP_ROOT_DIR);
ftpRoot.mkdirs();
TestUserManager userManager = new TestUserManager(ftpRoot.getAbsolutePath());
FtpServerFactory serverFactory = new FtpServerFactory();
serverFactory.setUserManager(userManager);
ListenerFactory factory = new ListenerFactory();
factory.setPort(4444);
serverFactory.addListener("default", factory.createListener());
server = serverFactory.createServer();
server.start();
}
@AfterClass
public static void stopServer() {
if (server != null) {
server.stop();
}
}
File ftpFile = Paths.get(FTP_ROOT_DIR, "test1.txt").toFile();
File ftpFile2 = Paths.get(FTP_ROOT_DIR, "test2.txt").toFile();
@Test
public void syncDir() {
// org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
try {
ctx.register(FtpSyncConf.class);
ctx.refresh();
PollableChannel msgChannel = ctx.getBean("inputChannel", PollableChannel.class);
for (int j = 0; j < 2; j++) {
for (int i = 0; i < 2; i++) {
storeFtpFile();
}
for (int i = 0; i < 4; i++) {
fetchMessage(msgChannel);
}
}
} catch (Exception e) {
throw new AssertionError("FTP test failed.", e);
} finally {
ctx.close();
cleanup();
}
}
boolean tswitch = true;
void storeFtpFile() throws IOException, InterruptedException {
File f = (tswitch ? ftpFile : ftpFile2);
tswitch = !tswitch;
log.info("Writing message " + f.getName());
Files.write(f.toPath(), ("Hello " + System.currentTimeMillis()).getBytes());
}
Message<?> fetchMessage(PollableChannel msgChannel) {
log.info("Fetching message.");
Message<?> msg = msgChannel.receive(1000L);
if (msg == null) {
log.info("No message.");
} else {
log.info("Have a message: " + msg);
}
return msg;
}
void cleanup() {
delFile(ftpFile);
delFile(ftpFile2);
File d = new File(FtpSyncConf.LOCAL_DIR);
if (d.isDirectory()) {
for (File f : d.listFiles()) {
delFile(f);
}
}
log.info("Finished cleanup");
}
void delFile(File f) {
if (f.isFile()) {
if (f.delete()) {
log.info("Deleted " + f);
} else {
log.error("Cannot delete file " + f);
}
}
}
}
public class MlistFtpSessionFactory extends AbstractFtpSessionFactory<MlistFtpClient> {
@Override
protected MlistFtpClient createClientInstance() {
return new MlistFtpClient();
}
}
public class MlistFtpClient extends FTPClient {
@Override
public FTPFile[] listFiles(String pathname) throws IOException {
return super.mlistDir(pathname);
}
}
@EnableIntegration
@Configuration
public class FtpSyncConf {
private static final Logger log = LoggerFactory.getLogger(FtpSyncConf.class);
public static final String LOCAL_DIR = "/tmp/received";
@Bean(name = "ftpMetaData")
public ConcurrentMetadataStore ftpMetaData() {
return new SimpleMetadataStore();
}
@Bean(name = "localMetaData")
public ConcurrentMetadataStore localMetaData() {
return new SimpleMetadataStore();
}
@Bean(name = "ftpFileSyncer")
public FtpUpdatingFileSynchronizer ftpFileSyncer(
@Qualifier("ftpMetaData") ConcurrentMetadataStore metadataStore) {
MlistFtpSessionFactory ftpSessionFactory = new MlistFtpSessionFactory();
ftpSessionFactory.setHost("localhost");
ftpSessionFactory.setPort(4444);
ftpSessionFactory.setUsername("demo");
ftpSessionFactory.setPassword("demo");
FtpPersistentAcceptOnceFileListFilter fileFilter = new FtpPersistentAcceptOnceFileListFilter(metadataStore, "ftp");
fileFilter.setFlushOnUpdate(true);
FtpUpdatingFileSynchronizer ftpFileSync = new FtpUpdatingFileSynchronizer(ftpSessionFactory);
ftpFileSync.setFilter(fileFilter);
// ftpFileSync.setDeleteRemoteFiles(true);
return ftpFileSync;
}
@Bean(name = "syncFtp")
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "500", maxMessagesPerPoll = "1"))
public MessageSource<File> syncChannel(
@Qualifier("localMetaData") ConcurrentMetadataStore metadataStore,
@Qualifier("ftpFileSyncer") FtpUpdatingFileSynchronizer ftpFileSync) throws Exception {
FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(ftpFileSync);
File receiveDir = new File(LOCAL_DIR);
receiveDir.mkdirs();
messageSource.setLocalDirectory(receiveDir);
messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore, "local"));
log.info("Message source bean created.");
return messageSource;
}
@Bean(name = "inputChannel")
public PollableChannel inputChannel() {
QueueChannel channel = new QueueChannel();
log.info("Message channel bean created.");
return channel;
}
}
/**
* Copied from https://github.com/spring-projects/spring-integration-samples/tree/master/basic/ftp/src/test/java/org/springframework/integration/samples/ftp/support
* @author Gunnar Hillert
*
*/
public class TestUserManager extends AbstractUserManager {
private BaseUser testUser;
private BaseUser anonUser;
private static final String TEST_USERNAME = "demo";
private static final String TEST_PASSWORD = "demo";
public TestUserManager(String homeDirectory) {
super("admin", new ClearTextPasswordEncryptor());
testUser = new BaseUser();
testUser.setAuthorities(Arrays.asList(new Authority[] {new ConcurrentLoginPermission(1, 1), new WritePermission()}));
testUser.setEnabled(true);
testUser.setHomeDirectory(homeDirectory);
testUser.setMaxIdleTime(10000);
testUser.setName(TEST_USERNAME);
testUser.setPassword(TEST_PASSWORD);
anonUser = new BaseUser(testUser);
anonUser.setName("anonymous");
}
public User getUserByName(String username) throws FtpException {
if(TEST_USERNAME.equals(username)) {
return testUser;
} else if(anonUser.getName().equals(username)) {
return anonUser;
}
return null;
}
public String[] getAllUserNames() throws FtpException {
return new String[] {TEST_USERNAME, anonUser.getName()};
}
public void delete(String username) throws FtpException {
throw new UnsupportedOperationException("Deleting of FTP Users is not supported.");
}
public void save(User user) throws FtpException {
throw new UnsupportedOperationException("Saving of FTP Users is not supported.");
}
public boolean doesExist(String username) throws FtpException {
return (TEST_USERNAME.equals(username) || anonUser.getName().equals(username)) ? true : false;
}
public User authenticate(Authentication authentication) throws AuthenticationFailedException {
if(UsernamePasswordAuthentication.class.isAssignableFrom(authentication.getClass())) {
UsernamePasswordAuthentication upAuth = (UsernamePasswordAuthentication) authentication;
if(TEST_USERNAME.equals(upAuth.getUsername()) && TEST_PASSWORD.equals(upAuth.getPassword())) {
return testUser;
}
if(anonUser.getName().equals(upAuth.getUsername())) {
return anonUser;
}
} else if(AnonymousAuthentication.class.isAssignableFrom(authentication.getClass())) {
return anonUser;
}
return null;
}
}
Обновление 15 ноября 2016: Примечание по xml-конфигурации.
Элемент xml inbound-channel-adapter
напрямую связан с FtpInboundFileSynchronizer
с помощью org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser
с помощью FtpNamespaceHandler
с помощью spring-integration-ftp-4.3.5.RELEASE.jar!/META-INF/spring.handlers
,
Следуя справочному руководству xml-custom, указав FtpNamespaceHandler
в местном META-INF/spring.handlers
файл должен позволить вам использовать FtpUpdatingFileSynchronizer
вместо FtpInboundFileSynchronizer
, Это не работает для меня с юнит-тестами, и правильное решение, вероятно, будет включать создание дополнительных / модифицированных xsd-файлов, чтобы обычные inbound-channel-adapter
использует обычный FtpInboundFileSynchronizer
и специальный inbound-updating-channel-adapter
использует FtpUpdatingFileSynchronizer
, Делать это правильно - это немного выходит за рамки этого ответа.
Хотя быстрый взлом может помочь вам начать. Вы можете перезаписать по умолчанию FtpNamespaceHandler
создавая пакет org.springframework.integration.ftp.config
и класс FtpNamespaceHandler
в вашем местном проекте. Содержание показано ниже:
package org.springframework.integration.ftp.config;
public class FtpNamespaceHandler extends org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler {
@Override
public void init() {
System.out.println("Initializing FTP updating file synchronizer.");
// one updated line below, rest copied from original FtpNamespaceHandler
registerBeanDefinitionParser("inbound-channel-adapter", new MyFtpInboundChannelAdapterParser());
registerBeanDefinitionParser("inbound-streaming-channel-adapter",
new FtpStreamingInboundChannelAdapterParser());
registerBeanDefinitionParser("outbound-channel-adapter", new FtpOutboundChannelAdapterParser());
registerBeanDefinitionParser("outbound-gateway", new FtpOutboundGatewayParser());
}
}
package org.springframework.integration.ftp.config;
import org.springframework.integration.file.remote.synchronizer.InboundFileSynchronizer;
import org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser;
public class MyFtpInboundChannelAdapterParser extends FtpInboundChannelAdapterParser {
@Override
protected Class<? extends InboundFileSynchronizer> getInboundFileSynchronizerClass() {
System.out.println("Returning updating file synchronizer.");
return FtpUpdatingFileSynchronizer.class;
}
}
Также добавьте preserve-timestamp="true"
в XML-файл, чтобы предотвратить новый IllegalArgumentException: for updating timestamps must be preserved
,
Прямой путь заключается в расширении AbstractInboundFileSynchronizer
и переопределить copyFileToLocalDirectory
метод для сравнения с условием, которое вы хотите, например, время модификации, если вы не хотите этого делать, то у вас есть другой способ, используя AOP:
@Before("org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.copyFileToLocalDirectory()")
затем получить аргумент от joinPoint
и сделайте сравнение, если это правда, и вы хотите обновить файл, поэтому удалите локальный файл.
Вы можете настроить FtpInboundFileSynchonizer с удаленным фильтром во время построения.
Установите фильтр на FtpPersistentAcceptOnceFileListFilter, чтобы настроить фильтрацию на основе изменений метки времени.
Удаленный фильтр, если он установлен, будет использован в synchronizeToLocalDirectory для фильтрации удаленного просмотра и загрузки файлов.
Это будет использовано в этой строке в doInSession
List<F> filteredFiles = filterFiles(files);
Вам также нужно будет установить для локального фильтра значение FileSystemPersistentAcceptOnceFileListFilter, чтобы обрабатывать локальные изменения сторон на основе изменений удаленного файла.
Кроме того, вам нужно будет предоставить собственное хранилище метаданных или использовать PropertiesPersistingMetadataStore для простого случая, чтобы сохранить локальные и удаленные изменения метаданных файла, чтобы он мог выдержать перезапуски сервера.
Подробнее здесь http://docs.spring.io/spring-integration/reference/html/ftp.html